#!/bin/python # MIT License # # Copyright (c) 2026 dev@ptrace.dev # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # import pickle import sys import os import tarfile import shutil import subprocess import time from functools import wraps from enum import Enum from concurrent.futures import ThreadPoolExecutor from uuid import uuid4 from argparse import ArgumentParser, RawDescriptionHelpFormatter from pathlib import Path from datetime import datetime PATH_HOME = Path.home() PATH_CONFIG = PATH_HOME / Path(".config") PATH_CONFIG_SOD_DIR = Path("sod-backup-helper") PATH_CONFIG_FILE = Path("sod.config") PATH_CONFIG_FILE_DOT = Path(".sod-config") SUCCESS = True FAILURE = False ARGP_DESCRIPTION = """ Creates a backup from your supplied path and distributes it to specific directories. Files getting archived and compressed using `tar` command. Stored as `____.tar.gz` You can also change the exact pattern of the file name, like this: sod -c "my_prefix__.tar.gz" Note: It stores a config file in `~/.config/sod-backup-helper` or `~/.sod-config`. It depends if the `.config` directory exists. """ HELP_PATH = "" HELP_P = "Adds backup paths. Example: `-p Foo Bar` or `-p Foo -p Bar`" HELP_L = "Lists all backup paths." HELP_R = "Removes the backup path by index. Index is supplied by the `-l` argument." HELP_N = "Removes ALL backup paths. Passing `-nn` removes the config file." HELP_C = "Define a new pattern. You can also use the following palceholders: " HELP_Z = "Enable compression." HELP_X = "Don't create the archive, just print debug info." HELP_D = "Prints the current state." class Archive_Backend(Enum): NONE = 0 PYTHON = 1 GNU = 2 class State: def __init__(self): self.backup_destinations = [] self.name_pattern = "" default_state = State() default_state.backup_destinations = [] default_state.name_pattern = "_%Y_%m_%d_" class Args: def __init__(self): self.parser = ArgumentParser( prog = "Safe our Data", formatter_class = RawDescriptionHelpFormatter, description = ARGP_DESCRIPTION ) self.parser.add_argument("path", type=Path, nargs="?", default=None, help=HELP_PATH) self.parser.add_argument("-p", "--backup-path-add", action="extend", nargs="+", type=str, help=HELP_P) self.parser.add_argument("-l", "--backup-path-list", action="store_true", help=HELP_L) self.parser.add_argument("-r", "--backup-path-remove", type=int, help=HELP_R) self.parser.add_argument("-n", "--backup-path-nuke", action="count", default=0, help=HELP_N) self.parser.add_argument("-c", "--pattern-change", type=str, help=HELP_C) self.parser.add_argument("-z", "--archive-compress", action="store_true", help=HELP_Z) self.parser.add_argument("-x", "--debug-dry-run", action="store_true", help=HELP_X) self.parser.add_argument("-d", "--debug", action="store_true", help=HELP_D) def get(self): return self.parser.parse_args() def help(self): self.parser.print_help() return self args_handle = Args() args = args_handle.get() def timer(func): @wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() result = func(*args, **kwargs) print(f"{func.__name__} took {time.perf_counter() - start:.6f}s") return result return wrapper def run(): config_exists, config_fp = config_find() if not config_exists: if not config_create_file(config_fp): return FAILURE success, state = config_read(config_fp) if not success: return FAILURE if args.path: return backup_do(state, args.path) operation = SUCCESS if args.backup_path_list: backup_path_list(state) elif cringe(args.pattern_change): backup_name_fmt_new(state, args.pattern_change) elif args.debug: debug_view(state, config_fp) elif cringe(args.backup_path_add): operation = backup_path_add(state, args.backup_path_add) elif cringe(args.backup_path_remove): operation = backup_path_remove(state, args.backup_path_remove) elif args.backup_path_nuke == 1: backup_path_nuke(state) elif args.backup_path_nuke == 2: return config_delete(config_fp) if operation == SUCCESS: config_write(state, config_fp) return operation def cringe(anything) -> bool: return anything is not None def path_process_and_validate(state: State, path: Path) -> tuple[bool, Path, Path]: if not path.exists(): print("This path does not exist:", path) return FAILURE, Path(), Path() stem = path.stem archive_name = backup_name_create(state, stem) return SUCCESS, archive_name, path @timer def archive_make(archive_fp, file_to_archive) -> bool: backend = archive_backend_determine() if args.debug_dry_run: dbg("-- Archiving -------------") dbg(f"archive_fp: {archive_fp}") dbg(f"file_to_archive: {file_to_archive}") dbg(f"backend: {backend}") dbg(f"compression: {args.archive_compress}") return SUCCESS try: if backend == Archive_Backend.GNU: archive_backend_gnu(archive_fp, file_to_archive) elif backend == Archive_Backend.PYTHON: archive_backend_python(archive_fp, file_to_archive) else: print("This should not happen! Blame the developer!") return FAILURE return SUCCESS except Exception as e: print("Could not archive file:", e) return FAILURE def archive_backend_python(archive_fp, file_to_archive): compression = "w:gz" if args.archive_compress else "w:" with tarfile.open(archive_fp, compression) as tar: tar.add(file_to_archive) def archive_backend_gnu(archive_fp, file_to_archive): compression = "-czf" if args.archive_compress else "-cf" cmd = ["tar", compression, archive_fp, file_to_archive] proc = subprocess.run(cmd, capture_output=True) # fugly check, I know if proc.returncode != 0: print(proc.stderr) print(proc.stdout) proc.check_returncode() def archive_backend_determine() -> Archive_Backend: probe_tar_cmd = ["tar", "--version"] s = subprocess.run(probe_tar_cmd, capture_output=True) if s.returncode == 0: return Archive_Backend.GNU else: return Archive_Backend.PYTHON def make_directory_if_not_exists(fp: Path) -> bool: if args.debug_dry_run: dbg(f"mkdir: {fp}") return SUCCESS try: fp.mkdir(exist_ok=True) return SUCCESS except Exception as e: print("Could not create directory:", e) return FAILURE def backup_do(state: State, fp) -> bool: if len(state.backup_destinations) == 0: print("No backup path defined. Canceling ...") return FAILURE operation, archive_name, unarchived_file_path = path_process_and_validate(state, fp) if not operation: return operation first_backup_dir = state.backup_destinations[0] intermediate_path = Path(unarchived_file_path.stem) archive_dir = first_backup_dir / intermediate_path archive_fp = archive_dir / archive_name if not make_directory_if_not_exists(archive_dir): return FAILURE if not archive_make(archive_fp, unarchived_file_path): return FAILURE if not backup_distribute(state, archive_name, intermediate_path): return FAILURE print("Backups archived and distributed.") return SUCCESS @timer def backup_distribute(state: State, archive_name, intermediate_path) -> bool: def copy_file(src, dst): shutil.copy(src, dst) def distribute(src, destinations, max_workers=5): with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(copy_file, src, dst) for dst in destinations] for future in futures: future.result() source_file = state.backup_destinations[0] / intermediate_path / archive_name if not args.debug_dry_run and not source_file.exists(): print("[ Error ]: Sanity check failed, could not find:", source_file) return FAILURE valid_backup_paths = [] for i in range(1, len(state.backup_destinations)): path = state.backup_destinations[i] / intermediate_path if not make_directory_if_not_exists(path): print("[ Warning ]: Skipping current operation.") continue valid_backup_paths.append(path / archive_name) if args.debug_dry_run: dbg("-- Distribute ------------") dbg(f"source_file: {source_file}") dbg("valid_backup_paths:") for dbg_paths in valid_backup_paths: dbg(f" {dbg_paths}") return SUCCESS try: distribute(source_file, valid_backup_paths) except Exception as e: print("Could not copy backups to other destinations:", e) return FAILURE return SUCCESS def backup_name_create(state: State, fn) -> Path: as_date = datetime.now().strftime(state.name_pattern) out = (as_date.replace("", fn) .replace("", str(uuid4()))) file_ext = "tar.gz" if args.archive_compress else "tar" return Path(f"{out}.{file_ext}") def backup_name_fmt_new(state: State, fmt): state.name_pattern = fmt print("Created new pattern:", state.name_pattern) def backup_path_nuke(state: State): if len(state.backup_destinations) == 0: print("No paths in list") return safe_phrase = "I'm damn sure!" print("Are you sure to remove every path? " \ f"Type `{safe_phrase}` to confirm it, or nothing to cancel:") user = input("> ") if user == safe_phrase: state.backup_destinations = [] print("Removed all paths.") return print("Operation canceled.") def backup_path_remove(state: State, path_index): length = len(state.backup_destinations) if length == 0: print("No paths in list") return SUCCESS if path_index < 0 or path_index > length-1 : print("Invalid index. Can't be lower than zero or higher than", length-1) return FAILURE removed = state.backup_destinations.pop(path_index) print("Removed:", removed) return SUCCESS def backup_path_add(state: State, paths): if not paths: print("No paths supplied") return FAILURE d = state.backup_destinations for path in paths: p = Path(path).resolve() if not p.exists(): print("[ Warning ]: Skipped this path, since it does not exist:", p) continue if not p in d: d.append(p) print("\nCurrent backup paths:") backup_path_list(state) return SUCCESS def backup_path_list(state: State): if not state.backup_destinations: print("No backup paths defined yet") for idx, path in enumerate(state.backup_destinations): print(f" {idx}:", path) print() def config_find() -> tuple[bool, Path]: path_config_at_home = PATH_HOME / PATH_CONFIG_FILE_DOT path_config_at_dotconfig = PATH_CONFIG / PATH_CONFIG_SOD_DIR / PATH_CONFIG_FILE if PATH_CONFIG.exists(): if path_config_at_dotconfig.exists(): return SUCCESS, path_config_at_dotconfig else: return FAILURE, path_config_at_dotconfig if path_config_at_home: return SUCCESS, path_config_at_home return FAILURE, path_config_at_home def config_create_file(config_fp: Path) -> bool: try: config_fp.parent.mkdir(exist_ok=True) success = config_write(default_state, config_fp) if not success: return FAILURE print(f"Created configuration file at `{config_fp}`") return SUCCESS except Exception as e: print("Error: Could not create file:", e) return FAILURE def config_delete(config_fp: Path) -> bool: safe_phrase = "YEEES!" print("Are you sure you want to delete the configuration file?") print(f"Type `{safe_phrase}` to delete, type nothing to abort:") user = input("> ") if user == safe_phrase: try: os.remove(config_fp) print("Deleted:", config_fp) return SUCCESS except Exception as e: print("Could not delete file:", e) return FAILURE print("Operation canceled.") return SUCCESS def config_write(state: State, config_fp) -> bool: try: with open(config_fp, 'wb') as f: pickle.dump(state, f) return SUCCESS except Exception as e: print("Error: Could not write config file:", e) return FAILURE def config_read(config_fp) -> tuple[bool, State]: try: with open(config_fp, 'rb') as f: return SUCCESS, pickle.load(f) except Exception as e: print("Error: Could not read config file:", e) return FAILURE, State() def debug_view(state: State, config_fp): print("Config Path:", config_fp) debug_display_state_file("State File ", state) debug_display_state_file("Default Config ", default_state) def debug_display_state_file(label, state: State): print(f"-- Debug: {label:-<40}") print() for k, v in state.__dict__.items(): if isinstance(v, list): print(f"{' '*4}{k}:") for e in v: print(f"{' '*8}{e}") print() else: print(f"{' '*4}{k}: {v}") print() def dbg(msg): print("[ DEBUG ]:", msg) if __name__ == "__main__": if len(sys.argv) == 1: args_handle.help() sys.exit(0) if run(): sys.exit(0) else: sys.exit(-1)