Files
PawletOS-Build/environ.py
2026-03-20 05:16:13 -07:00

499 lines
19 KiB
Python

import os
import sys
import subprocess
import argparse
from pathlib import Path
import yaml
import requests
# Import the patch handler
from scripts.better_patch import GitPatchHandler
# Helper functions
def run_command(command, cwd=None, shell=False):
"""Run a shell command in a subprocess."""
print(f"Running command: {' '.join(command) if isinstance(command, list) else command}")
result = subprocess.run(command, cwd=cwd, capture_output=True, text=True, shell=shell)
if result.returncode != 0:
print(f"Error running command: {' '.join(command) if isinstance(command, list) else command}")
print(result.stderr)
raise subprocess.CalledProcessError(result.returncode, command)
print(result.stdout)
return result
def run_command_realtime(command, cwd=None, shell=False):
"""Run a shell command in a subprocess with real-time output."""
print(f"Running command: {' '.join(command) if isinstance(command, list) else command}")
# For repo commands, use direct terminal output to preserve progress bars
if isinstance(command, list) and "repo" in command[0]:
# Direct passthrough to terminal for repo commands
result = subprocess.run(command, cwd=cwd, shell=shell)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, command)
return result.returncode
else:
# For other commands, use the buffered approach
process = subprocess.Popen(
command,
cwd=cwd,
shell=shell,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True
)
# Print output in real-time
for line in process.stdout:
print(line, end='', flush=True)
# Wait for process to complete and get return code
return_code = process.wait()
if return_code != 0:
raise subprocess.CalledProcessError(return_code, command)
return return_code
def load_repo_config(config_file="repo.yml"):
"""Load repository configuration from YAML file."""
if not os.path.exists(config_file):
print(f"Warning: Repo config file {config_file} not found")
return None
with open(config_file, 'r') as f:
config = yaml.safe_load(f)
return config
def repo_init(source_dir, config):
"""Initialize repo with configuration."""
if not config or 'repo' not in config:
print("No repo configuration found")
sys.exit(1)
repo_config = config['repo']
# Build repo init command
init_cmd = ["repo", "init"]
# Add repo URL
if 'url' in repo_config:
init_cmd.extend(["-u", repo_config['url']])
else:
print("Error: No repo URL specified in config")
return
# Add branch if specified
if 'branch' in repo_config:
init_cmd.extend(["-b", repo_config['branch']])
# Add other options
if 'options' in repo_config:
for option in repo_config['options']:
init_cmd.append(option)
print("Initializing repo...")
run_command_realtime(init_cmd, cwd=source_dir)
# Download local manifests if specified
if 'local_manifests' in repo_config:
download_local_manifests(source_dir, repo_config['local_manifests'])
def download_local_manifests(source_dir, manifests):
"""Download local manifest files using requests."""
manifests_dir = os.path.join(source_dir, ".repo", "local_manifests")
os.makedirs(manifests_dir, exist_ok=True)
for manifest_name, manifest_url in manifests.items():
manifest_path = os.path.join(manifests_dir, f"{manifest_name}.xml")
print(f"Downloading local manifest: {manifest_name} from {manifest_url}")
try:
response = requests.get(manifest_url, timeout=30)
response.raise_for_status()
with open(manifest_path, 'w') as f:
f.write(response.text)
print(f"Successfully downloaded {manifest_name}.xml")
except requests.exceptions.RequestException as e:
print(f"Failed to download {manifest_name}.xml: {e}")
def setup_git_lfs(source_dir):
"""Set up Git LFS in the repository."""
print("Setting up Git LFS...")
try:
# Check if git-lfs is installed
run_command(["git", "lfs", "version"], cwd=source_dir)
except (subprocess.CalledProcessError, FileNotFoundError):
print("Warning: git-lfs is not installed. Skipping Git LFS setup.")
return False
try:
# Initialize Git LFS in the repo
run_command(["git", "lfs", "install"], cwd=source_dir)
# Pull LFS objects for all projects
print("Pulling Git LFS objects...")
run_command_realtime(["repo", "forall", "-c", "git lfs pull"], cwd=source_dir)
print("Git LFS setup completed successfully!")
return True
except subprocess.CalledProcessError as e:
print(f"Warning: Git LFS setup failed: {e}")
return False
def find_patches(patch_dirs, mappings=None):
"""Find all patch files recursively in the patch directories.
Returns a list of (patch_path, target_subdir) tuples where target_subdir
is the AOSP project path the patch should be applied into.
mappings (optional dict) translates patch subdirectory paths to AOSP paths,
e.g. {"android/36": "frameworks/base"}. Without a mapping the subdirectory
structure must mirror the AOSP project layout directly:
patches/
frameworks/base/0001-sig-spoof.patch -> <source>/frameworks/base/
0001-root-level.patch -> <source>/
With mappings from repo.yml:
patches/
android/36/android_frameworks_base-*.patch -> <source>/frameworks/base/
"""
if mappings is None:
mappings = {}
patches = []
seen_names = set()
for patch_dir in patch_dirs:
patch_dir_path = Path(patch_dir)
if not patch_dir_path.exists():
print(f"Warning: Patch directory does not exist: {patch_dir}")
continue
found = sorted(patch_dir_path.rglob("*.patch"))
print(f"Found {len(found)} patches in {patch_dir} (recursive)")
for patch_file in found:
if patch_file.name in seen_names:
print(f" Skipping duplicate patch name: {patch_file.name}")
continue
seen_names.add(patch_file.name)
rel_parent = patch_file.parent.relative_to(patch_dir_path)
rel_str = str(rel_parent).replace("\\", "/") # normalise on Windows
target_subdir = rel_str if rel_str != "." else "."
# Apply mapping if one is configured for this subdirectory
if target_subdir in mappings:
target_subdir = mappings[target_subdir]
patches.append((patch_file, target_subdir))
return patches
def apply_patch_legacy(patch_file, source_dir, patch_level=1, force=False):
"""Apply a single patch file using legacy patch command."""
print(f"Applying patch: {patch_file} to {source_dir} with -p{patch_level}")
try:
run_command(["patch", f"-p{patch_level}", "-i", str(patch_file)], cwd=source_dir)
except subprocess.CalledProcessError as e:
if force:
print(f"Patch failed but forcing continue: {patch_file}")
# Try to reverse any partially applied patch
try:
run_command(["patch", f"-p{patch_level}", "-R", "-i", str(patch_file)], cwd=source_dir)
except:
print(f"Could not reverse failed patch: {patch_file}")
else:
raise
def apply_patches_robust(patches, source_dir, patch_level=1, force=False, ignore_patches=None, verbose=False):
"""Apply patches using the robust patch handler.
patches is a list of (patch_path, target_subdir) tuples produced by find_patches().
Each patch is applied inside <source_dir>/<target_subdir>, so patches can be
organised into subdirectories that mirror the AOSP project layout.
"""
if ignore_patches is None:
ignore_patches = []
# Filter ignored patches (match by filename)
patches_to_apply = [(p, sub) for p, sub in patches if p.name not in ignore_patches]
if not patches_to_apply:
print("No patches to apply after filtering")
return True, []
print(f"Using robust patch handler for {len(patches_to_apply)} patches")
# Group patches by target subdir so we can run analysis per project
from collections import defaultdict
groups = defaultdict(list)
for patch_path, subdir in patches_to_apply:
groups[subdir].append(patch_path)
all_failed = []
overall_success = True
for subdir, group_patches in sorted(groups.items()):
target_dir = os.path.join(source_dir, subdir) if subdir != "." else source_dir
if not os.path.exists(target_dir):
print(f"Warning: Target directory does not exist, skipping: {target_dir}")
all_failed.extend([p.name for p in group_patches])
overall_success = False
continue
print(f"\nApplying {len(group_patches)} patch(es) to: {subdir if subdir != '.' else '<source root>'}")
handler = GitPatchHandler(repo_path=target_dir, verbose=verbose)
analysis = handler.analyze_patch_series([str(p) for p in group_patches])
print(f" Analysis: {analysis['summary']['total_patches']} patch(es), "
f"{analysis['summary']['total_files_modified']} file(s) modified, "
f"{len(analysis['summary']['file_conflicts_detected'])} potential conflict(s)")
result = handler.apply_patch_series(
[str(p) for p in group_patches],
target_dir=".",
stop_on_conflict=not force,
rollback_on_failure=True
)
print(f" Result: {'SUCCESS' if result['success'] else 'FAILED'} "
f"({len(result['patches_applied'])}/{len(group_patches)} applied)")
for patch_result in result['results']:
patch_name = Path(patch_result['patch']).name
if not patch_result['success']:
all_failed.append(patch_name)
print(f" FAILED: {patch_name}{patch_result['message']}")
if patch_result.get('conflicts'):
print(f" Conflicts: {', '.join(patch_result['conflicts'])}")
if patch_result.get('rejected_files'):
print(f" Rejected hunks in: {', '.join(patch_result['rejected_files'])}")
if not result['success']:
overall_success = False
if not force:
break
print(f"\nOverall patch result: {'SUCCESS' if overall_success else 'FAILED'}")
return overall_success, all_failed
def apply_patches_legacy(patches, source_dir, patch_level=1, force=False, ignore_patches=None):
"""Legacy patch application for fallback.
patches is a list of (patch_path, target_subdir) tuples.
"""
if ignore_patches is None:
ignore_patches = []
applied_patches = 0
failed_patches = []
for patch_file, subdir in patches:
patch_name = patch_file.name
if patch_name in ignore_patches:
print(f"Ignoring patch: {patch_name}")
continue
target_dir = os.path.join(source_dir, subdir) if subdir != "." else source_dir
try:
apply_patch_legacy(patch_file, target_dir, patch_level, force)
applied_patches += 1
except Exception as e:
print(f"Failed to apply patch {patch_name}: {e}")
failed_patches.append(patch_name)
if not force:
print("Stopping due to patch failure. Use --forceApplyPatches to continue.")
return False, failed_patches
print(f"Successfully applied {applied_patches} patches")
return True, failed_patches
def repo_sync(force=False, jobs=None, current_dir="."):
"""Sync Android source using repo with real-time output."""
print("Syncing Android source...")
sync_cmd = ["repo", "sync"]
if force:
sync_cmd.extend(["--force-sync"])
if jobs:
sync_cmd.extend([f"-j{jobs}"])
# Use real-time output for repo sync
run_command_realtime(sync_cmd, cwd=current_dir)
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Android build environment setup script")
parser.add_argument("--noSync", action="store_true",
help="Skip repo sync (sync is enabled by default)")
parser.add_argument("--forceResync", action="store_true",
help="Force sync source code with repo sync --force-sync")
parser.add_argument("--applyPatches", action="store_true",
help="Apply patches from patches directory")
parser.add_argument("--forceApplyPatches", action="store_true",
help="Force apply patches even if some fail")
parser.add_argument("--patchLevel", type=int, default=1,
help="Patch level for -p option (default: 1)")
parser.add_argument("--sourceDir", type=str, default="AOSP-source",
help="Source directory (default: AOSP-source)")
parser.add_argument("--patchDir", type=str, default=None, nargs="+",
help="Patch directories to search (overrides repo.yml patches.dirs). Can specify multiple.")
parser.add_argument("--syncJobs", type=int, default=None,
help="Number of parallel jobs for repo sync (default: CPU count)")
parser.add_argument("--ignorePatches", type=str, nargs="*", default=[],
help="List of patches to ignore")
parser.add_argument("--useLegacyPatch", action="store_true",
help="Use legacy patch command instead of robust handler")
parser.add_argument("--verbosePatches", action="store_true",
help="Verbose output for patch application")
parser.add_argument("--config", type=str, default="repo.yml",
help="Path to repo configuration file (default: repo.yml)")
parser.add_argument("--no-git-lfs", action="store_true",
help="Disable Git LFS setup (enabled by default)")
return parser.parse_args()
def main():
# Parse command line arguments
args = parse_arguments()
# Use command line arguments with defaults
source_dir = args.sourceDir
patch_dirs = args.patchDir # may be None if not explicitly passed
patch_level = args.patchLevel
sync_jobs = args.syncJobs if args.syncJobs is not None else os.cpu_count()
ignore_patches = args.ignorePatches
print("Android Build Environment Setup")
print(f"Source directory: {source_dir}")
print(f"Patch directories: {', '.join(patch_dirs)}")
print(f"Patch level: -p{patch_level}")
print(f"Sync jobs: {sync_jobs}")
if ignore_patches:
print(f"Ignoring patches: {', '.join(ignore_patches)}")
print(f"Git LFS: {'Disabled' if args.no_git_lfs else 'Enabled'}")
# Get absolute path to prevent any relative path confusion
current_dir = os.path.abspath(".")
absolute_source_dir = os.path.abspath(source_dir)
print(f"Current directory: {current_dir}")
print(f"Absolute source directory: {absolute_source_dir}")
# Check if we're trying to create a nested directory
if absolute_source_dir.startswith(current_dir) and absolute_source_dir != current_dir:
# We're creating a subdirectory, which is fine
working_dir = absolute_source_dir
if not os.path.exists(working_dir):
os.makedirs(working_dir)
print(f"Created source directory: {working_dir}")
else:
print(f"Using existing source directory: {working_dir}")
else:
# Use the specified directory directly
working_dir = absolute_source_dir
if not os.path.exists(working_dir):
os.makedirs(working_dir)
print(f"Created source directory: {working_dir}")
else:
print(f"Using existing source directory: {working_dir}")
# Load repo configuration
config = load_repo_config(args.config)
# Initialize repo if config exists and .repo directory doesn't exist
repo_dir = os.path.join(working_dir, ".repo")
if config and not os.path.exists(repo_dir):
repo_init(working_dir, config)
elif config and os.path.exists(repo_dir):
print("Repo already initialized, skipping init")
elif not config:
print("No repo configuration found, skipping repo init")
# Handle repo sync - always sync unless --noSync is specified
if not args.noSync:
repo_sync(force=args.forceResync, jobs=sync_jobs, current_dir=working_dir)
else:
print("Skipping repo sync as requested")
# Setup Git LFS after repo sync (enabled by default unless --no-git-lfs is specified)
if not args.no_git_lfs:
setup_git_lfs(working_dir)
else:
print("Skipping Git LFS setup as requested")
# Apply patches
if args.applyPatches or args.forceApplyPatches:
# Resolve patch dirs: CLI --patchDir > repo.yml patches.dirs > fallback "patches"
patch_config = config.get('patches', {}) if config else {}
if patch_dirs is not None:
effective_patch_dirs = patch_dirs
elif 'dirs' in patch_config:
effective_patch_dirs = patch_config['dirs']
print(f"Using patch directories from repo.yml: {effective_patch_dirs}")
else:
effective_patch_dirs = ['patches']
# Load directory mappings from repo.yml (e.g. "android/36" -> "frameworks/base")
mappings = patch_config.get('mappings', {})
if mappings:
print(f"Patch directory mappings: {mappings}")
# Resolve relative paths against working_dir
processed_patch_dirs = []
for patch_dir in effective_patch_dirs:
if not os.path.isabs(patch_dir):
processed_patch_dirs.append(os.path.join(working_dir, patch_dir))
else:
processed_patch_dirs.append(patch_dir)
patches = find_patches(processed_patch_dirs, mappings=mappings)
if patches:
if not args.useLegacyPatch:
success, failed_patches = apply_patches_robust(
patches,
working_dir,
patch_level,
args.forceApplyPatches,
ignore_patches,
args.verbosePatches
)
else:
success, failed_patches = apply_patches_legacy(
patches,
working_dir,
patch_level,
args.forceApplyPatches,
ignore_patches
)
if not success and not args.forceApplyPatches:
print(f"Patch application failed. Failed patches: {', '.join(failed_patches)}")
sys.exit(1)
elif failed_patches:
print(f"Some patches failed but continuing (force mode): {', '.join(failed_patches)}")
else:
print("All patches applied successfully!")
else:
print(f"No patches found in {', '.join(processed_patch_dirs)}")
print("Environment setup completed!")
if __name__ == "__main__":
main()