diff --git a/domain/BackupRotator.py b/domain/BackupRotator.py index 37b2565..0fe945f 100755 --- a/domain/BackupRotator.py +++ b/domain/BackupRotator.py @@ -19,10 +19,8 @@ from domain.Util import Util import datetime -import os -# import pprint +from pathlib import Path import shutil -import time import yaml @@ -78,38 +76,52 @@ class BackupRotator: def debug(self, s): self.__logger.debug(s) + def info(self, s): self.__logger.info(s) + def warn(self, s): - self.__logger.warn(s) + self.__logger.warning(s) + def error(self, s): self.__logger.error(s) - def _consume_configs(self, paths: list=None): + def _consume_configs(self, paths: [Path] = None): configs = self.__config_helper.gather_valid_configs(paths=paths) + for config in configs: + self._consume_config(path=config) - def _consume_config(self, path: str): + def _consume_config(self, path: Path): + + self.debug(f"Consuming config: {path}") + assert path.is_file(), ( + f"Cannot consume config file because it isn't a file: {path}" + ) # Open the file - f = open(path) + self.debug(f"Opening config file for consumption: {path}") + f = open(str(path)) if not f: - raise Exception("Unable to open config file: " + path) + raise Exception(f"Unable to open config file: {path}") # Parse - config = yaml.safe_load(f) + config_raw = yaml.safe_load(f) + assert config_raw is not None, ( + f"Config file seems to be null or empty: {path}" + ) # Add its own path - config["__path"] = path + config_raw["__path"] = path # Consume to internal - self.__configs.append(config) + self.__configs.append(config_raw) self.info(f"Consumed config from path: {path}") def _do_rotate(self, config): - + self._rotate_paths(config) def _rotate_paths(self, config): @@ -118,30 +130,46 @@ class BackupRotator: for path in config["paths"]: self._rotate_path(config, path) - def _rotate_path(self, config, path): + def _rotate_path(self, config, path: Path): - assert os.path.isdir(path), "Path should be a directory: {}".format(path) + assert path.is_dir(), ( + f"Path should be a directory: {path}" + ) - self.info("Rotating path: {}".format(path)) + self.info( + f"Rotating path: {path}" + ) found_any_rotation_keys = False if "maximum-items" in config.keys(): + found_any_rotation_keys = True - self._rotate_path_for_maximum_items(config=config, path=path, max_items=config["maximum-items"]) + + self._rotate_path_for_maximum_items( + config=config, + path=path, + max_items=config["maximum-items"] + ) + if "maximum-age" in config.keys(): + found_any_rotation_keys = True - self._rotate_path_for_maximum_age(config=config, path=path, max_age_days=config["maximum-age"]) - - assert found_any_rotation_keys is True, \ + + self._rotate_path_for_maximum_age( + config=config, + path=path, + max_age_days=config["maximum-age"] + ) + + assert found_any_rotation_keys is True, ( "Config needs one of the following keys: \"maximum-items\"" - - def _rotate_path_for_maximum_items(self, config, path: str, max_items: int): + ) + + def _rotate_path_for_maximum_items(self, config, path: Path, max_items: int): - assert os.path.isdir(path), "Path should be a directory: {}".format(path) + assert path.is_dir(), f"Path should be a directory: {path}" - self.info("Rotating path for a maximum of {} items: {}".format( - max_items, path - )) + self.info(f"Rotating path for a maximum of {max_items} items: {path}") children = self._gather_rotation_candidates(config, path) @@ -149,39 +177,51 @@ class BackupRotator: # Do we need to rotate anything out? if len(children) < minimum_items: - self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format( - len(children), minimum_items - )) + self.info( + f"Path only has {len(children)} items" + f", which does not meet the minimum threshold of {minimum_items} items." + " Won't rotate this path." + ) return elif len(children) <= max_items: - self.info("Path only has {} items, but needs more than {} for rotation; Won't rotate this path.".format( - len(children), max_items - )) + self.info( + f"Path only has {len(children)} items" + f", but needs more than {max_items} for rotation" + "; Won't rotate this path." + ) return - self.info("Found {} items to examine".format(len(children))) + + self.info(f"Found {len(children)} items to examine") # maximum_purge_count = len(children) - minimum_items purge_count = len(children) - max_items - self.info("Want to purge {} items".format(purge_count)) - + self.info(f"Want to purge {purge_count} items") + if purge_count > maximum_purge_count: - self.info("Reducing purge count from {} to {} items to respect minimum items setting ({})".format( - purge_count, maximum_purge_count, minimum_items - )) + self.info( + f"Reducing purge count from" + f" {purge_count} to {maximum_purge_count} items" + f" to respect minimum items setting ({minimum_items})" + ) purge_count = maximum_purge_count children_to_purge = [] for purge_index in range(purge_count): # - item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item(config, children) + item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item( + config, children + ) + item_to_purge: Path + children.remove(item_to_purge) - self.info("Found next item to purge: ({}) {} ({})".format( - purge_index + 1, - os.path.basename(item_to_purge), - item_age - )) + + self.info( + f"Found next item to purge: ({purge_index + 1})" + f" {item_to_purge.name}" + f" ({item_age})" + ) # children_to_purge.append(item_to_purge) @@ -189,79 +229,99 @@ class BackupRotator: # self.info("Removing items") for child_to_purge in children_to_purge: - child_basename = os.path.basename(child_to_purge) + + child_to_purge: Path + + self.debug(f"Purging item: {child_to_purge.name}") + self._remove_item(config, child_to_purge) - def _rotate_path_for_maximum_age(self, config, path: str, max_age_days: int): + def _rotate_path_for_maximum_age(self, config, path: Path, max_age_days: int): - assert os.path.isdir(path), "Path should be a directory: {}".format(path) + assert path.is_dir(), f"Path should be a directory: {path}" - self.info("Rotating path for max age of {} days: {}".format(max_age_days, path)) + self.info( + f"Rotating path for max age of {max_age_days} days: {path}" + ) children = self._gather_rotation_candidates(config, path) minimum_items = self._determine_minimum_items(config) # Do we need to rotate anything out? if len(children) < minimum_items: - self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format( - len(children), minimum_items - )) + self.info( + f"Path only has {len(children)} items" + f", which does not meet the minimum threshold of {minimum_items} items." + f" Won't rotate this path." + ) return - self.info("Examining {} items for deletion".format(len(children))) + self.info( + f"Examining {len(children)} items for deletion" + ) children_to_delete = [] for child in children: age_seconds = self._detect_item_age_seconds(config, child) age_days = self._detect_item_age_days(config, child) age_formatted = Util.seconds_to_time_string(age_seconds) - child_basename = os.path.basename(child) if age_days > max_age_days: - self.info("[Old enough ] {} ({})".format( - child_basename, age_formatted - )) + self.info( + f"[Old enough ] {child.name} ({age_formatted})" + ) children_to_delete.append(child) else: - self.info("[Not Old enough] {} ({})".format( - child_basename, age_formatted - )) + self.info( + f"[Not Old enough] {child.name} ({age_formatted})" + ) if len(children_to_delete) > 0: + self.info("Removing old items ...") + for child_to_delete in children_to_delete: - basename = os.path.basename(child_to_delete) self._remove_item(config, child_to_delete) + else: self.info("No old items to remove") + + def _gather_rotation_candidates(self, config, path: Path): + self.debug(f"Begin gathering rotation candidates for: {path}") - @staticmethod - def _gather_rotation_candidates(config, path): - - candidates = [] + candidates: [Path] = [] if "target-type" not in config.keys(): raise Exception("Please provide the configuration key: target-type") - for item_name in os.listdir(path): + for item_name in path.iterdir(): - item_path = os.path.join(path, item_name) + item_path = path / item_name + self.debug(f"Found an item: {item_name} ==> {item_path}") if config["target-type"] == "file": - if not os.path.isfile(item_path): + + if not item_path.is_file(): + self.debug(f"Not a file; Skipping: {item_name}") continue + elif config["target-type"] == "directory": - if not os.path.isdir(item_path): + + if not item_path.is_dir(): + self.debug(f"Not a directory; Skipping: {item_name}") continue + else: - raise Exception("Configuration key \"target-type\" must be \"file\" or \"directory\"") + raise Exception( + "Configuration key \"target-type\" must be \"file\" or \"directory\"" + ) candidates.append(item_path) return candidates - def _pick_oldest_item(self, config, items): + def _pick_oldest_item(self, config, items) -> (Path, float, float, str): best_item = None best_ctime = None @@ -278,77 +338,113 @@ class BackupRotator: return best_item, best_ctime, age_seconds, age_string @staticmethod - def _detect_item_date(config, item): + def _detect_item_date(config, item: Path) -> datetime.datetime: - assert "date-detection" in config.keys(), "Please provide config key: \"date-detection\"" + assert "date-detection" in config.keys(), ( + "Please provide config key: \"date-detection\"" + ) detection = config["date-detection"] - + if detection == "file": - ctime = os.path.getctime(item) + ctime = datetime.datetime.fromtimestamp( + item.stat().st_ctime, tz=datetime.timezone.utc + ) + else: - raise AssertionError(f"Invalid value for \"date-detection\"; Should be one of [file]: {detection}") - + raise AssertionError( + f"Invalid value for \"date-detection\"" + "; Should be one of [file]: {detection}" + ) + return ctime - - def _detect_item_age_seconds(self, config, item): + + def _detect_item_age_seconds(self, config, item: Path) -> float: + + now = datetime.datetime.now() - now = time.time() ctime = self._detect_item_date(config, item) - delta = now - ctime + delta = now - ctime.now() - return delta - - def _detect_item_age_days(self, config, item): + return delta.seconds + + def _detect_item_age_days(self, config, item: Path) -> int: age_seconds = self._detect_item_age_seconds(config, item) age_days = int(age_seconds / 86400) return age_days - def _remove_item(self, config, path): + def _remove_item(self, config, path: Path): - if os.path.isfile(path): + if path.is_file(): + self._remove_file(config, path) - elif os.path.isdir(path): + + elif path.is_dir(): + self._remove_directory(config, path) + else: - raise AssertionError("Don't know how to remove this item: {}".format(path)) - - def _remove_file(self, config, file_path): + raise AssertionError( + f"Don't know how to remove this item: {path}" + ) + + def _remove_file(self, config, file_path: Path): - if not os.path.isfile(file_path): - raise Exception("Tried to remove a file, but this path isn't a file: " + str(file_path)) + if not file_path.is_file(): + raise Exception( + f"Tried to remove a file, but this path isn't a file: {file_path}" + ) if self.__dry_run: + self.info(f"Won't purge file during global-level dry run: {file_path}") + elif "dry-run" in config.keys() and config["dry-run"] is True: + self.info(f"Won't purge file during config-level dry run: {file_path}") + else: self.info(f"Purging file: {file_path}") - os.remove(file_path) + file_path.unlink() - def _remove_directory(self, config, dir_path): + def _remove_directory(self, config, dir_path: Path): - if not os.path.isdir(dir_path): - raise Exception("Tried to remove a directory, but this path isn't a directory: " + str(dir_path)) + if not dir_path.is_dir(): + raise Exception( + f"Tried to remove a directory" + f", but this path isn't a directory: {dir_path}" + ) if self.__dry_run: + self.info(f"Won't purge directory during global-level dry run: {dir_path}") + elif "dry-run" in config.keys() and config["dry-run"] is True: + self.info(f"Won't purge directory during config-level dry run: {dir_path}") + else: + self.info(f"Purging directory: {dir_path}") shutil.rmtree(dir_path) - - def _determine_minimum_items(self, config): + def _determine_minimum_items(self, config) -> int: minimum_items = 0 if "minimum-items" in config.keys(): + minimum_items = config["minimum-items"] - self.info("Won't delete anything unless a minimum of {} items were found".format(minimum_items)) + + self.info( + f"Won't delete anything unless a minimum of {minimum_items} items were found" + ) + else: - self.info("No value found for \"minimum-items\"; Will not enforce minimum item constraint.") + self.info( + "No value found for \"minimum-items\"" + "; Will not enforce minimum item constraint." + ) return minimum_items diff --git a/domain/Util.py b/domain/Util.py index abc2cb4..fcf9732 100644 --- a/domain/Util.py +++ b/domain/Util.py @@ -1,10 +1,27 @@ +from pathlib import Path + + class Util: def __init__(self): pass + @staticmethod + def get_dir_files_recursive(path: Path) -> [Path]: + + files_paths = [] + for dir_path, dirs_names, filenames in path.walk(): + + for file_name in filenames: + + file_path = dir_path / file_name + + files_paths.append(file_path) + + return files_paths + @staticmethod def seconds_to_time_string(seconds: float): @@ -28,13 +45,18 @@ class Util: s_parts = [] for unit_label in dt_map.keys(): + unit_seconds = dt_map[unit_label] + if seconds >= unit_seconds: + unit_count = int(seconds / unit_seconds) - s_parts.append("{} {}{}".format( - unit_count, unit_label, - "" if unit_count == 1 else "s" - )) + + unit_plural = "" if unit_count == 1 else "s" + s_parts.append( + f"{unit_count} {unit_label}{unit_plural}" + ) + seconds -= unit_seconds * unit_count s = ", ".join(s_parts) diff --git a/domain/Config.py b/domain/config/Config.py similarity index 61% rename from domain/Config.py rename to domain/config/Config.py index 4a002ed..f0491c6 100644 --- a/domain/Config.py +++ b/domain/config/Config.py @@ -1,8 +1,11 @@ from domain.Logger import Logger +from domain.Util import Util -import os + +# import os +from pathlib import Path class Config: @@ -12,37 +15,24 @@ class Config: "yml" ] - def __init__(self, logger): + def __init__(self, logger: Logger): self.__logger = logger self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS def debug(self, s): self.__logger.debug(f"[{type(self).__name__}] {s}") + def info(self, s): self.__logger.info(f"[{type(self).__name__}] {s}") + def warn(self, s): - self.__logger.warn(f"[{type(self).__name__}] {s}") + self.__logger.warning(f"[{type(self).__name__}] {s}") + def error(self, s): self.__logger.error(f"[{type(self).__name__}] {s}") - @staticmethod - def get_dir_files_recursive(path: str): - - files_paths = [] - - for dir_path, dirnames, filenames in os.walk(path): - - for file_name in filenames: - - file_path = os.path.join(dir_path, file_name) - files_paths.append(file_path) - # print("Uhm yeah", dir_path, "--", dirnames, "--", file_name) - # print("==>", file_path) - - return files_paths - - def gather_valid_configs(self, paths: list=None): + def gather_valid_configs(self, paths: list = None) -> [Path]: assert paths is not None, "Config paths cannot be None" assert len(paths) > 0, "Must provide at least one config file path" @@ -54,25 +44,42 @@ class Config: not_configs = [] # First gather all files that are potential configs - for path in paths: + for path_str in paths: + + path = Path(path_str) self.info(f"Inspecting path: {path}") - if os.path.isfile(path): - self.debug(f"Path is a file; Adding directly to potential config candidates: {path}") + if not path.exists(): + + self.error(f"Path doesn't exist: {path}") + + if path.is_file(): + + self.debug( + f"Path is a file; Adding directly to potential config candidates: {path}" + ) file_paths.append(path) - elif os.path.isdir(path): - self.debug(f"Path is a dir; Scanning recursively for potential config candidate files: {path}") - for file_path in Config.get_dir_files_recursive(path=path): + elif path.is_dir(): + + self.debug( + f"Path is a dir;" + " Scanning recursively for potential config candidate files: {path}" + ) + + for file_path in Util.get_dir_files_recursive(path=path): self.info(f"> Candidate file: {file_path}") file_paths.append(file_path) else: - raise AssertionError(f"Don't know how to handle path that isn't a file or dir: {path}") + raise AssertionError( + f"Don't know how to handle path that isn't a file or dir: {path}" + ) # Now, filter for files with valid YAML extensions for file_path in file_paths: + if self.check_file_extension(file_path=file_path, extensions=None): configs.append(file_path) else: @@ -94,20 +101,20 @@ class Config: return configs - def check_file_extension(self, file_path, extensions: list=None): + def check_file_extension(self, file_path: Path, extensions: list = None) -> bool: if extensions is None: extensions = self.__valid_extensions - file_name, file_extension = os.path.splitext(file_path) + file_extension = file_path.suffix + + # Strip preceding dot from extension if len(file_extension) > 0 and file_extension[0] == ".": file_extension = file_extension[1:] file_extension = file_extension.lower() - + for valid_extension in extensions: - #print(file_name, "---", file_extension, "---", valid_extension) if file_extension == valid_extension: return True return False -