#!/usr/bin/env python3
"""
Robust Patch Handler

A Python module for applying patch files with intelligent conflict handling.
Can be used as both a CLI tool and an importable module.
"""

import argparse
import difflib
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple


class GitPatchHandler:
    """
    A robust patch handler that uses Git for patch application with fallbacks
    and intelligent conflict handling.

    Patches are applied with ``git apply`` when a ``git`` executable is
    found and with the system ``patch`` command otherwise.  Outcomes are
    reported as plain dictionaries whose ``'success'`` key tells whether the
    operation worked.
    """

    # Unified-diff hunk header, e.g. "@@ -12,7 +12,9 @@".  The two groups
    # capture the optional old/new line counts (an omitted count means 1).
    _HUNK_HEADER_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')

    def __init__(self, repo_path: str = ".", verbose: bool = False):
        """
        Create a handler rooted at ``repo_path``.

        Args:
            repo_path: Base directory; relative target directories are
                resolved against it
            verbose: When true, progress messages are written to stderr
        """
        self.repo_path = Path(repo_path).absolute()
        self.verbose = verbose
        self.use_git = self._check_git_available()
        self._log(f"Initialized patch handler in {self.repo_path}")
        self._log(f"Git available: {self.use_git}")

    def _log(self, message: str) -> None:
        """Log messages if verbose mode is enabled"""
        if self.verbose:
            print(f"[PatchHandler] {message}", file=sys.stderr)

    def _check_git_available(self) -> bool:
        """Check if git is available in the system"""
        try:
            subprocess.run(['git', '--version'], check=True, capture_output=True, timeout=5)
            return True
        except (subprocess.CalledProcessError, OSError, subprocess.TimeoutExpired):
            # OSError covers a missing binary (FileNotFoundError) as well as
            # one that exists but cannot be executed.
            return False

    def apply_patch_series(self, patch_files: List[str], target_dir: str = ".",
                           stop_on_conflict: bool = False, rollback_on_failure: bool = False) -> Dict[str, Any]:
        """
        Apply multiple patches with conflict handling and dependency detection.

        The patches are analyzed first and then applied in the order chosen
        by ``_order_patches_by_dependency`` (smallest patch first), which is
        not necessarily the order of ``patch_files``.

        Args:
            patch_files: List of paths to patch files
            target_dir: Directory to apply patches to
            stop_on_conflict: Stop applying patches after the first failure
            rollback_on_failure: Back up the files each patch touches and
                restore them if that patch fails (patches that were already
                applied successfully are left in place)

        Returns:
            Dictionary with the keys ``'success'`` (true only if every
            attempted patch succeeded), ``'patches_applied'``, ``'results'``
            (one entry per attempted patch) and ``'analysis'``
        """
        target_path = self.repo_path / target_dir
        self._log(f"Applying {len(patch_files)} patches to {target_path}")

        # Analyze patches first
        analysis = self.analyze_patch_series(patch_files)
        ordered_patches = self._order_patches_by_dependency(analysis)

        results = []
        all_success = True
        applied_patches = []

        for patch_file in ordered_patches:
            self._log(f"Applying patch: {patch_file}")

            if rollback_on_failure:
                result = self.apply_with_rollback(patch_file, str(target_path))
            else:
                result = self.apply_single_patch(patch_file, str(target_path))

            results.append({
                'patch': patch_file,
                'success': result['success'],
                'conflicts': result.get('conflicts', []),
                'message': result.get('message', ''),
                'rolled_back': result.get('rolled_back', False)
            })

            if result['success']:
                applied_patches.append(patch_file)
            else:
                all_success = False
                if stop_on_conflict:
                    self._log(f"Stopping on conflict in {patch_file}")
                    break

        return {
            'success': all_success,
            'patches_applied': applied_patches,
            'results': results,
            'analysis': analysis
        }

    def apply_single_patch(self, patch_file: str, target_dir: str = ".") -> Dict[str, Any]:
        """
        Apply a single patch file using git with fallbacks.

        Args:
            patch_file: Path to the patch file
            target_dir: Directory to apply the patch to (resolved against
                the handler's ``repo_path`` when relative)

        Returns:
            Dictionary with application results.  ``'fatal': True`` marks
            failures that were not classified as ordinary patch conflicts
            (missing files, timeouts, unexpected errors).
        """
        patch_path = Path(patch_file).absolute()
        target_path = self.repo_path / target_dir

        self._log(f"Applying single patch: {patch_path} -> {target_path}")

        if not patch_path.exists():
            return {
                'success': False,
                'fatal': True,
                'message': f'Patch file not found: {patch_file}'
            }

        # Without this check a missing directory surfaces later as a
        # confusing "command not found"-style error from subprocess.
        if not target_path.is_dir():
            return {
                'success': False,
                'fatal': True,
                'message': f'Target directory not found: {target_path}'
            }

        if not self.use_git:
            return self._fallback_patch_apply(str(patch_path), str(target_path))

        try:
            # Use git apply with 3-way merge for better conflict resolution
            cmd = [
                'git', 'apply',
                '--3way',           # Allow 3-way merge if base available
                '--allow-overlap',  # Allow overlapping patches
                '--verbose',        # Get detailed output
                str(patch_path)
            ]

            self._log(f"Running: {' '.join(cmd)}")
            result = subprocess.run(
                cmd,
                cwd=str(target_path),
                capture_output=True,
                text=True,
                timeout=30  # Safety timeout
            )

            if result.returncode == 0:
                return {
                    'success': True,
                    'message': 'Patch applied cleanly',
                    'output': result.stdout,
                    'method': 'git'
                }
            return self._handle_git_failure(result, str(patch_path), str(target_path))

        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'fatal': True,
                'message': 'Patch application timed out'
            }
        except Exception as e:
            return {
                'success': False,
                'fatal': True,
                'message': f'Unexpected error: {str(e)}'
            }

    def _handle_git_failure(self, result: subprocess.CompletedProcess, patch_file: str, target_dir: str) -> Dict[str, Any]:
        """
        Handle git apply failures intelligently.

        The failure is classified by keywords found in git's stderr:
        conflicts are reported with the affected names, "already
        exists/applied" is treated as success, "patch does not apply"
        triggers a retry with ``--reject``, and anything else is fatal.
        """
        raw_stderr = result.stderr or ''
        raw_stdout = result.stdout or ''
        # Lower-case copy is used for keyword matching only; names are
        # extracted from the untouched text so their case is preserved.
        stderr = raw_stderr.lower()

        # Analyze the error type
        if 'conflict' in stderr:
            conflicts = self._extract_conflicts(raw_stderr, raw_stdout)
            return {
                'success': False,
                'message': 'Patch conflicts detected',
                'conflicts': conflicts,
                'output': result.stdout,
                'error': result.stderr,
                'method': 'git'
            }
        if 'already exists' in stderr or 'already applied' in stderr:
            return {
                'success': True,  # Treat as success since change is already present
                'message': 'Patch already applied or redundant',
                'output': result.stdout,
                'method': 'git'
            }
        if 'patch does not apply' in stderr:
            # Try with fuzz factor
            return self._try_fuzzy_apply(patch_file, target_dir, result)
        return {
            'success': False,
            'fatal': True,
            'message': f'Patch failed: {result.stderr}',
            'output': result.stdout,
            'error': result.stderr,
            'method': 'git'
        }

    @staticmethod
    def _snapshot_rej_files(root: Path) -> Dict[Path, int]:
        """Map every ``*.rej`` file below ``root`` to its modification time (ns)."""
        snapshot = {}
        for rej in root.rglob('*.rej'):
            try:
                snapshot[rej] = rej.stat().st_mtime_ns
            except OSError:
                # File vanished between listing and stat; nothing to record.
                continue
        return snapshot

    def _try_fuzzy_apply(self, patch_file: str, target_dir: str, previous_result: subprocess.CompletedProcess) -> Dict[str, Any]:
        """
        Try applying patch with --reject to apply what fits, then check for leftover .rej files.

        ``previous_result`` (the failed first attempt) is accepted for
        interface compatibility and is not inspected.
        """
        self._log("Attempting fuzzy patch application with --reject")
        try:
            root = Path(target_dir)
            # Remember reject files that are already there so that only the
            # ones produced by this run are reported and cleaned up.
            rej_before = self._snapshot_rej_files(root)

            # NOTE: --3way must not be passed here; git documents it as
            # incompatible with --reject and refuses to run with both.
            cmd = [
                'git', 'apply',
                '--allow-overlap',
                '--verbose',
                '--reject',  # Apply hunks that work, write .rej for those that don't
                str(patch_file)
            ]

            result = subprocess.run(
                cmd,
                cwd=target_dir,
                capture_output=True,
                text=True,
                timeout=30
            )

            # New or rewritten .rej files mean hunks were skipped
            rej_after = self._snapshot_rej_files(root)
            rej_files = sorted(
                path for path, mtime in rej_after.items()
                if rej_before.get(path) != mtime
            )
            if rej_files:
                rej_names = [str(r.relative_to(target_dir)) for r in rej_files]
                self._log(f"Found {len(rej_files)} reject file(s): {rej_names}")
                # Clean up .rej files so they don't pollute the tree
                for rej in rej_files:
                    try:
                        rej.unlink()
                    except OSError as e:
                        # Best effort: a leftover reject file is not fatal.
                        self._log(f"Could not remove {rej}: {e}")
                return {
                    'success': False,
                    'message': f'Patch partially applied — {len(rej_files)} hunk(s) rejected',
                    'output': result.stdout,
                    'error': result.stderr,
                    'method': 'git-fuzzy',
                    'rejected_files': rej_names
                }

            if result.returncode == 0:
                return {
                    'success': True,
                    'message': 'Patch applied cleanly (fuzzy)',
                    'output': result.stdout,
                    'error': result.stderr,
                    'method': 'git-fuzzy'
                }
            return {
                'success': False,
                'fatal': True,
                'message': 'Fuzzy application also failed',
                'output': result.stdout,
                'error': result.stderr,
                'method': 'git-fuzzy'
            }

        except Exception as e:
            return {
                'success': False,
                'fatal': True,
                'message': f'Fuzzy application error: {str(e)}'
            }

    def _extract_conflicts(self, stderr: str, stdout: str) -> List[str]:
        """
        Extract conflict information from git output.

        Returns the distinct matches in the order they were found.
        """
        conflicts = []
        # Keep the two streams on separate lines so a pattern cannot match
        # across the end of stderr into the start of stdout.
        combined = f"{stderr}\n{stdout}"

        # Look for file names in conflict messages
        conflict_patterns = [
            r'error:\s*([^\n]*conflict[^\n]*\.patch[^\n]*)',
            r'conflict\s+in\s+([^\s]+)',
            r'([/\w][^\s]*\.[\w]+).*conflict'
        ]

        for pattern in conflict_patterns:
            conflicts.extend(re.findall(pattern, combined, re.IGNORECASE))

        # Also look for .rej files mentioned
        conflicts.extend(re.findall(r'([^\s]+\.rej)', combined))

        # Remove duplicates while keeping a deterministic order
        return list(dict.fromkeys(conflicts))

    def _fallback_patch_apply(self, patch_file: str, target_dir: str) -> Dict[str, Any]:
        """
        Fallback to system patch command if git isn't available.

        A ``--dry-run`` is performed first; the patch is applied for real
        only when the dry run succeeds.
        """
        self._log("Falling back to system patch command")
        # stdin is closed so that `patch` cannot block on an interactive
        # question, and the timeout mirrors the one used for git.
        run_options = dict(
            cwd=target_dir,
            capture_output=True,
            text=True,
            stdin=subprocess.DEVNULL,
            timeout=30
        )
        try:
            dry_run = subprocess.run(
                ['patch', '-p1', '-i', patch_file, '--verbose', '--dry-run'],
                **run_options
            )
            if dry_run.returncode != 0:
                return {
                    'success': False,
                    'message': 'Patch does not apply cleanly (system patch dry run failed)',
                    'output': dry_run.stdout,
                    'error': dry_run.stderr,
                    'method': 'system-patch'
                }

            # Dry run succeeded, apply for real
            result = subprocess.run(
                ['patch', '-p1', '-i', patch_file],
                **run_options
            )
            applied = result.returncode == 0
            return {
                'success': applied,
                'message': 'Applied via system patch' if applied else 'System patch command failed',
                'output': result.stdout,
                'error': '' if applied else result.stderr,
                'method': 'system-patch'
            }
        except FileNotFoundError:
            # subprocess raises the same error for a missing working directory.
            if not Path(target_dir).is_dir():
                message = f'Target directory not found: {target_dir}'
            else:
                message = 'Neither git nor patch command available'
            return {
                'success': False,
                'fatal': True,
                'message': message
            }
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'fatal': True,
                'message': 'System patch command timed out'
            }

    def analyze_patch_series(self, patch_files: List[str]) -> Dict[str, Any]:
        """
        Analyze patches for potential conflicts before applying.

        Args:
            patch_files: List of paths to patch files

        Returns:
            Dictionary with a per-patch entry under ``'patches'`` (or
            ``{'error': 'File not found'}`` for a missing file) and overall
            counts under ``'summary'``
        """
        self._log(f"Analyzing {len(patch_files)} patches")
        analysis = {}
        all_modified_files = set()

        for patch_file in patch_files:
            patch_path = Path(patch_file)
            if not patch_path.exists():
                analysis[patch_file] = {'error': 'File not found'}
                continue

            with open(patch_path, 'r', encoding='utf-8', errors='ignore') as f:
                patch_content = f.read()

            modified_files = self._extract_modified_files(patch_content)
            all_modified_files.update(modified_files)

            analysis[patch_file] = {
                'files_modified': modified_files,
                'patch_size': len(patch_content),
                'lines_changed': self._count_lines_changed(patch_content),
                'conflict_risk': self._assess_conflict_risk(patch_content, modified_files),
                'is_binary': self._is_binary_patch(patch_content)
            }

        # Detect file conflicts between patches
        file_conflicts = self._detect_file_conflicts(analysis)

        return {
            'patches': analysis,
            'summary': {
                'total_patches': len(patch_files),
                'total_files_modified': len(all_modified_files),
                'file_conflicts_detected': file_conflicts
            }
        }

    @staticmethod
    def _classify_patch_lines(patch_content: str) -> Iterator[Tuple[bool, str]]:
        """
        Yield ``(in_hunk, line)`` for every line of a patch.

        ``in_hunk`` is true for lines that belong to the body of a unified
        diff hunk, as determined by the line counts in the preceding ``@@``
        header.  This lets callers tell a removed line such as
        ``--- text`` apart from a ``--- file`` header.
        """
        old_left = new_left = 0
        for line in patch_content.split('\n'):
            if old_left > 0 or new_left > 0:
                marker = line[:1]
                # An empty line is a context line whose single space was
                # stripped by some tool.
                if marker in ('', ' ', '+', '-', '\\'):
                    if marker == '-':
                        old_left -= 1
                    elif marker == '+':
                        new_left -= 1
                    elif marker != '\\':  # "\ No newline..." consumes nothing
                        old_left -= 1
                        new_left -= 1
                    yield True, line
                    continue
                # Anything else cannot be a hunk line: the header overstated
                # the hunk length, so resynchronise here.
                old_left = new_left = 0

            header = GitPatchHandler._HUNK_HEADER_RE.match(line)
            if header:
                old_left = int(header.group(1)) if header.group(1) is not None else 1
                new_left = int(header.group(2)) if header.group(2) is not None else 1
            yield False, line

    def _extract_modified_files(self, patch_content: str) -> List[str]:
        """
        Extract list of files modified by this patch.

        Paths are taken from ``---``/``+++`` header lines outside hunk
        bodies and returned without duplicates in order of first appearance.
        """
        files: Dict[str, None] = {}
        for in_hunk, line in self._classify_patch_lines(patch_content):
            if in_hunk or not line.startswith(('--- ', '+++ ')):
                continue
            file_path = line[4:].split('\t')[0].strip()  # handle timestamps after tab
            if not file_path or file_path == '/dev/null':
                continue
            # Strip git-style a/ or b/ prefixes (standard in git format-patch output)
            files.setdefault(re.sub(r'^[ab]/', '', file_path))
        return list(files)

    def _count_lines_changed(self, patch_content: str) -> int:
        """
        Count approximate number of lines changed in patch.

        For unified diffs only added/removed lines inside hunks are counted.
        If the text contains no hunk at all, every line starting with ``+``
        or ``-`` (except ``+++``/``---``) is counted instead.
        """
        saw_hunk = False
        hunk_changes = 0
        prefix_changes = 0
        for in_hunk, line in self._classify_patch_lines(patch_content):
            if in_hunk:
                saw_hunk = True
                if line.startswith(('+', '-')):
                    hunk_changes += 1
            elif line.startswith('+') and not line.startswith('+++'):
                prefix_changes += 1
            elif line.startswith('-') and not line.startswith('---'):
                prefix_changes += 1
        return hunk_changes if saw_hunk else prefix_changes

    def _assess_conflict_risk(self, patch_content: str, modified_files: List[str]) -> str:
        """
        Assess conflict risk level.

        The rating depends only on the number of changed lines: more than
        100 is ``'high'``, more than 50 is ``'medium'``, otherwise ``'low'``.
        ``modified_files`` is currently not taken into account.
        """
        lines_changed = self._count_lines_changed(patch_content)

        if lines_changed > 100:
            return 'high'
        if lines_changed > 50:
            return 'medium'
        return 'low'

    def _is_binary_patch(self, patch_content: str) -> bool:
        """Check if patch contains binary files"""
        return 'GIT binary patch' in patch_content or 'Binary files' in patch_content

    def _detect_file_conflicts(self, analysis: Dict[str, Any]) -> List[List[str]]:
        """
        Detect which patches modify the same files.

        Returns one list of patch names for every file that is touched by
        more than one patch.
        """
        file_to_patches: Dict[str, List[str]] = {}

        for patch_file, info in analysis.items():
            # Entries for unreadable patches carry only an 'error' key.
            for file_path in info.get('files_modified', ()):
                file_to_patches.setdefault(file_path, []).append(patch_file)

        return [patches for patches in file_to_patches.values() if len(patches) > 1]

    def _order_patches_by_dependency(self, analysis: Dict[str, Any]) -> List[str]:
        """
        Order patches to minimize conflicts.

        Simple heuristic: smaller patches first, larger patches later.
        Patches without a recorded size (e.g. missing files) count as 0 and
        patches of equal size keep their relative order.
        """
        patches = analysis.get('patches', {})
        return sorted(patches, key=lambda name: patches[name].get('patch_size', 0))

    def _remove_created_files(self, created: List[Path]) -> None:
        """Delete files that did not exist before a patch attempt (best effort)."""
        for path in created:
            try:
                if path.is_file():
                    path.unlink()
            except OSError as e:
                self._log(f"Could not remove {path}: {e}")

    def apply_with_rollback(self, patch_file: str, target_dir: str) -> Dict[str, Any]:
        """
        Apply patch with rollback capability on failure.

        Files named in the patch are copied to a temporary directory before
        the patch is applied.  If applying fails, those copies are restored
        and files that the patch newly created are removed.  Only the files
        themselves are restored.

        Args:
            patch_file: Path to the patch file
            target_dir: Directory to apply the patch to

        Returns:
            Dictionary with application results, extended with
            ``'rolled_back'`` and ``'backed_up_files'``
        """
        target_path = Path(target_dir)

        # Extract files that will be modified
        try:
            with open(patch_file, 'r', encoding='utf-8', errors='ignore') as f:
                patch_content = f.read()
        except OSError as e:
            # Report unreadable patches like every other failure instead of
            # raising, so a series can continue or stop cleanly.
            return {
                'success': False,
                'fatal': True,
                'rolled_back': False,
                'message': f'Cannot read patch file {patch_file}: {e}',
                'backed_up_files': []
            }
        modified_files = self._extract_modified_files(patch_content)

        backup_dir = tempfile.mkdtemp(prefix='patch_backup_')
        backed_up_files = []
        created = []  # paths absent before the patch; removed on rollback

        try:
            # Create backup of modified files
            for file_path in modified_files:
                relative = Path(file_path)
                if relative.is_absolute() or '..' in relative.parts:
                    # Never follow patch-supplied paths out of the target
                    # or backup directory.
                    self._log(f"Not backing up path outside target: {file_path}")
                    continue
                full_path = target_path / relative
                if full_path.is_file():
                    backup_path = Path(backup_dir) / relative
                    backup_path.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(full_path, backup_path)
                    backed_up_files.append(str(full_path))
                elif not full_path.exists():
                    created.append(full_path)

            result = self.apply_single_patch(patch_file, target_dir)
            if not result['success']:
                self._log("Patch failed, restoring backup")
                self._restore_backup(backup_dir, target_path)
                self._remove_created_files(created)
                result['rolled_back'] = True
            else:
                result['rolled_back'] = False

            result['backed_up_files'] = backed_up_files
            return result

        except Exception as e:
            self._log(f"Exception during patch: {e}, restoring backup")
            self._restore_backup(backup_dir, target_path)
            self._remove_created_files(created)
            return {
                'success': False,
                'rolled_back': True,
                'message': f'Failed and rolled back: {str(e)}',
                'backed_up_files': backed_up_files
            }
        finally:
            # Clean up backup directory
            shutil.rmtree(backup_dir, ignore_errors=True)

    def _restore_backup(self, backup_dir: str, target_path: Path) -> None:
        """Restore files from backup"""
        backup_path = Path(backup_dir)
        for backup_file in backup_path.rglob('*'):
            if backup_file.is_file():
                relative_path = backup_file.relative_to(backup_path)
                target_file = target_path / relative_path
                target_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(backup_file, target_file)

    def create_patch(self, original_file: str, modified_file: str, output_patch: str) -> bool:
        """
        Create a patch file from original and modified files.

        The patch is a unified diff whose headers carry the two file paths
        exactly as given.

        Args:
            original_file: Path to original file
            modified_file: Path to modified file
            output_patch: Path for output patch file

        Returns:
            True if successful, False if any error occurred (the error is
            logged in verbose mode)
        """
        try:
            with open(original_file, 'r') as f1, open(modified_file, 'r') as f2:
                original_lines = f1.readlines()
                modified_lines = f2.readlines()

            # The input lines keep their newline, so the default lineterm
            # ('\n') makes every generated line newline-terminated too.
            diff = difflib.unified_diff(
                original_lines,
                modified_lines,
                fromfile=original_file,
                tofile=modified_file
            )

            with open(output_patch, 'w') as f:
                for line in diff:
                    if line.endswith('\n'):
                        f.write(line)
                    else:
                        # Last line of a file without trailing newline:
                        # emit the marker patch tools expect.
                        f.write(line + '\n\\ No newline at end of file\n')

            self._log(f"Patch created: {output_patch}")
            return True

        except Exception as e:
            self._log(f"Error creating patch: {e}")
            return False


def main():
    """
    Command line interface for the patch handler.

    Parses the command line, then either prints an analysis of the given
    patches (``--analyze-only``) or applies them and exits with status 0
    when every patch succeeded and 1 otherwise.
    """
    cli = argparse.ArgumentParser(description='Robust patch file handler')
    cli.add_argument('patches', nargs='+', help='Patch files to apply')
    cli.add_argument('--target-dir', '-t', default='.', help='Target directory for patches')
    cli.add_argument('--repo-path', '-r', default='.', help='Repository root path')
    # The boolean switches all share the same shape, so register them in bulk.
    for long_flag, short_flag, description in (
        ('--stop-on-conflict', '-s', 'Stop on first conflict'),
        ('--rollback', '-b', 'Rollback on failure'),
        ('--analyze-only', '-a', 'Only analyze patches, do not apply'),
        ('--verbose', '-v', 'Verbose output'),
    ):
        cli.add_argument(long_flag, short_flag, action='store_true', help=description)
    cli.add_argument('--output-format', '-f', choices=['text', 'json'], default='text', help='Output format')

    opts = cli.parse_args()
    patcher = GitPatchHandler(repo_path=opts.repo_path, verbose=opts.verbose)
    as_json = opts.output_format == 'json'

    if opts.analyze_only:
        report = patcher.analyze_patch_series(opts.patches)
        if as_json:
            print(json.dumps(report, indent=2))
        else:
            print_patch_analysis(report)
        return

    outcome = patcher.apply_patch_series(
        opts.patches,
        target_dir=opts.target_dir,
        stop_on_conflict=opts.stop_on_conflict,
        rollback_on_failure=opts.rollback
    )

    if as_json:
        print(json.dumps(outcome, indent=2))
    else:
        print_patch_results(outcome)

    sys.exit(0 if outcome['success'] else 1)


def print_patch_analysis(analysis: Dict[str, Any]) -> None:
    """
    Print patch analysis in human-readable format.

    Args:
        analysis: Mapping with a ``'summary'`` section (totals and detected
            file conflicts) and a ``'patches'`` section holding one entry
            per patch file
    """
    print("Patch Analysis Report")
    print("=" * 50)

    totals = analysis['summary']
    print(f"Total patches: {totals['total_patches']}")
    print(f"Total files modified: {totals['total_files_modified']}")
    print(f"File conflicts detected: {len(totals['file_conflicts_detected'])}")

    overlapping = totals['file_conflicts_detected']
    if overlapping:
        print("\nPotential conflicts:")
        for patch_group in overlapping:
            print(f" - {', '.join(patch_group)}")

    print("\nPatch details:")
    for name, details in analysis['patches'].items():
        # Patches that could not be read only carry an error text.
        if 'error' in details:
            print(f" {name}: ERROR - {details['error']}")
            continue
        print(f" {name}:")
        print(f" - Files: {', '.join(details['files_modified'])}")
        print(f" - Size: {details['patch_size']} bytes")
        print(f" - Lines changed: {details['lines_changed']}")
        print(f" - Risk: {details['conflict_risk']}")
        print(f" - Binary: {details['is_binary']}")


def print_patch_results(result: Dict[str, Any]) -> None:
    """
    Print patch application results in human-readable format.

    Args:
        result: Mapping with the overall ``'success'`` flag, the list of
            ``'patches_applied'`` and one ``'results'`` entry per attempted
            patch
    """
    print("Patch Application Report")
    print("=" * 50)
    verdict = 'YES' if result['success'] else 'NO'
    print(f"Overall success: {verdict}")
    print(f"Patches applied: {len(result['patches_applied'])}/{len(result['results'])}")

    print("\nDetailed results:")
    for entry in result['results']:
        mark = "✓" if entry['success'] else "✗"
        print(f" {mark} {entry['patch']}")
        if entry['success']:
            continue
        # Extra detail is only shown for patches that failed.
        print(f" Message: {entry['message']}")
        if entry.get('conflicts'):
            print(f" Conflicts: {', '.join(entry['conflicts'])}")
        if entry.get('rolled_back'):
            print(" [ROLLED BACK]")


# Run the command line interface only when executed as a script, not on import.
if __name__ == '__main__':
    main()