#!/usr/bin/env python3
"""
Robust Patch Handler

A Python module for applying patch files with intelligent conflict handling.
Can be used as both a CLI tool and an importable module.
"""

import subprocess
import tempfile
import os
import sys
import re
import shutil
import difflib
from pathlib import Path
from typing import List, Dict, Optional, Any
import argparse
import json


class GitPatchHandler:
    """
    A robust patch handler that uses Git for patch application with
    fallbacks and intelligent conflict handling.

    Every ``apply_*`` method reports its outcome as a dictionary that always
    carries a boolean ``'success'`` key and a human-readable ``'message'``.
    """

    # Seconds an external ``git`` / ``patch`` invocation may run before it is
    # abandoned and reported as a failure.
    COMMAND_TIMEOUT = 30

    # Unified-diff hunk header, e.g. ``@@ -12,7 +12,9 @@``. A missing count
    # means 1.
    _HUNK_HEADER_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')

    def __init__(self, repo_path: str = ".", verbose: bool = False):
        """
        Args:
            repo_path: Base directory that relative target directories are
                resolved against.
            verbose: When true, progress messages are written to stderr.
        """
        self.repo_path = Path(repo_path).absolute()
        self.verbose = verbose
        self.use_git = self._check_git_available()
        self._log(f"Initialized patch handler in {self.repo_path}")
        self._log(f"Git available: {self.use_git}")

    def _log(self, message: str) -> None:
        """Log messages if verbose mode is enabled"""
        if self.verbose:
            print(f"[PatchHandler] {message}", file=sys.stderr)

    def _check_git_available(self) -> bool:
        """Check if git is available in the system"""
        try:
            subprocess.run(['git', '--version'], check=True,
                           capture_output=True, timeout=5)
            return True
        except (subprocess.CalledProcessError, FileNotFoundError,
                subprocess.TimeoutExpired):
            return False

    def apply_patch_series(self, patch_files: List[str], target_dir: str = ".",
                           stop_on_conflict: bool = False,
                           rollback_on_failure: bool = False) -> Dict[str, Any]:
        """
        Apply multiple patches with conflict handling.

        The patches are analyzed first and then applied in the order chosen
        by ``_order_patches_by_dependency`` (smallest patch first), which is
        not necessarily the order in which they were passed in.

        Args:
            patch_files: List of paths to patch files
            target_dir: Directory to apply patches to
            stop_on_conflict: Stop applying patches as soon as one fails
            rollback_on_failure: Apply each patch through
                ``apply_with_rollback`` so that a failing patch is undone.
                Patches that already succeeded stay applied.

        Returns:
            Dictionary with keys ``success`` (all attempted patches
            succeeded), ``patches_applied``, ``results`` (one entry per
            attempted patch) and ``analysis``.
        """
        target_path = self.repo_path / target_dir
        self._log(f"Applying {len(patch_files)} patches to {target_path}")

        # Analyze patches first
        analysis = self.analyze_patch_series(patch_files)
        ordered_patches = self._order_patches_by_dependency(analysis)

        results = []
        all_success = True
        applied_patches = []

        for patch_file in ordered_patches:
            self._log(f"Applying patch: {patch_file}")

            if rollback_on_failure:
                result = self.apply_with_rollback(patch_file, str(target_path))
            else:
                result = self.apply_single_patch(patch_file, str(target_path))

            results.append({
                'patch': patch_file,
                'success': result['success'],
                'conflicts': result.get('conflicts', []),
                'message': result.get('message', ''),
                'rolled_back': result.get('rolled_back', False)
            })

            if result['success']:
                applied_patches.append(patch_file)
            else:
                all_success = False
                if stop_on_conflict:
                    self._log(f"Stopping on conflict in {patch_file}")
                    break

        return {
            'success': all_success,
            'patches_applied': applied_patches,
            'results': results,
            'analysis': analysis
        }

    def apply_single_patch(self, patch_file: str,
                           target_dir: str = ".") -> Dict[str, Any]:
        """
        Apply a single patch file using git with fallbacks.

        Args:
            patch_file: Path to the patch file
            target_dir: Directory to apply the patch to, resolved against
                ``repo_path`` when relative

        Returns:
            Dictionary with application results. Failures are reported in
            the dictionary; this method does not raise.
        """
        patch_path = Path(patch_file).absolute()
        target_path = self.repo_path / target_dir

        self._log(f"Applying single patch: {patch_path} -> {target_path}")

        if not patch_path.exists():
            return {
                'success': False,
                'fatal': True,
                'message': f'Patch file not found: {patch_file}'
            }

        if not self.use_git:
            return self._fallback_patch_apply(str(patch_path), str(target_path))

        try:
            # Use git apply with 3-way merge for better conflict resolution
            cmd = [
                'git', 'apply',
                '--3way',           # Allow 3-way merge if base available
                '--allow-overlap',  # Allow overlapping patches
                '--verbose',        # Get detailed output
                str(patch_path)
            ]

            self._log(f"Running: {' '.join(cmd)}")

            result = subprocess.run(
                cmd,
                cwd=str(target_path),
                capture_output=True,
                text=True,
                timeout=self.COMMAND_TIMEOUT  # Safety timeout
            )

            if result.returncode == 0:
                return {
                    'success': True,
                    'message': 'Patch applied cleanly',
                    'output': result.stdout,
                    'method': 'git'
                }
            return self._handle_git_failure(result, str(patch_path),
                                            str(target_path))

        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'fatal': True,
                'message': 'Patch application timed out'
            }
        except Exception as e:
            # Boundary: callers rely on getting a result dict, never an
            # exception, from this method.
            return {
                'success': False,
                'fatal': True,
                'message': f'Unexpected error: {str(e)}'
            }

    def _handle_git_failure(self, result: subprocess.CompletedProcess,
                            patch_file: str, target_dir: str) -> Dict[str, Any]:
        """
        Handle git apply failures intelligently.

        Classifies the failure by looking for key phrases in git's stderr
        and either reports conflicts, treats the patch as already present,
        retries with ``--reject``, or reports a fatal failure.
        """
        # Lower-case copy is used only for phrase matching; the original text
        # is what gets parsed for file names so their case is preserved.
        stderr_lower = result.stderr.lower()

        if 'conflict' in stderr_lower:
            conflicts = self._extract_conflicts(result.stderr, result.stdout)
            return {
                'success': False,
                'message': 'Patch conflicts detected',
                'conflicts': conflicts,
                'output': result.stdout,
                'error': result.stderr,
                'method': 'git'
            }

        if 'already exists' in stderr_lower or 'already applied' in stderr_lower:
            # NOTE(review): "already exists" only proves that a file the
            # patch wants to create is present, not that its content matches
            # the patch — confirm that treating this as success is intended.
            return {
                'success': True,  # Treat as success since change is already present
                'message': 'Patch already applied or redundant',
                'output': result.stdout,
                'method': 'git'
            }

        if 'patch does not apply' in stderr_lower:
            # Try with fuzz factor
            return self._try_fuzzy_apply(patch_file, target_dir, result)

        return {
            'success': False,
            'fatal': True,
            'message': f'Patch failed: {result.stderr}',
            'output': result.stdout,
            'error': result.stderr,
            'method': 'git'
        }

    @staticmethod
    def _snapshot_rejects(root: Path) -> Dict[Path, int]:
        """Map every ``*.rej`` file under *root* to its modification time."""
        snapshot = {}
        for rej in root.rglob('*.rej'):
            try:
                snapshot[rej] = rej.stat().st_mtime_ns
            except OSError:
                # File vanished between listing and stat; nothing to record.
                continue
        return snapshot

    def _try_fuzzy_apply(self, patch_file: str, target_dir: str,
                         previous_result: subprocess.CompletedProcess) -> Dict[str, Any]:
        """
        Try applying patch with --reject to apply what fits, then check for
        leftover .rej files.

        Only reject files created or rewritten by this run are reported and
        removed; ``*.rej`` files that were already in the tree are left alone.

        Args:
            patch_file: Path to the patch file
            target_dir: Directory git is run in
            previous_result: Result of the failed strict attempt (unused,
                kept for interface compatibility)
        """
        self._log("Attempting fuzzy patch application with --reject")

        try:
            target_root = Path(target_dir)
            rejects_before = self._snapshot_rejects(target_root)

            # git documents --3way as incompatible with --reject, so the
            # retry must not carry --3way over from the strict attempt.
            cmd = [
                'git', 'apply',
                '--allow-overlap',
                '--verbose',
                '--reject',  # Apply hunks that work, write .rej for those that don't
                str(patch_file)
            ]

            result = subprocess.run(
                cmd,
                cwd=target_dir,
                capture_output=True,
                text=True,
                timeout=self.COMMAND_TIMEOUT
            )

            # Leftover .rej files mean hunks were skipped
            rejects_after = self._snapshot_rejects(target_root)
            rej_files = sorted(
                path for path, mtime in rejects_after.items()
                if rejects_before.get(path) != mtime
            )

            if rej_files:
                rej_names = [str(r.relative_to(target_root)) for r in rej_files]
                self._log(f"Found {len(rej_files)} reject file(s): {rej_names}")

                # Clean up .rej files so they don't pollute the tree
                for rej in rej_files:
                    try:
                        rej.unlink()
                    except OSError as e:
                        self._log(f"Could not remove {rej}: {e}")

                return {
                    'success': False,
                    'message': (f'Patch partially applied — rejected hunks in '
                                f'{len(rej_files)} file(s)'),
                    'output': result.stdout,
                    'error': result.stderr,
                    'method': 'git-fuzzy',
                    'rejected_files': rej_names
                }

            if result.returncode == 0:
                return {
                    'success': True,
                    'message': 'Patch applied cleanly (fuzzy)',
                    'output': result.stdout,
                    'error': result.stderr,
                    'method': 'git-fuzzy'
                }

            return {
                'success': False,
                'fatal': True,
                'message': 'Fuzzy application also failed',
                'output': result.stdout,
                'error': result.stderr,
                'method': 'git-fuzzy'
            }

        except Exception as e:
            # Boundary: always hand a result dict back to the caller.
            return {
                'success': False,
                'fatal': True,
                'message': f'Fuzzy application error: {str(e)}'
            }

    def _extract_conflicts(self, stderr: str, stdout: str) -> List[str]:
        """
        Extract conflict information from git output.

        Returns:
            Sorted, de-duplicated list of regex captures (file names or
            whole error lines) found in the combined output.
        """
        combined = stderr + stdout

        # Look for file names in conflict messages
        conflict_patterns = [
            r'error:\s*([^\n]*conflict[^\n]*\.patch[^\n]*)',
            r'conflict\s+in\s+([^\s]+)',
            r'([/\w][^\s]*\.[\w]+).*conflict'
        ]

        conflicts = []
        for pattern in conflict_patterns:
            conflicts.extend(re.findall(pattern, combined, re.IGNORECASE))

        # Also look for .rej files mentioned
        conflicts.extend(re.findall(r'([^\s]+\.rej)', combined))

        # Sorted so repeated runs report conflicts in a stable order
        return sorted(set(conflicts))

    def _fallback_patch_apply(self, patch_file: str,
                              target_dir: str) -> Dict[str, Any]:
        """
        Fallback to system patch command if git isn't available.

        A ``--dry-run`` is performed first; the patch is applied for real
        only when the dry run succeeds, so a failing patch leaves the tree
        untouched.
        """
        self._log("Falling back to system patch command")

        try:
            dry_run = subprocess.run(
                ['patch', '-p1', '-i', patch_file, '--verbose', '--dry-run'],
                cwd=target_dir,
                capture_output=True,
                text=True,
                timeout=self.COMMAND_TIMEOUT
            )

            if dry_run.returncode != 0:
                return {
                    'success': False,
                    'message': 'Patch does not apply (dry run failed)',
                    'output': dry_run.stdout,
                    'error': dry_run.stderr,
                    'method': 'system-patch'
                }

            # Dry run succeeded, apply for real
            result = subprocess.run(
                ['patch', '-p1', '-i', patch_file],
                cwd=target_dir,
                capture_output=True,
                text=True,
                timeout=self.COMMAND_TIMEOUT
            )
            applied = result.returncode == 0

            return {
                'success': applied,
                'message': ('Applied via system patch' if applied
                            else 'System patch failed'),
                'output': result.stdout,
                'error': '' if applied else result.stderr,
                'method': 'system-patch'
            }

        except FileNotFoundError:
            return {
                'success': False,
                'fatal': True,
                'message': 'Neither git nor patch command available'
            }
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'fatal': True,
                'message': 'Patch application timed out'
            }

    def analyze_patch_series(self, patch_files: List[str]) -> Dict[str, Any]:
        """
        Analyze patches for potential conflicts before applying.

        Args:
            patch_files: List of paths to patch files

        Returns:
            Dictionary with a per-patch entry under ``patches`` and totals
            under ``summary``. Missing patch files get ``{'error': ...}``.
        """
        self._log(f"Analyzing {len(patch_files)} patches")

        analysis = {}
        all_modified_files = set()

        for patch_file in patch_files:
            patch_path = Path(patch_file)
            if not patch_path.exists():
                analysis[patch_file] = {'error': 'File not found'}
                continue

            # errors='ignore' keeps binary or oddly encoded patches readable
            with open(patch_path, 'r', encoding='utf-8', errors='ignore') as f:
                patch_content = f.read()

            modified_files = self._extract_modified_files(patch_content)
            all_modified_files.update(modified_files)

            analysis[patch_file] = {
                'files_modified': modified_files,
                'patch_size': len(patch_content),
                'lines_changed': self._count_lines_changed(patch_content),
                'conflict_risk': self._assess_conflict_risk(patch_content,
                                                            modified_files),
                'is_binary': self._is_binary_patch(patch_content)
            }

        # Detect file conflicts between patches
        file_conflicts = self._detect_file_conflicts(analysis)

        return {
            'patches': analysis,
            'summary': {
                'total_patches': len(patch_files),
                'total_files_modified': len(all_modified_files),
                'file_conflicts_detected': file_conflicts
            }
        }

    def _extract_modified_files(self, patch_content: str) -> List[str]:
        """
        Extract list of files modified by this patch.

        Only ``---`` / ``+++`` lines outside hunk bodies are treated as file
        headers. Hunk bodies are skipped by counting the lines announced in
        each ``@@`` header, so a removed line such as ``-- comment`` (shown
        in the diff as ``--- comment``) is not mistaken for a file name.

        Returns:
            De-duplicated paths in order of first appearance, with a leading
            ``a/`` or ``b/`` stripped and ``/dev/null`` omitted.
        """
        files = []
        old_remaining = 0  # old-side lines still expected in current hunk
        new_remaining = 0  # new-side lines still expected in current hunk

        for raw_line in patch_content.split('\n'):
            line = raw_line.rstrip('\r')  # tolerate CRLF patches

            if old_remaining > 0 or new_remaining > 0:
                marker = line[:1]
                if marker == '\\':
                    # "\ No newline at end of file" counts for neither side
                    continue
                if marker == '-':
                    old_remaining = max(0, old_remaining - 1)
                    continue
                if marker == '+':
                    new_remaining = max(0, new_remaining - 1)
                    continue
                if marker in ('', ' '):
                    # Context line (some tools strip the leading space of
                    # empty context lines)
                    old_remaining = max(0, old_remaining - 1)
                    new_remaining = max(0, new_remaining - 1)
                    continue
                # Anything else cannot be part of a hunk: the header counts
                # were wrong, so resynchronize and treat it as a header line.
                old_remaining = new_remaining = 0

            hunk = self._HUNK_HEADER_RE.match(line)
            if hunk:
                old_remaining = int(hunk.group(1)) if hunk.group(1) is not None else 1
                new_remaining = int(hunk.group(2)) if hunk.group(2) is not None else 1
                continue

            if not line.startswith(('--- ', '+++ ')):
                continue

            # Handle timestamps after tab
            file_path = line[4:].split('\t')[0].strip()
            if not file_path or file_path == '/dev/null':
                continue

            # Strip git-style a/ or b/ prefixes (standard in git format-patch output)
            files.append(re.sub(r'^[ab]/', '', file_path))

        return list(dict.fromkeys(files))

    def _count_lines_changed(self, patch_content: str) -> int:
        """Count approximate number of lines changed in patch"""
        return sum(
            1
            for line in patch_content.split('\n')
            if (line.startswith('+') and not line.startswith('+++'))
            or (line.startswith('-') and not line.startswith('---'))
        )

    def _assess_conflict_risk(self, patch_content: str,
                              modified_files: List[str]) -> str:
        """
        Assess conflict risk level from the number of changed lines.

        ``modified_files`` is accepted for interface compatibility but is
        not used by the current heuristic.
        """
        lines_changed = self._count_lines_changed(patch_content)
        if lines_changed > 100:
            return 'high'
        if lines_changed > 50:
            return 'medium'
        return 'low'

    def _is_binary_patch(self, patch_content: str) -> bool:
        """Check if patch contains binary files"""
        return ('GIT binary patch' in patch_content
                or 'Binary files' in patch_content)

    def _detect_file_conflicts(self, analysis: Dict[str, Any]) -> List[List[str]]:
        """
        Detect which patches modify the same files.

        Returns:
            One list of patch names per file that is touched by more than
            one patch.
        """
        file_to_patches = {}
        for patch_file, info in analysis.items():
            # Entries for unreadable patches carry 'error' instead
            for file_path in info.get('files_modified', ()):
                file_to_patches.setdefault(file_path, []).append(patch_file)

        return [patches for patches in file_to_patches.values()
                if len(patches) > 1]

    def _order_patches_by_dependency(self, analysis: Dict[str, Any]) -> List[str]:
        """
        Order patches to minimize conflicts.

        Simple heuristic: smaller patches first, larger patches later.
        Patches without a recorded size (e.g. missing files) sort first.
        """
        patches = analysis.get('patches', {})
        return sorted(
            patches,
            key=lambda patch_file: patches[patch_file].get('patch_size', 0)
        )

    @staticmethod
    def _is_safe_relative_path(file_path: str) -> bool:
        """Return True if *file_path* cannot escape the directory it is joined to."""
        candidate = Path(file_path)
        return not candidate.is_absolute() and '..' not in candidate.parts

    def apply_with_rollback(self, patch_file: str,
                            target_dir: str) -> Dict[str, Any]:
        """
        Apply patch with rollback capability on failure.

        Files named in the patch are copied to a temporary directory before
        the patch is applied. On failure the copies are restored and files
        that did not exist beforehand are removed again.

        Args:
            patch_file: Path to the patch file
            target_dir: Directory to apply the patch to, resolved against
                ``repo_path`` when relative (same as ``apply_single_patch``)

        Returns:
            Dictionary with application results plus ``rolled_back`` and
            ``backed_up_files``.
        """
        # Resolve exactly like apply_single_patch does so that the backup
        # covers the files the patch will actually touch.
        target_path = self.repo_path / target_dir

        try:
            # Same tolerant decoding as analyze_patch_series
            with open(patch_file, 'r', encoding='utf-8', errors='ignore') as f:
                patch_content = f.read()
        except OSError as e:
            return {
                'success': False,
                'fatal': True,
                'rolled_back': False,
                'message': f'Cannot read patch file {patch_file}: {e}',
                'backed_up_files': []
            }

        # Paths come from the patch (untrusted); ignore any that would make
        # the backup read or write outside its directories.
        modified_files = [
            path for path in self._extract_modified_files(patch_content)
            if self._is_safe_relative_path(path)
        ]

        backup_dir = tempfile.mkdtemp(prefix='patch_backup_')
        backed_up_files = []
        absent_before = []  # files the patch may create

        try:
            # Create backup of modified files
            for file_path in modified_files:
                full_path = target_path / file_path
                if full_path.is_file():
                    backup_path = Path(backup_dir) / file_path
                    backup_path.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(full_path, backup_path)
                    backed_up_files.append(str(full_path))
                elif not full_path.exists():
                    absent_before.append(full_path)

            result = self.apply_single_patch(patch_file, str(target_path))

            if not result['success']:
                self._log("Patch failed, restoring backup")
                self._rollback(backup_dir, target_path, absent_before)
                result['rolled_back'] = True
            else:
                result['rolled_back'] = False

            result['backed_up_files'] = backed_up_files
            return result

        except Exception as e:
            self._log(f"Exception during patch: {e}, restoring backup")
            self._rollback(backup_dir, target_path, absent_before)
            return {
                'success': False,
                'rolled_back': True,
                'message': f'Failed and rolled back: {str(e)}',
                'backed_up_files': backed_up_files
            }
        finally:
            # Clean up backup directory
            shutil.rmtree(backup_dir, ignore_errors=True)

    def _rollback(self, backup_dir: str, target_path: Path,
                  absent_before: List[Path]) -> None:
        """Restore backed-up files and delete files a failed patch created."""
        self._restore_backup(backup_dir, target_path)
        for created in absent_before:
            try:
                if created.is_file():
                    created.unlink()
            except OSError as e:
                self._log(f"Could not remove {created}: {e}")

    def _restore_backup(self, backup_dir: str, target_path: Path) -> None:
        """Restore files from backup"""
        backup_path = Path(backup_dir)
        for backup_file in backup_path.rglob('*'):
            if backup_file.is_file():
                relative_path = backup_file.relative_to(backup_path)
                target_file = target_path / relative_path
                target_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(backup_file, target_file)

    def create_patch(self, original_file: str, modified_file: str,
                     output_patch: str) -> bool:
        """
        Create a patch file from original and modified files.

        Args:
            original_file: Path to original file
            modified_file: Path to modified file
            output_patch: Path for output patch file

        Returns:
            True if successful, False if a file could not be read, decoded
            or written.
        """
        try:
            with open(original_file, 'r') as f1, open(modified_file, 'r') as f2:
                original_lines = f1.readlines()
                modified_lines = f2.readlines()

            # Input lines keep their own '\n', so the default lineterm
            # ('\n') makes every emitted line newline-terminated exactly once.
            diff = difflib.unified_diff(
                original_lines,
                modified_lines,
                fromfile=original_file,
                tofile=modified_file
            )

            with open(output_patch, 'w') as f:
                for line in diff:
                    if line.endswith('\n'):
                        f.write(line)
                    else:
                        # Last line of a file without a trailing newline:
                        # emit the standard marker so patch tools can
                        # reproduce it.
                        f.write(line + '\n\\ No newline at end of file\n')

            self._log(f"Patch created: {output_patch}")
            return True

        except (OSError, ValueError) as e:
            # ValueError covers UnicodeDecodeError on undecodable input
            self._log(f"Error creating patch: {e}")
            return False


def main():
    """Command line interface for the patch handler"""
    parser = argparse.ArgumentParser(description='Robust patch file handler')
    parser.add_argument('patches', nargs='+', help='Patch files to apply')
    parser.add_argument('--target-dir', '-t', default='.',
                        help='Target directory for patches')
    parser.add_argument('--repo-path', '-r', default='.',
                        help='Repository root path')
    parser.add_argument('--stop-on-conflict', '-s', action='store_true',
                        help='Stop on first conflict')
    parser.add_argument('--rollback', '-b', action='store_true',
                        help='Rollback on failure')
    parser.add_argument('--analyze-only', '-a', action='store_true',
                        help='Only analyze patches, do not apply')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Verbose output')
    parser.add_argument('--output-format', '-f', choices=['text', 'json'],
                        default='text', help='Output format')

    args = parser.parse_args()

    handler = GitPatchHandler(repo_path=args.repo_path, verbose=args.verbose)

    if args.analyze_only:
        analysis = handler.analyze_patch_series(args.patches)
        if args.output_format == 'json':
            print(json.dumps(analysis, indent=2))
        else:
            print_patch_analysis(analysis)
    else:
        result = handler.apply_patch_series(
            args.patches,
            target_dir=args.target_dir,
            stop_on_conflict=args.stop_on_conflict,
            rollback_on_failure=args.rollback
        )

        if args.output_format == 'json':
            print(json.dumps(result, indent=2))
        else:
            print_patch_results(result)

        # Exit status mirrors the overall outcome for use in scripts
        sys.exit(0 if result['success'] else 1)


def print_patch_analysis(analysis: Dict[str, Any]) -> None:
    """Print patch analysis in human-readable format"""
    print("Patch Analysis Report")
    print("=" * 50)

    summary = analysis['summary']
    print(f"Total patches: {summary['total_patches']}")
    print(f"Total files modified: {summary['total_files_modified']}")
    print(f"File conflicts detected: {len(summary['file_conflicts_detected'])}")

    if summary['file_conflicts_detected']:
        print("\nPotential conflicts:")
        for conflict in summary['file_conflicts_detected']:
            print(f" - {', '.join(conflict)}")

    print("\nPatch details:")
    for patch_file, info in analysis['patches'].items():
        if 'error' in info:
            print(f" {patch_file}: ERROR - {info['error']}")
        else:
            print(f" {patch_file}:")
            print(f" - Files: {', '.join(info['files_modified'])}")
            print(f" - Size: {info['patch_size']} bytes")
            print(f" - Lines changed: {info['lines_changed']}")
            print(f" - Risk: {info['conflict_risk']}")
            print(f" - Binary: {info['is_binary']}")


def print_patch_results(result: Dict[str, Any]) -> None:
    """Print patch application results in human-readable format"""
    print("Patch Application Report")
    print("=" * 50)
    print(f"Overall success: {'YES' if result['success'] else 'NO'}")
    print(f"Patches applied: {len(result['patches_applied'])}/{len(result['results'])}")

    print("\nDetailed results:")
    for patch_result in result['results']:
        status = "✓" if patch_result['success'] else "✗"
        print(f" {status} {patch_result['patch']}")

        if not patch_result['success']:
            print(f" Message: {patch_result['message']}")
            if patch_result.get('conflicts'):
                print(f" Conflicts: {', '.join(patch_result['conflicts'])}")
            if patch_result.get('rolled_back'):
                print(" [ROLLED BACK]")


if __name__ == '__main__':
    main()