#!/usr/bin/python3
"""Create local tar/zip backups from JSON configs in ./configs and optionally copy them to a remote host via scp."""
import datetime
import json
import os
import shlex
import subprocess
import tarfile
import time
import zipfile
from functools import partial
from pathlib import Path


def tar_filter(filters, tarinfo):
    """Return None (skip the member) if its name matches any ignore pattern."""
    if any(filter_name in tarinfo.name for filter_name in filters):
        print(f"\tFiltering: {tarinfo.name}")
        return None
    return tarinfo


def should_filter_file(filters, file_path):
    """Return True if the path matches any ignore pattern."""
    return any(filter_name in str(file_path) for filter_name in filters)


def create_zip_archive(archive_path: Path, inputs: list, ignore_patterns: list):
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for file_or_dir in inputs:
            file_or_dir = Path(file_or_dir).expanduser().resolve()
            print(f"Compressing: {file_or_dir}")
            if file_or_dir.is_file():
                if not should_filter_file(ignore_patterns, file_or_dir):
                    zipf.write(file_or_dir, arcname=file_or_dir.name)
            else:
                # Walk the directory tree, storing entries relative to the input's parent
                for root, dirs, files in os.walk(file_or_dir):
                    root_path = Path(root)
                    for file in files:
                        file_path = root_path / file
                        if not should_filter_file(ignore_patterns, file_path):
                            relative_path = file_path.relative_to(file_or_dir.parent)
                            zipf.write(file_path, arcname=str(relative_path))


def create_tar_archive(archive_path: Path, inputs: list, ignore_patterns: list):
    filter_function = partial(tar_filter, ignore_patterns)
    with tarfile.open(archive_path, "w:xz") as f:
        for file_or_dir in inputs:
            file_or_dir = Path(file_or_dir).expanduser().resolve()
            print(f"Compressing: {file_or_dir}")
            f.add(file_or_dir, filter=filter_function)


def check_remote_connection(remote_location: str) -> bool:
    """Test if we can reach the remote location using ssh."""
    try:
        # Keep the full "user@host" part of "[user@]host:path" so the check
        # uses the same credentials as the later scp transfer.
        host = remote_location.split(":", 1)[0]
        # Test connection with SSH
        result = subprocess.run(
            ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", host, "echo OK"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=10,
            text=True,
        )
        return result.returncode == 0
    except Exception as e:
        print(f"Error checking remote connection: {e}")
        return False


def ensure_remote_directory(remote_location: str) -> bool:
    """Ensure the target directory exists on the remote host."""
    try:
        # Split "[user@]host:path" into the ssh target and the remote path
        host, path = remote_location.split(":", 1)
        # Create the directory on the remote host; quote the path so spaces
        # and shell metacharacters survive the remote shell.
        result = subprocess.run(
            ["ssh", host, f"mkdir -p {shlex.quote(path)}"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=10,
            text=True,
        )
        return result.returncode == 0
    except Exception as e:
        print(f"Error ensuring remote directory: {e}")
        return False


def copy_to_remote(file_path: Path, remote_location: str) -> bool:
    """Copy file to remote location using scp."""
    try:
        print(f"Copying {file_path} to {remote_location}...")
        result = subprocess.run(
            ["scp", str(file_path), remote_location],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=300,  # Longer timeout for file transfer
            text=True,
        )
        if result.returncode == 0:
            print(f"Successfully copied to {remote_location}")
            return True
        else:
            print(f"Error copying to remote: {result.stderr}")
            return False
    except Exception as e:
        print(f"Error copying to remote: {e}")
        return False


def local_backup(output_dir: Path, backup_name: str, inputs: list, ignore_patterns: list,
                 archive_type="tar", remote_location=None):
    current_time = datetime.datetime.now().isoformat()
    start = time.time()
    if archive_type == "zip":
"zip": archive_path = output_dir / f"{backup_name}-{current_time}.zip" print(archive_path) create_zip_archive(archive_path, inputs, ignore_patterns) else: archive_path = output_dir / f"{backup_name}-{current_time}.tar.gz" print(archive_path) create_tar_archive(archive_path, inputs, ignore_patterns) print(f"Local backup completed in: {(time.time() - start) / 60:.2f} mins") # Handle remote copy if configured if remote_location: remote_start = time.time() print(f"Remote location configured: {remote_location}") if check_remote_connection(remote_location): print("Remote connection successful") if ensure_remote_directory(remote_location): print("Remote directory exists or was created successfully") if copy_to_remote(archive_path, remote_location): print(f"Remote backup completed in: {(time.time() - remote_start) / 60:.2f} mins") else: print("Failed to copy backup to remote location") else: print("Failed to ensure remote directory exists") else: print("Failed to connect to remote location") def backup_all(): config_dir = Path(__file__).parent / "configs" # Get all JSON files except the example config config_files = sorted([f for f in config_dir.glob("*.json") if f.name != "example-config.json"]) if not config_files: print("No configuration files found. Please create a config file in the 'configs' directory.") print("You can copy the example-config.json to create your own configuration:") print("cp configs/example-config.json configs/my-backup.json") return print(f"Found {len(config_files)} config files: {[f.name for f in config_files]}") for file in config_files: print(f"\nProcessing backup configuration: {file.name}") text = file.read_text() config = json.loads(text) output_dir = Path(config["outputDir"]).expanduser().resolve() output_dir.mkdir(parents=True, exist_ok=True) inputs = config["inputs"] ignore_patterns = config["ignorePatterns"] backup_name = file.stem archive_type = config.get("archiveType", "tar") remote_location = config.get("remoteLocation", None) local_backup(output_dir, backup_name, inputs, ignore_patterns, archive_type, remote_location) if __name__ == "__main__": backup_all()