Files
simple-backup/backup.py
2025-04-24 16:54:51 +01:00

173 lines
6.9 KiB
Python
Executable File

#!/usr/bin/python3
import datetime
import json
import os
import shlex
import subprocess
import tarfile
import time
import zipfile
from functools import partial
from pathlib import Path
def tar_filter(filters, tarinfo):
    """Decide whether a tar member should be kept, based on ignore patterns.

    The signature (patterns first) allows the patterns to be pre-bound with
    ``functools.partial`` so the result can serve as the ``filter`` callback
    of ``TarFile.add``.

    Args:
        filters: Iterable of substrings. A member is dropped when any of
            them occurs anywhere in ``tarinfo.name``.
        tarinfo: The ``tarfile.TarInfo`` describing the member being added.

    Returns:
        ``tarinfo`` itself when the member is kept, or ``None`` when it
        matched a pattern (a message naming the member is printed).
    """
    # A generator (not a list) lets any() stop at the first matching pattern.
    if any(pattern in tarinfo.name for pattern in filters):
        print(f"\tFiltering: {tarinfo.name}")
        return None
    return tarinfo
def should_filter_file(filters, file_path):
    """Return True when ``file_path`` contains any of the ignore patterns.

    Matching is a plain substring test against ``str(file_path)``; no glob
    or regular-expression syntax is interpreted.

    Args:
        filters: Iterable of substrings to look for.
        file_path: Path (``str`` or ``pathlib.Path``) to test.

    Returns:
        True if at least one pattern occurs in the path, otherwise False
        (including when ``filters`` is empty).
    """
    # Convert once instead of once per pattern; the generator lets any()
    # short-circuit on the first hit.
    path_text = str(file_path)
    return any(pattern in path_text for pattern in filters)
def create_zip_archive(archive_path: Path, inputs: list, ignore_patterns: list):
    """Write ``inputs`` into a deflate-compressed zip file at ``archive_path``.

    Every input has ``~`` expanded and is resolved to an absolute path. A
    file input is stored under its bare file name. Any other input is walked
    recursively with ``os.walk`` and its files are stored relative to the
    input's parent, so the directory name becomes the top-level folder in
    the archive. Paths for which ``should_filter_file`` returns True are
    left out.

    Args:
        archive_path: Destination of the zip file (created or overwritten).
        inputs: Files and/or directories to back up.
        ignore_patterns: Substrings; any path containing one is skipped.
    """
    # Resolved so it can be compared with the resolved paths produced below.
    archive_file = Path(archive_path).resolve()

    with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file_or_dir in inputs:
            source = Path(file_or_dir).expanduser().resolve()
            print(f"Compressing: {source}")

            if source.is_file():
                if not should_filter_file(ignore_patterns, source):
                    zipf.write(source, arcname=source.name)
                continue

            for root, dirs, files in os.walk(source):
                root_path = Path(root)
                # Prune ignored directories in place so os.walk never
                # descends into them. Every file below such a directory
                # would be filtered anyway, because its path contains the
                # directory's path and therefore the same pattern.
                dirs[:] = [
                    name for name in dirs
                    if not should_filter_file(ignore_patterns, root_path / name)
                ]
                for file_name in files:
                    file_path = root_path / file_name
                    if file_path == archive_file:
                        # The output lives inside an input directory; adding
                        # it would make the archive try to contain itself.
                        continue
                    if should_filter_file(ignore_patterns, file_path):
                        continue
                    relative_path = file_path.relative_to(source.parent)
                    zipf.write(file_path, arcname=str(relative_path))
def create_tar_archive(archive_path: Path, inputs: list, ignore_patterns: list):
    """Pack ``inputs`` into an xz-compressed tar file at ``archive_path``.

    Each input has ``~`` expanded and is resolved to an absolute path before
    being added. Members are screened by ``tar_filter`` with
    ``ignore_patterns`` bound as its first argument.

    Args:
        archive_path: Destination of the tar file (opened in ``"w:xz"`` mode).
        inputs: Files and/or directories to add.
        ignore_patterns: Substrings handed to ``tar_filter``.
    """
    def drop_ignored(member):
        # Same call the pre-bound partial would make for each member.
        return tar_filter(ignore_patterns, member)

    with tarfile.open(name=archive_path, mode="w:xz") as archive:
        for entry in inputs:
            source = Path(entry).expanduser().resolve()
            print(f"Compressing: {source}")
            archive.add(source, filter=drop_ignored)
def check_remote_connection(remote_location: str) -> bool:
    """Test whether the remote host accepts a non-interactive SSH login.

    Args:
        remote_location: scp-style destination, ``[user@]host:path``.

    Returns:
        True when ``ssh`` could run ``echo OK`` on the remote and exited
        with status 0. False when the location names no host, the command
        failed, timed out, or could not be started. This function reports
        problems by printing and never raises.
    """
    try:
        # Everything before the first ':' is the ssh destination. The
        # optional "user@" prefix is kept on purpose: the probe must log in
        # as the same user that the later ssh/scp steps use. Splitting on
        # ':' first also keeps an '@' inside the path from being mistaken
        # for the user separator.
        target = remote_location.partition(':')[0]
        if not target:
            print("Error checking remote connection: no host in remote location")
            return False

        # BatchMode=yes and ConnectTimeout=5 keep the probe non-interactive
        # and short; the subprocess timeout is a second safety net.
        result = subprocess.run(
            ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", target, "echo OK"],
            capture_output=True,
            timeout=10,
            text=True,
        )
        return result.returncode == 0
    except Exception as e:
        # Deliberate best-effort boundary: a failed probe must not abort the
        # already-finished local backup.
        print(f"Error checking remote connection: {e}")
        return False
def ensure_remote_directory(remote_location: str) -> bool:
    """Create the destination directory on the remote host if it is missing.

    Runs ``mkdir -p`` on the remote through ``ssh``.

    Args:
        remote_location: scp-style destination, ``[user@]host:path``.

    Returns:
        True when the directory exists afterwards: ``mkdir -p`` exited with
        status 0, or the path part is empty, which scp treats as the remote
        login directory. False when the location is malformed or the remote
        command failed, timed out, or could not be started. This function
        reports problems by printing and never raises.
    """
    try:
        target, colon, path = remote_location.partition(':')
        if not colon or not target:
            print(
                "Error ensuring remote directory: expected [user@]host:path, "
                f"got {remote_location!r}"
            )
            return False
        if not path:
            # "host:" addresses the remote login directory; nothing to create.
            return True

        # The command string is interpreted by the remote shell, so the path
        # must be quoted (spaces, ';', '$', ...). A leading "~" or "~user"
        # is left unquoted so the remote shell can still expand it.
        prefix = ""
        if path.startswith("~"):
            head, slash, tail = path.partition("/")
            if all(ch.isalnum() or ch in "._-" for ch in head[1:]):
                prefix, path = head + slash, tail
        remote_path = prefix + (shlex.quote(path) if path else "")

        # "--" stops option parsing so a path starting with '-' is not taken
        # for a mkdir flag.
        result = subprocess.run(
            ["ssh", target, f"mkdir -p -- {remote_path}"],
            capture_output=True,
            timeout=10,
            text=True,
        )
        if result.returncode != 0:
            print(f"Error ensuring remote directory: {result.stderr}")
        return result.returncode == 0
    except Exception as e:
        # Deliberate best-effort boundary: report the problem, do not raise.
        print(f"Error ensuring remote directory: {e}")
        return False
def copy_to_remote(file_path: Path, remote_location: str, timeout: float = 300) -> bool:
    """Copy a local file to a remote location using ``scp``.

    Args:
        file_path: Local file to upload.
        remote_location: scp-style destination, ``[user@]host:path``.
        timeout: Seconds to wait for ``scp`` before giving up. Defaults to
            300; pass a larger value for big archives or slow links, or
            ``None`` to wait without limit.

    Returns:
        True when ``scp`` exited with status 0. False when the local file is
        missing or ``scp`` failed, timed out, or could not be started. This
        function reports problems by printing and never raises.
    """
    try:
        if not Path(file_path).is_file():
            # Fail fast with a clear message instead of spawning scp.
            print(f"Error copying to remote: {file_path} does not exist or is not a file")
            return False

        print(f"Copying {file_path} to {remote_location}...")
        result = subprocess.run(
            ["scp", str(file_path), remote_location],
            capture_output=True,
            timeout=timeout,
            text=True,
        )
        if result.returncode != 0:
            print(f"Error copying to remote: {result.stderr}")
            return False
        print(f"Successfully copied to {remote_location}")
        return True
    except Exception as e:
        # Deliberate best-effort boundary: report the problem, do not raise.
        print(f"Error copying to remote: {e}")
        return False
def local_backup(output_dir: Path, backup_name: str, inputs: list, ignore_patterns: list, archive_type="tar", remote_location=None):
    """Create one timestamped archive and optionally upload it.

    The archive is written to ``output_dir`` as
    ``<backup_name>-<ISO timestamp>.zip`` when ``archive_type`` is ``"zip"``,
    and as ``<backup_name>-<ISO timestamp>.tar.xz`` for any other value.
    When ``remote_location`` is set, the remote is probed, the target
    directory is ensured, and the archive is copied. Each remote step prints
    its outcome, and a failure stops the remaining remote steps without
    raising.

    Args:
        output_dir: Existing directory that receives the archive.
        backup_name: Prefix of the archive file name.
        inputs: Files and/or directories to back up.
        ignore_patterns: Substrings; paths containing one are skipped.
        archive_type: ``"zip"`` for a zip file; anything else yields a tar.
        remote_location: Optional scp-style destination ``[user@]host:path``.
    """
    current_time = datetime.datetime.now().isoformat()
    start = time.time()

    if archive_type == "zip":
        archive_path = output_dir / f"{backup_name}-{current_time}.zip"
        print(archive_path)
        create_zip_archive(archive_path, inputs, ignore_patterns)
    else:
        # create_tar_archive writes xz-compressed data ("w:xz"), so the file
        # is named .tar.xz; a .tar.gz suffix would misdescribe its contents.
        archive_path = output_dir / f"{backup_name}-{current_time}.tar.xz"
        print(archive_path)
        create_tar_archive(archive_path, inputs, ignore_patterns)
    print(f"Local backup completed in: {(time.time() - start) / 60:.2f} mins")

    # Handle remote copy if configured. Guard clauses: each failed step
    # reports itself and ends the remote phase.
    if not remote_location:
        return

    remote_start = time.time()
    print(f"Remote location configured: {remote_location}")

    if not check_remote_connection(remote_location):
        print("Failed to connect to remote location")
        return
    print("Remote connection successful")

    if not ensure_remote_directory(remote_location):
        print("Failed to ensure remote directory exists")
        return
    print("Remote directory exists or was created successfully")

    if not copy_to_remote(archive_path, remote_location):
        print("Failed to copy backup to remote location")
        return
    print(f"Remote backup completed in: {(time.time() - remote_start) / 60:.2f} mins")
def backup_all():
    """Run a backup for every JSON config in the ``configs`` directory.

    The directory is looked up next to this script. ``example-config.json``
    is skipped; the remaining ``*.json`` files are processed in sorted order
    and each file's stem becomes the backup name.

    Config keys read:
        outputDir (required): directory for the archive, created if missing.
        inputs (required): files and/or directories to back up.
        ignorePatterns (optional): substrings to exclude; defaults to none.
        archiveType (optional): ``"zip"`` or ``"tar"``; defaults to ``"tar"``.
        remoteLocation (optional): scp-style destination for an upload.

    Raises:
        KeyError: a config lacks ``outputDir`` or ``inputs``.
        json.JSONDecodeError: a config file is not valid JSON.
    """
    config_dir = Path(__file__).parent / "configs"
    # Get all JSON files except the example config
    config_files = sorted(
        f for f in config_dir.glob("*.json") if f.name != "example-config.json"
    )
    if not config_files:
        print("No configuration files found. Please create a config file in the 'configs' directory.")
        print("You can copy the example-config.json to create your own configuration:")
        print("cp configs/example-config.json configs/my-backup.json")
        return

    print(f"Found {len(config_files)} config files: {[f.name for f in config_files]}")
    for config_file in config_files:
        print(f"\nProcessing backup configuration: {config_file.name}")
        # JSON is UTF-8 by specification; do not depend on the locale default.
        config = json.loads(config_file.read_text(encoding="utf-8"))

        output_dir = Path(config["outputDir"]).expanduser().resolve()
        output_dir.mkdir(parents=True, exist_ok=True)

        local_backup(
            output_dir,
            config_file.stem,
            config["inputs"],
            # Optional: a config that excludes nothing is still valid.
            config.get("ignorePatterns", []),
            config.get("archiveType", "tar"),
            config.get("remoteLocation", None),
        )
if __name__ == "__main__":
    # Executed as a script (not imported): process every backup config.
    backup_all()