Files
PawletOS-Build/scripts/better_patch.py
2026-03-20 05:16:13 -07:00

634 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Robust Patch Handler
A Python module for applying patch files with intelligent conflict handling.
Can be used as both a CLI tool and an importable module.
"""
import subprocess
import tempfile
import os
import sys
import re
import shutil
import difflib
from pathlib import Path
from typing import List, Dict, Optional, Any
import argparse
import json
class GitPatchHandler:
    """
    A robust patch handler that uses Git for patch application with fallbacks
    and intelligent conflict handling.

    Every ``apply_*`` method reports its outcome as a dictionary that always
    contains a boolean ``'success'`` key and, depending on the path taken,
    ``'message'``, ``'output'``, ``'error'``, ``'method'``, ``'conflicts'``,
    ``'fatal'`` and ``'rolled_back'``.
    """

    # Safety timeout (seconds) for every ``git apply`` / ``patch`` invocation.
    COMMAND_TIMEOUT = 30

    # Unified-diff hunk header. The two groups are the optional old/new line
    # counts; an omitted count means 1.
    _HUNK_HEADER_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')

    def __init__(self, repo_path: str = ".", verbose: bool = False):
        """
        Args:
            repo_path: Root directory that relative target directories are
                resolved against.
            verbose: When true, progress messages are written to stderr.
        """
        self.repo_path = Path(repo_path).absolute()
        self.verbose = verbose
        self.use_git = self._check_git_available()
        self._log(f"Initialized patch handler in {self.repo_path}")
        self._log(f"Git available: {self.use_git}")

    def _log(self, message: str) -> None:
        """Log messages if verbose mode is enabled"""
        if self.verbose:
            print(f"[PatchHandler] {message}", file=sys.stderr)

    def _check_git_available(self) -> bool:
        """Check if git is available in the system"""
        try:
            subprocess.run(['git', '--version'], check=True, capture_output=True, timeout=5)
            return True
        except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
            return False

    def apply_patch_series(self, patch_files: List[str], target_dir: str = ".",
                           stop_on_conflict: bool = False,
                           rollback_on_failure: bool = False) -> Dict[str, Any]:
        """
        Apply multiple patches with conflict handling.

        Args:
            patch_files: List of paths to patch files
            target_dir: Directory to apply patches to (relative to repo_path)
            stop_on_conflict: Stop applying patches as soon as one fails
            rollback_on_failure: Apply each patch through apply_with_rollback,
                so a failing patch has its own file changes undone. Patches
                that already succeeded earlier in the series are NOT reverted.

        Returns:
            Dictionary with keys 'success' (True only if every attempted patch
            succeeded), 'patches_applied', 'results' (one entry per attempted
            patch) and 'analysis' (output of analyze_patch_series).
        """
        target_path = self.repo_path / target_dir
        self._log(f"Applying {len(patch_files)} patches to {target_path}")

        # Analyze patches first; the analysis also drives the application order.
        analysis = self.analyze_patch_series(patch_files)
        ordered_patches = self._order_patches_by_dependency(analysis)

        results = []
        all_success = True
        applied_patches = []
        for patch_file in ordered_patches:
            self._log(f"Applying patch: {patch_file}")
            if rollback_on_failure:
                result = self.apply_with_rollback(patch_file, str(target_path))
            else:
                result = self.apply_single_patch(patch_file, str(target_path))

            results.append({
                'patch': patch_file,
                'success': result['success'],
                'conflicts': result.get('conflicts', []),
                'message': result.get('message', ''),
                'rolled_back': result.get('rolled_back', False),
            })

            if result['success']:
                applied_patches.append(patch_file)
                continue

            all_success = False
            if stop_on_conflict:
                self._log(f"Stopping on conflict in {patch_file}")
                break

        return {
            'success': all_success,
            'patches_applied': applied_patches,
            'results': results,
            'analysis': analysis,
        }

    def apply_single_patch(self, patch_file: str, target_dir: str = ".") -> Dict[str, Any]:
        """
        Apply a single patch file using git with fallbacks.

        Args:
            patch_file: Path to the patch file
            target_dir: Directory to apply the patch to (relative to repo_path
                unless absolute)

        Returns:
            Dictionary with application results. Never raises: every error is
            reported as ``{'success': False, 'fatal': True, ...}``.
        """
        patch_path = Path(patch_file).absolute()
        target_path = self.repo_path / target_dir
        self._log(f"Applying single patch: {patch_path} -> {target_path}")

        if not patch_path.exists():
            return {
                'success': False,
                'fatal': True,
                'message': f'Patch file not found: {patch_file}'
            }

        if not self.use_git:
            return self._fallback_patch_apply(str(patch_path), str(target_path))

        try:
            # Use git apply with 3-way merge for better conflict resolution
            cmd = [
                'git', 'apply',
                '--3way',           # Allow 3-way merge if base available
                '--allow-overlap',  # Allow overlapping patches
                '--verbose',        # Get detailed output
                str(patch_path)
            ]
            self._log(f"Running: {' '.join(cmd)}")
            result = subprocess.run(
                cmd,
                cwd=str(target_path),
                capture_output=True,
                text=True,
                timeout=self.COMMAND_TIMEOUT
            )
            if result.returncode == 0:
                return {
                    'success': True,
                    'message': 'Patch applied cleanly',
                    'output': result.stdout,
                    'method': 'git'
                }
            return self._handle_git_failure(result, str(patch_path), str(target_path))
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'fatal': True,
                'message': 'Patch application timed out'
            }
        except Exception as e:
            # Boundary of the "always return a dict" contract (e.g. the target
            # directory does not exist and subprocess raises OSError).
            return {
                'success': False,
                'fatal': True,
                'message': f'Unexpected error: {str(e)}'
            }

    def _handle_git_failure(self, result: subprocess.CompletedProcess, patch_file: str,
                            target_dir: str) -> Dict[str, Any]:
        """
        Classify a failed ``git apply`` run and decide how to react.

        Args:
            result: Completed (non-zero) ``git apply`` process
            patch_file: Path of the patch that failed
            target_dir: Directory the patch was applied in

        Returns:
            Result dictionary; conflicts and unknown errors are failures,
            "already exists/applied" is treated as success, and "patch does
            not apply" triggers a retry with ``--reject``.
        """
        # Lower-case copy is used ONLY for keyword matching; file names are
        # extracted from the original text so their case is preserved.
        stderr_lower = result.stderr.lower()

        if 'conflict' in stderr_lower:
            conflicts = self._extract_conflicts(result.stderr, result.stdout)
            return {
                'success': False,
                'message': 'Patch conflicts detected',
                'conflicts': conflicts,
                'output': result.stdout,
                'error': result.stderr,
                'method': 'git'
            }
        if 'already exists' in stderr_lower or 'already applied' in stderr_lower:
            return {
                'success': True,  # Treat as success since change is already present
                'message': 'Patch already applied or redundant',
                'output': result.stdout,
                'method': 'git'
            }
        if 'patch does not apply' in stderr_lower:
            # Retry, applying whatever hunks fit
            return self._try_fuzzy_apply(patch_file, target_dir, result)
        return {
            'success': False,
            'fatal': True,
            'message': f'Patch failed: {result.stderr}',
            'output': result.stdout,
            'error': result.stderr,
            'method': 'git'
        }

    @staticmethod
    def _snapshot_reject_files(root: Path) -> Dict[Path, int]:
        """Map every ``*.rej`` file below *root* to its modification time (ns)."""
        snapshot = {}
        for rej in root.rglob('*.rej'):
            try:
                snapshot[rej] = rej.lstat().st_mtime_ns
            except OSError:
                # Vanished between listing and stat; nothing to record.
                continue
        return snapshot

    def _try_fuzzy_apply(self, patch_file: str, target_dir: str,
                         previous_result: subprocess.CompletedProcess) -> Dict[str, Any]:
        """
        Retry with ``git apply --reject`` and report hunks that were skipped.

        Args:
            patch_file: Path of the patch to apply
            target_dir: Directory to run git in
            previous_result: The failed first attempt (kept for interface
                compatibility; currently unused)

        Returns:
            Result dictionary with method 'git-fuzzy'. When hunks were
            rejected the result is a failure listing 'rejected_files'; the
            hunks that did fit remain applied in the working tree.
        """
        self._log("Attempting fuzzy patch application with --reject")
        root = Path(target_dir)
        try:
            # Remember reject files that exist already so leftovers from
            # earlier runs are neither blamed on this patch nor deleted.
            before = self._snapshot_reject_files(root)

            # NOTE: --3way must not be passed here; git documents it as
            # incompatible with --reject and refuses to run with both.
            cmd = [
                'git', 'apply',
                '--allow-overlap',
                '--verbose',
                '--reject',  # Apply hunks that work, write .rej for those that don't
                str(patch_file)
            ]
            result = subprocess.run(
                cmd,
                cwd=target_dir,
                capture_output=True,
                text=True,
                timeout=self.COMMAND_TIMEOUT
            )

            # New or rewritten .rej files mean hunks were skipped by this run
            after = self._snapshot_reject_files(root)
            rej_files = sorted(p for p, stamp in after.items() if before.get(p) != stamp)
            if rej_files:
                rej_names = [str(r.relative_to(root)) for r in rej_files]
                self._log(f"Found {len(rej_files)} reject file(s): {rej_names}")
                # Clean up .rej files so they don't pollute the tree
                for rej in rej_files:
                    try:
                        rej.unlink()
                    except OSError as e:
                        # Best effort only; the outcome is reported regardless.
                        self._log(f"Could not remove {rej}: {e}")
                return {
                    'success': False,
                    'message': f'Patch partially applied — {len(rej_files)} hunk(s) rejected',
                    'output': result.stdout,
                    'error': result.stderr,
                    'method': 'git-fuzzy',
                    'rejected_files': rej_names
                }

            if result.returncode == 0:
                return {
                    'success': True,
                    'message': 'Patch applied cleanly (fuzzy)',
                    'output': result.stdout,
                    'error': result.stderr,
                    'method': 'git-fuzzy'
                }
            return {
                'success': False,
                'fatal': True,
                'message': 'Fuzzy application also failed',
                'output': result.stdout,
                'error': result.stderr,
                'method': 'git-fuzzy'
            }
        except Exception as e:
            return {
                'success': False,
                'fatal': True,
                'message': f'Fuzzy application error: {str(e)}'
            }

    def _extract_conflicts(self, stderr: str, stdout: str) -> List[str]:
        """
        Extract conflict information from git output.

        Returns:
            De-duplicated matches in first-seen order.
        """
        # Separate the streams so the last stderr line cannot fuse with the
        # first stdout line.
        combined = stderr + '\n' + stdout

        # Look for file names in conflict messages
        conflict_patterns = [
            r'error:\s*([^\n]*conflict[^\n]*\.patch[^\n]*)',
            r'conflict\s+in\s+([^\s]+)',
            r'([/\w][^\s]*\.[\w]+).*conflict'
        ]
        conflicts = []
        for pattern in conflict_patterns:
            conflicts.extend(re.findall(pattern, combined, re.IGNORECASE))

        # Also look for .rej files mentioned
        conflicts.extend(re.findall(r'([^\s]+\.rej)', combined))

        # dict.fromkeys removes duplicates while keeping a stable order
        return list(dict.fromkeys(conflicts))

    def _fallback_patch_apply(self, patch_file: str, target_dir: str) -> Dict[str, Any]:
        """
        Fallback to system patch command if git isn't available.

        A dry run is performed first so a patch that does not fit leaves the
        tree untouched.
        """
        self._log("Falling back to system patch command")
        base_cmd = ['patch', '-p1', '-i', patch_file]
        try:
            dry_run = subprocess.run(
                base_cmd + ['--verbose', '--dry-run'],
                cwd=target_dir,
                capture_output=True,
                text=True,
                timeout=self.COMMAND_TIMEOUT
            )
            if dry_run.returncode != 0:
                return {
                    'success': False,
                    'message': 'Patch does not apply (system patch dry run failed)',
                    'output': dry_run.stdout,
                    'error': dry_run.stderr,
                    'method': 'system-patch'
                }

            # Dry run succeeded, apply for real
            result = subprocess.run(
                base_cmd,
                cwd=target_dir,
                capture_output=True,
                text=True,
                timeout=self.COMMAND_TIMEOUT
            )
            applied = result.returncode == 0
            return {
                'success': applied,
                'message': 'Applied via system patch' if applied else 'System patch failed',
                'output': result.stdout,
                'error': '' if applied else result.stderr,
                'method': 'system-patch'
            }
        except FileNotFoundError:
            return {
                'success': False,
                'fatal': True,
                'message': 'Neither git nor patch command available'
            }
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'fatal': True,
                'message': 'Patch application timed out'
            }

    def analyze_patch_series(self, patch_files: List[str]) -> Dict[str, Any]:
        """
        Analyze patches for potential conflicts before applying.

        Args:
            patch_files: List of paths to patch files

        Returns:
            Dictionary with a per-patch entry under 'patches' (or
            ``{'error': 'File not found'}`` for missing files) and aggregate
            numbers under 'summary'.
        """
        self._log(f"Analyzing {len(patch_files)} patches")
        analysis = {}
        all_modified_files = set()
        for patch_file in patch_files:
            patch_path = Path(patch_file)
            if not patch_path.exists():
                analysis[patch_file] = {'error': 'File not found'}
                continue

            # errors='ignore' keeps binary or oddly encoded patches readable
            with open(patch_path, 'r', encoding='utf-8', errors='ignore') as f:
                patch_content = f.read()

            modified_files = self._extract_modified_files(patch_content)
            all_modified_files.update(modified_files)
            analysis[patch_file] = {
                'files_modified': modified_files,
                'patch_size': len(patch_content),
                'lines_changed': self._count_lines_changed(patch_content),
                'conflict_risk': self._assess_conflict_risk(patch_content, modified_files),
                'is_binary': self._is_binary_patch(patch_content)
            }

        # Detect file conflicts between patches
        file_conflicts = self._detect_file_conflicts(analysis)
        return {
            'patches': analysis,
            'summary': {
                'total_patches': len(patch_files),
                'total_files_modified': len(all_modified_files),
                'file_conflicts_detected': file_conflicts
            }
        }

    def _extract_modified_files(self, patch_content: str) -> List[str]:
        """
        Extract list of files modified by this patch.

        Only ``---``/``+++`` lines outside hunk bodies are treated as file
        headers, so a removed line such as ``-- comment`` (rendered as
        ``--- comment`` inside a hunk) is not mistaken for a path.

        Returns:
            Unique paths in first-seen order, without ``/dev/null`` and with
            a leading ``a/`` or ``b/`` stripped.
        """
        files: List[str] = []
        old_left = new_left = 0  # hunk body lines still expected on each side
        for line in patch_content.split('\n'):
            if old_left > 0 or new_left > 0:
                marker = line[:1]
                if marker == '\\':  # "\ No newline at end of file"
                    continue
                if marker == '-':
                    old_left -= 1
                    continue
                if marker == '+':
                    new_left -= 1
                    continue
                if marker in ('', ' ', '\r'):  # context (possibly whitespace-stripped)
                    old_left -= 1
                    new_left -= 1
                    continue
                # Not valid hunk content: the hunk was shorter than its header
                # claimed, so resynchronise and parse this line as a header.
                old_left = new_left = 0

            hunk = self._HUNK_HEADER_RE.match(line)
            if hunk:
                old_left = int(hunk.group(1)) if hunk.group(1) is not None else 1
                new_left = int(hunk.group(2)) if hunk.group(2) is not None else 1
                continue

            if not line.startswith(('--- ', '+++ ')):
                continue
            file_path = line.split(' ', 1)[1].split('\t')[0].strip()  # handle timestamps after tab
            if not file_path or file_path == '/dev/null':
                continue
            # Strip git-style a/ or b/ prefixes (standard in git format-patch output)
            files.append(re.sub(r'^[ab]/', '', file_path))
        return list(dict.fromkeys(files))

    def _count_lines_changed(self, patch_content: str) -> int:
        """Count approximate number of lines changed in patch"""
        lines_changed = 0
        for line in patch_content.split('\n'):
            if line.startswith('+') and not line.startswith('+++'):
                lines_changed += 1
            elif line.startswith('-') and not line.startswith('---'):
                lines_changed += 1
        return lines_changed

    def _assess_conflict_risk(self, patch_content: str, modified_files: List[str]) -> str:
        """Assess conflict risk level ('low', 'medium' or 'high') from the changed-line count"""
        lines_changed = self._count_lines_changed(patch_content)
        if lines_changed > 100:
            return 'high'
        if lines_changed > 50:
            return 'medium'
        return 'low'

    def _is_binary_patch(self, patch_content: str) -> bool:
        """Check if patch contains binary files"""
        return 'GIT binary patch' in patch_content or 'Binary files' in patch_content

    def _detect_file_conflicts(self, analysis: Dict[str, Any]) -> List[List[str]]:
        """
        Detect which patches modify the same files.

        Returns:
            One list of patch names per file that is touched by more than one
            patch.
        """
        file_to_patches: Dict[str, List[str]] = {}
        for patch_file, info in analysis.items():
            # Entries for unreadable patches carry only an 'error' key
            for file_path in info.get('files_modified', []):
                file_to_patches.setdefault(file_path, []).append(patch_file)
        return [patches for patches in file_to_patches.values() if len(patches) > 1]

    def _order_patches_by_dependency(self, analysis: Dict[str, Any]) -> List[str]:
        """
        Order patches to minimize conflicts.

        NOTE(review): despite the name this is a size heuristic (smaller
        patches first; ties keep their given order). A series whose patches
        build on each other may therefore be applied out of order - confirm
        this is intended.
        """
        patch_infos = analysis.get('patches', {})

        def get_patch_size(patch_file):
            return patch_infos.get(patch_file, {}).get('patch_size', 0)

        return sorted(patch_infos, key=get_patch_size)

    def apply_with_rollback(self, patch_file: str, target_dir: str) -> Dict[str, Any]:
        """
        Apply patch with rollback capability on failure.

        Files named by the patch are copied to a temporary backup first. If
        the patch fails they are restored, and files the patch newly created
        are removed again.

        NOTE(review): only working-tree files are restored. `git apply
        --3way` also updates the index, which is not reset here.

        Args:
            patch_file: Path to the patch file
            target_dir: Directory to apply the patch to (relative to repo_path
                unless absolute, matching apply_single_patch)

        Returns:
            Dictionary with application results plus 'rolled_back' and
            'backed_up_files'.
        """
        patch_path = Path(patch_file)
        # Resolve exactly like apply_single_patch so backup and application
        # always look at the same directory.
        target_path = self.repo_path / target_dir

        try:
            with open(patch_path, 'r', encoding='utf-8', errors='ignore') as f:
                patch_content = f.read()
        except OSError as e:
            return {
                'success': False,
                'fatal': True,
                'rolled_back': False,  # nothing was touched
                'message': f'Cannot read patch file {patch_file}: {e}',
                'backed_up_files': []
            }

        modified_files = self._extract_modified_files(patch_content)
        backed_up_files: List[str] = []
        absent_before: List[Path] = []  # created by the patch if they exist afterwards

        # TemporaryDirectory removes the backup on every exit path
        with tempfile.TemporaryDirectory(prefix='patch_backup_') as backup_dir:
            try:
                for file_path in modified_files:
                    relative = Path(file_path)
                    if relative.is_absolute() or '..' in relative.parts:
                        # Would escape the backup/target directories
                        self._log(f"Skipping backup of unsafe path: {file_path}")
                        continue
                    full_path = target_path / relative
                    if full_path.is_file():
                        backup_path = Path(backup_dir) / relative
                        backup_path.parent.mkdir(parents=True, exist_ok=True)
                        shutil.copy2(full_path, backup_path)
                        backed_up_files.append(str(full_path))
                    elif not full_path.exists():
                        absent_before.append(full_path)

                result = self.apply_single_patch(patch_file, str(target_path))
                if not result['success']:
                    self._log("Patch failed, restoring backup")
                    self._restore_backup(backup_dir, target_path)
                    self._remove_created_files(absent_before)
                    result['rolled_back'] = True
                else:
                    result['rolled_back'] = False
                result['backed_up_files'] = backed_up_files
                return result
            except Exception as e:
                self._log(f"Exception during patch: {e}, restoring backup")
                self._restore_backup(backup_dir, target_path)
                self._remove_created_files(absent_before)
                return {
                    'success': False,
                    'rolled_back': True,
                    'message': f'Failed and rolled back: {str(e)}',
                    'backed_up_files': backed_up_files
                }

    def _remove_created_files(self, candidates: List[Path]) -> None:
        """Delete files from *candidates* (absent before the patch) that exist now."""
        for created in candidates:
            try:
                if created.is_file():
                    created.unlink()
            except OSError as e:
                # Best effort: a leftover file must not mask the real failure
                self._log(f"Could not remove {created}: {e}")

    def _restore_backup(self, backup_dir: str, target_path: Path) -> None:
        """Restore files from backup, recreating parent directories as needed"""
        backup_path = Path(backup_dir)
        for backup_file in backup_path.rglob('*'):
            if backup_file.is_file():
                relative_path = backup_file.relative_to(backup_path)
                target_file = target_path / relative_path
                target_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(backup_file, target_file)

    def create_patch(self, original_file: str, modified_file: str, output_patch: str) -> bool:
        """
        Create a patch file from original and modified files.

        Args:
            original_file: Path to original file
            modified_file: Path to modified file
            output_patch: Path for output patch file

        Returns:
            True if successful, False if any file could not be read or written
        """
        try:
            with open(original_file, 'r') as f1, open(modified_file, 'r') as f2:
                original_lines = f1.readlines()
                modified_lines = f2.readlines()

            # Default lineterm ('\n') matches the newline-terminated input
            # lines, so every emitted line carries exactly one terminator.
            diff = difflib.unified_diff(
                original_lines,
                modified_lines,
                fromfile=original_file,
                tofile=modified_file
            )
            with open(output_patch, 'w') as f:
                for line in diff:
                    if line.endswith('\n'):
                        f.write(line)
                    else:
                        # Final line of a file without trailing newline:
                        # terminate it and add the marker patch tools expect.
                        f.write(line + '\n\\ No newline at end of file\n')
            self._log(f"Patch created: {output_patch}")
            return True
        except Exception as e:
            self._log(f"Error creating patch: {e}")
            return False
def main():
    """
    Command line entry point.

    Parses the arguments, then either analyzes the given patches or applies
    them. In apply mode the process exits with status 0 when every patch
    succeeded and 1 otherwise; analyze-only mode simply returns.
    """
    cli = argparse.ArgumentParser(description='Robust patch file handler')
    cli.add_argument('patches', nargs='+', help='Patch files to apply')
    cli.add_argument('--target-dir', '-t', default='.', help='Target directory for patches')
    cli.add_argument('--repo-path', '-r', default='.', help='Repository root path')
    cli.add_argument('--stop-on-conflict', '-s', action='store_true', help='Stop on first conflict')
    cli.add_argument('--rollback', '-b', action='store_true', help='Rollback on failure')
    cli.add_argument('--analyze-only', '-a', action='store_true', help='Only analyze patches, do not apply')
    cli.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    cli.add_argument('--output-format', '-f', choices=['text', 'json'], default='text', help='Output format')
    opts = cli.parse_args()

    want_json = opts.output_format == 'json'
    patcher = GitPatchHandler(repo_path=opts.repo_path, verbose=opts.verbose)

    if opts.analyze_only:
        report = patcher.analyze_patch_series(opts.patches)
        if want_json:
            print(json.dumps(report, indent=2))
        else:
            print_patch_analysis(report)
        return

    outcome = patcher.apply_patch_series(
        opts.patches,
        target_dir=opts.target_dir,
        stop_on_conflict=opts.stop_on_conflict,
        rollback_on_failure=opts.rollback,
    )
    if want_json:
        print(json.dumps(outcome, indent=2))
    else:
        print_patch_results(outcome)

    # Exit status mirrors the overall result so callers can script on it
    exit_code = 0 if outcome['success'] else 1
    sys.exit(exit_code)
def print_patch_analysis(analysis: Dict[str, Any]) -> None:
    """
    Write a human-readable report of a patch analysis to stdout.

    Args:
        analysis: Dictionary with a 'summary' entry (totals and the list of
            patch groups touching the same file) and a 'patches' entry
            mapping each patch name to its details or to an 'error'.
    """
    overview = analysis['summary']

    print("Patch Analysis Report")
    print("=" * 50)
    print(f"Total patches: {overview['total_patches']}")
    print(f"Total files modified: {overview['total_files_modified']}")
    print(f"File conflicts detected: {len(overview['file_conflicts_detected'])}")

    if overview['file_conflicts_detected']:
        print("\nPotential conflicts:")
        for overlapping in overview['file_conflicts_detected']:
            joined = ', '.join(overlapping)
            print(f" - {joined}")

    print("\nPatch details:")
    for name, details in analysis['patches'].items():
        # Unreadable patches carry only an error message
        if 'error' in details:
            print(f" {name}: ERROR - {details['error']}")
            continue
        print(f" {name}:")
        print(f" - Files: {', '.join(details['files_modified'])}")
        print(f" - Size: {details['patch_size']} bytes")
        print(f" - Lines changed: {details['lines_changed']}")
        print(f" - Risk: {details['conflict_risk']}")
        print(f" - Binary: {details['is_binary']}")
def print_patch_results(result: Dict[str, Any]) -> None:
    """
    Print patch application results in human-readable format.

    Args:
        result: Dictionary with 'success', 'patches_applied' and 'results'
            (one entry per attempted patch with 'patch', 'success',
            'message' and optionally 'conflicts' / 'rolled_back').
    """
    print("Patch Application Report")
    print("=" * 50)
    print(f"Overall success: {'YES' if result['success'] else 'NO'}")
    print(f"Patches applied: {len(result['patches_applied'])}/{len(result['results'])}")
    print("\nDetailed results:")
    for patch_result in result['results']:
        # Both markers used to be empty strings, which made successful and
        # failed patches indistinguishable; ASCII markers work on any terminal.
        status = "[OK]" if patch_result['success'] else "[FAIL]"
        print(f" {status} {patch_result['patch']}")
        if patch_result['success']:
            continue
        # Failure details; the keys below are only populated for failures
        print(f" Message: {patch_result['message']}")
        if patch_result.get('conflicts'):
            print(f" Conflicts: {', '.join(patch_result['conflicts'])}")
        if patch_result.get('rolled_back'):
            print(" [ROLLED BACK]")
# Run the command line interface only when executed as a script, so that
# importing this module has no side effects.
if __name__ == "__main__":
    main()