173 lines
6.9 KiB
Python
Executable File
173 lines
6.9 KiB
Python
Executable File
#!/usr/bin/python3
|
|
from pathlib import Path
|
|
import json
|
|
import tarfile
|
|
import zipfile
|
|
import datetime
|
|
from functools import partial
|
|
import time
|
|
import os
|
|
import subprocess
|
|
def tar_filter(filters, tarinfo):
    """Filter callback for ``TarFile.add`` that drops ignored members.

    Args:
        filters: Substrings to look for in the member name.
        tarinfo: The archive member under consideration; only its ``name``
            attribute is read.

    Returns:
        ``tarinfo`` unchanged when no substring matches, otherwise ``None``
        (after printing which member was dropped).
    """
    matched = [pattern for pattern in filters if pattern in tarinfo.name]
    if not matched:
        return tarinfo

    # Returning None from the filter callback excludes the member.
    print(f"\tFiltering: {tarinfo.name}")
    return None
|
|
|
|
def should_filter_file(filters, file_path):
    """Return True when ``str(file_path)`` contains any substring in ``filters``.

    The match is a plain substring test against the whole path text, so a
    pattern can hit a directory component as well as the file name.
    """
    path_text = str(file_path)
    hits = [pattern for pattern in filters if pattern in path_text]
    return bool(hits)
|
|
|
|
def create_zip_archive(archive_path: Path, inputs: list, ignore_patterns: list):
    """Write ``inputs`` into a new deflate-compressed zip file.

    Args:
        archive_path: Where to create the zip file (opened in ``'w'`` mode).
        inputs: Files and/or directories to archive. ``~`` is expanded and
            every path is resolved before use.
        ignore_patterns: Substrings; a file whose full path contains any of
            them is left out (decided by ``should_filter_file``).

    A single input file is stored under its bare file name. A directory is
    walked recursively and each file is stored relative to the directory's
    parent, so the directory name becomes the top-level folder in the
    archive. Only files are written; directories get no entries of their own.

    Inputs that do not exist are reported and skipped, and the archive being
    written is never added to itself.
    """
    # Resolved once so it compares equal to the resolved paths built below.
    archive_resolved = Path(archive_path).resolve()

    with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file_or_dir in inputs:
            file_or_dir = Path(file_or_dir).expanduser().resolve()
            print(f"Compressing: {file_or_dir}")

            if not file_or_dir.exists():
                # os.walk() would yield nothing for a missing path, which
                # hides a typo in the configuration; make it visible instead.
                print(f"\tSkipping missing input: {file_or_dir}")
                continue

            if file_or_dir.is_file():
                if file_or_dir != archive_resolved and not should_filter_file(ignore_patterns, file_or_dir):
                    zipf.write(file_or_dir, arcname=file_or_dir.name)
                continue

            for root, _dirs, files in os.walk(file_or_dir):
                root_path = Path(root)
                for file in files:
                    file_path = root_path / file
                    if file_path == archive_resolved:
                        # The output lives inside this input directory; adding
                        # the half-written archive to itself would corrupt it.
                        continue
                    if should_filter_file(ignore_patterns, file_path):
                        continue
                    relative_path = file_path.relative_to(file_or_dir.parent)
                    zipf.write(file_path, arcname=str(relative_path))
|
|
|
|
def create_tar_archive(archive_path: Path, inputs: list, ignore_patterns: list):
    """Write ``inputs`` into a new xz-compressed tar file at ``archive_path``.

    Args:
        archive_path: Where to create the tarball (opened with mode ``"w:xz"``).
        inputs: Files and/or directories to add. ``~`` is expanded and every
            path is resolved before it is handed to ``TarFile.add``.
        ignore_patterns: Substrings passed to ``tar_filter``; members whose
            name contains one of them are excluded.
    """
    def keep_member(tarinfo):
        # Bind the ignore list so tarfile can call this with one argument.
        return tar_filter(ignore_patterns, tarinfo)

    with tarfile.open(archive_path, "w:xz") as archive:
        for entry in inputs:
            source = Path(entry).expanduser().resolve()
            print(f"Compressing: {source}")
            archive.add(source, filter=keep_member)
|
|
|
|
def check_remote_connection(remote_location: str) -> bool:
    """Test if we can reach the remote location using ssh.

    Args:
        remote_location: scp-style target in the form ``[user@]host:path``.

    Returns:
        True when a non-interactive ssh login to ``[user@]host`` succeeds and
        ``echo OK`` exits with status 0. False in every other case: no host
        part, ssh not installed, timeout, or a non-zero exit status.
    """
    try:
        # Everything before the first ':' is the ssh destination. The optional
        # "user@" prefix is kept on purpose so the probe logs in as the same
        # account that the later mkdir and scp steps use.
        host = remote_location.partition(':')[0]
        if not host:
            print(f"Error checking remote connection: no host in {remote_location!r}")
            return False

        # BatchMode=yes makes ssh fail instead of prompting for a password;
        # ConnectTimeout bounds the TCP connect, timeout= bounds the whole call.
        result = subprocess.run(
            ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", host, "echo OK"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=10,
            text=True,
        )
        return result.returncode == 0
    except Exception as e:
        # Deliberately broad: a failed probe must never abort the backup run.
        print(f"Error checking remote connection: {e}")
        return False
|
|
|
|
def _quote_remote_path(path: str) -> str:
    """Quote ``path`` for the remote shell, keeping a leading ``~``/``~user`` expandable."""
    import shlex  # local import: only this helper needs it

    if path.startswith("~"):
        tilde_prefix, slash, rest = path.partition("/")
        user = tilde_prefix[1:]
        # Leave "~" or a plain "~user" unquoted so the remote shell expands it;
        # only the remainder of the path is quoted.
        if not user or user.replace("-", "").replace("_", "").replace(".", "").isalnum():
            return tilde_prefix + slash + (shlex.quote(rest) if rest else "")
    return shlex.quote(path)


def ensure_remote_directory(remote_location: str) -> bool:
    """Ensure the target directory exists on the remote host.

    Args:
        remote_location: scp-style target in the form ``[user@]host:path``.
            ``path`` is treated as a directory and created with ``mkdir -p``.

    Returns:
        True when the directory exists or was created (ssh exit status 0), or
        when ``path`` is empty (nothing needs creating). False when
        ``remote_location`` is malformed or the ssh call fails, times out or
        cannot be started.

    The path is shell-quoted before it is sent, so spaces and shell
    metacharacters are taken literally. As a consequence, variables such as
    ``$HOME`` in the configured path are no longer expanded remotely.
    """
    try:
        host, separator, path = remote_location.partition(':')
        if not separator or not host:
            print(
                "Error ensuring remote directory: "
                f"expected '[user@]host:path', got {remote_location!r}"
            )
            return False

        if not path:
            # "host:" has no directory component, so there is nothing to
            # create; a bare "mkdir -p" would fail with a missing operand.
            return True

        # "--" stops option parsing so a path starting with '-' is safe.
        result = subprocess.run(
            ["ssh", host, f"mkdir -p -- {_quote_remote_path(path)}"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=10,
            text=True,
        )
        if result.returncode != 0:
            print(f"Error ensuring remote directory: {result.stderr}")
            return False
        return True
    except Exception as e:
        # Deliberately broad: a failure here must never abort the backup run.
        print(f"Error ensuring remote directory: {e}")
        return False
|
|
|
|
def copy_to_remote(file_path: Path, remote_location: str) -> bool:
    """Copy file to remote location using scp.

    Args:
        file_path: Local file to upload.
        remote_location: scp-style destination, ``[user@]host:path``.

    Returns:
        True when scp exits with status 0. False when the local file does not
        exist, or scp fails, times out (300 s) or cannot be started.
    """
    # scp treats an argument with ':' before the first '/' as "host:path".
    # Archive names contain ISO timestamps with colons, so always hand scp an
    # absolute path. absolute() (not resolve()) keeps the file's own basename
    # even when it is a symlink.
    source = Path(file_path).absolute()
    if not source.is_file():
        print(f"Error copying to remote: {source} is not an existing file")
        return False

    try:
        print(f"Copying {file_path} to {remote_location}...")
        result = subprocess.run(
            ["scp", str(source), remote_location],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=300,  # Longer timeout for file transfer
            text=True,
        )
    except Exception as e:
        # Deliberately broad: a failed upload must never abort the backup run.
        print(f"Error copying to remote: {e}")
        return False

    if result.returncode != 0:
        print(f"Error copying to remote: {result.stderr}")
        return False

    print(f"Successfully copied to {remote_location}")
    return True
|
|
|
|
def local_backup(output_dir: Path, backup_name: str, inputs: list, ignore_patterns: list, archive_type="tar", remote_location=None):
    """Create one timestamped archive and optionally upload it.

    Args:
        output_dir: Directory in which the archive is created.
        backup_name: Prefix of the archive file name.
        inputs: Files and/or directories to back up.
        ignore_patterns: Substrings of paths to leave out of the archive.
        archive_type: ``"zip"`` for a zip file; any other value produces an
            xz-compressed tarball.
        remote_location: Optional scp-style target (``[user@]host:path``).
            When set, the archive is uploaded after the connection check and
            the remote directory creation have both succeeded.

    Returns:
        None. Progress and failures are reported on stdout; a failed remote
        step stops the upload but leaves the local archive in place.
    """
    # NOTE(review): isoformat() puts ':' into the file name, which is not
    # valid on Windows and which GNU tar reads as "host:file" unless
    # --force-local is given; consider a colon-free timestamp format.
    current_time = datetime.datetime.now().isoformat()
    start = time.time()

    if archive_type == "zip":
        archive_path = output_dir / f"{backup_name}-{current_time}.zip"
        print(archive_path)
        create_zip_archive(archive_path, inputs, ignore_patterns)
    else:
        # create_tar_archive() writes with mode "w:xz", so the extension must
        # say xz; the old ".tar.gz" name mislabelled the compression.
        archive_path = output_dir / f"{backup_name}-{current_time}.tar.xz"
        print(archive_path)
        create_tar_archive(archive_path, inputs, ignore_patterns)

    print(f"Local backup completed in: {(time.time() - start) / 60:.2f} mins")

    # Handle remote copy if configured
    if not remote_location:
        return

    remote_start = time.time()
    print(f"Remote location configured: {remote_location}")

    # Each remote step depends on the one before it, so stop at the first failure.
    if not check_remote_connection(remote_location):
        print("Failed to connect to remote location")
        return
    print("Remote connection successful")

    if not ensure_remote_directory(remote_location):
        print("Failed to ensure remote directory exists")
        return
    print("Remote directory exists or was created successfully")

    if not copy_to_remote(archive_path, remote_location):
        print("Failed to copy backup to remote location")
        return
    print(f"Remote backup completed in: {(time.time() - remote_start) / 60:.2f} mins")
|
|
|
|
|
|
def backup_all():
    """Run one backup for every JSON config in the ``configs`` directory.

    The ``configs`` directory is looked up next to this script. Every
    ``*.json`` file except ``example-config.json`` is processed in sorted
    order, and the file stem is used as the backup name.

    Config keys read:
        outputDir (required): directory for the archive; created if missing.
        inputs (required): files and/or directories to back up.
        ignorePatterns (optional): path substrings to exclude; defaults to
            an empty list.
        archiveType (optional): defaults to ``"tar"``.
        remoteLocation (optional): scp-style upload target.

    Prints a hint and returns when no config file is found.
    """
    config_dir = Path(__file__).parent / "configs"
    # Get all JSON files except the example config
    config_files = sorted(f for f in config_dir.glob("*.json") if f.name != "example-config.json")

    if not config_files:
        print("No configuration files found. Please create a config file in the 'configs' directory.")
        print("You can copy the example-config.json to create your own configuration:")
        print("cp configs/example-config.json configs/my-backup.json")
        return

    print(f"Found {len(config_files)} config files: {[f.name for f in config_files]}")

    for file in config_files:
        print(f"\nProcessing backup configuration: {file.name}")
        # JSON is UTF-8 by specification; do not depend on the locale encoding.
        config = json.loads(file.read_text(encoding="utf-8"))

        output_dir = Path(config["outputDir"]).expanduser().resolve()
        output_dir.mkdir(parents=True, exist_ok=True)

        inputs = config["inputs"]
        # Optional like the other tuning keys: no patterns means nothing is excluded.
        ignore_patterns = config.get("ignorePatterns", [])
        backup_name = file.stem
        archive_type = config.get("archiveType", "tar")
        remote_location = config.get("remoteLocation", None)

        local_backup(output_dir, backup_name, inputs, ignore_patterns, archive_type, remote_location)
|
|
# Script entry point: run all configured backups when this file is executed
# directly; importing it as a module does not start a backup.
if __name__ == "__main__":
    backup_all()