From dbbe3b88afbe551e2b2b42c8197babb8348f1735 Mon Sep 17 00:00:00 2001 From: mike Date: Sun, 9 Jun 2024 09:22:48 -0700 Subject: [PATCH] Lots of refactoring, trying to get a proper Config class going --- domain/BackupRotator.py | 261 ++++++++++++++++------------------ domain/config/Config.py | 135 ++++-------------- domain/config/ConfigFile.py | 271 ++++++++++++++++++++++++++++++++++++ domain/config/Scanner.py | 120 ++++++++++++++++ main.py | 15 +- 5 files changed, 550 insertions(+), 252 deletions(-) create mode 100644 domain/config/ConfigFile.py create mode 100644 domain/config/Scanner.py diff --git a/domain/BackupRotator.py b/domain/BackupRotator.py index 0fe945f..b9e58b2 100755 --- a/domain/BackupRotator.py +++ b/domain/BackupRotator.py @@ -13,59 +13,64 @@ Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE fil """ -from domain.Logger import Logger from domain.config.Config import Config +from domain.config.ConfigFile import ConfigFile +from domain.Logger import Logger from domain.Util import Util import datetime from pathlib import Path import shutil -import yaml class BackupRotator: def __init__( self, + config_paths: [Path] = None, debug: bool = False, systemd: bool = False, write_to_syslog: bool = False ): - self.__logger = Logger( name=type(self).__name__, debug=debug, systemd=systemd, write_to_syslog=write_to_syslog, ) - self.__config_helper = Config( - logger=self.__logger + + self.__config = Config( + logger=self.__logger, + config_files_paths=config_paths ) - self.__dry_run = False - self.__configs = [] - self.__config_paths = [] + self.__global_dry_run = True self.__calculated_actions = [] - def run(self, configs, dry_run: bool = False): + def run(self, global_dry_run: bool = True): - self.info("Begin") + self.info("Begin rotating") - self.__dry_run = dry_run - self.__config_paths = configs - - self._consume_configs(self.__config_paths) + self.__global_dry_run = global_dry_run + if self.__global_dry_run: + self.info(f"Running as a dry run, globally.") + else: + self.info(f"Won't run as a global dry run.") # Rotate once per config - for config_index in range(len(self.__configs)): + config_file_index = -1 + for config_file in self.__config.config_files: - # - config = self.__configs[config_index] + config_file: ConfigFile + config_file_index += 1 - # - self.info(f"Rotating for config {config_index + 1} of {len(self.__configs)} : {config['__path']}") - self._do_rotate(config) + self.info( + f"Rotating for config {config_file_index + 1} of {len(self.__config.config_files)}" + f" : {config_file.path}" + f"\n{config_file}" + ) + self._do_rotate(config_file) @staticmethod def current_time(): @@ -86,51 +91,34 @@ class BackupRotator: def error(self, s): self.__logger.error(s) - def _consume_configs(self, paths: [Path] = None): + def _do_rotate(self, config: ConfigFile): - configs = self.__config_helper.gather_valid_configs(paths=paths) + self.info( + f"Rotating for config: {config.path}" + ) + if config.dry_run: + self.info( + f"Config {config.path.name} is set for a dry run (no deleting)." + ) + else: + self.info( + f"Config {config.path.name} is not set for a dry run (will delete)." + ) - for config in configs: + self._rotate_paths(config=config) + + def _rotate_paths(self, config: ConfigFile): + + paths = config.rotatable_paths + self.info(f"Begin rotating {len(paths)} paths") + + for path in paths: - self._consume_config(path=config) - - def _consume_config(self, path: Path): - - self.debug(f"Consuming config: {path}") - assert path.is_file(), ( - f"Cannot consume config file because it isn't a file: {path}" - ) - - # Open the file - self.debug(f"Opening config file for consumption: {path}") - f = open(str(path)) - if not f: - raise Exception(f"Unable to open config file: {path}") - - # Parse - config_raw = yaml.safe_load(f) - assert config_raw is not None, ( - f"Config file seems to be null or empty: {path}" - ) - - # Add its own path - config_raw["__path"] = path - - # Consume to internal - self.__configs.append(config_raw) - self.info(f"Consumed config from path: {path}") + path: Path + + self._rotate_path(config=config, path=path) - def _do_rotate(self, config): - - self._rotate_paths(config) - - def _rotate_paths(self, config): - - self.info("Begin rotating " + str(len(config["paths"])) + " paths") - for path in config["paths"]: - self._rotate_path(config, path) - - def _rotate_path(self, config, path: Path): + def _rotate_path(self, config: ConfigFile, path: Path): assert path.is_dir(), ( f"Path should be a directory: {path}" @@ -141,61 +129,63 @@ class BackupRotator: ) found_any_rotation_keys = False - if "maximum-items" in config.keys(): + if config.maximum_items: found_any_rotation_keys = True self._rotate_path_for_maximum_items( config=config, path=path, - max_items=config["maximum-items"] ) - if "maximum-age" in config.keys(): + if config.maximum_age: found_any_rotation_keys = True self._rotate_path_for_maximum_age( config=config, path=path, - max_age_days=config["maximum-age"] ) assert found_any_rotation_keys is True, ( "Config needs one of the following keys: \"maximum-items\"" ) - def _rotate_path_for_maximum_items(self, config, path: Path, max_items: int): + def _rotate_path_for_maximum_items(self, config: ConfigFile, path: Path): assert path.is_dir(), f"Path should be a directory: {path}" - self.info(f"Rotating path for a maximum of {max_items} items: {path}") + self.info( + f"Rotating path for a maximum of {config.maximum_items} items: {path}" + ) - children = self._gather_rotation_candidates(config, path) + candidate_items = self._gather_rotation_candidates(config=config, path=path) - minimum_items = self._determine_minimum_items(config) + minimum_items = self._determine_minimum_items(config=config) # Do we need to rotate anything out? - if len(children) < minimum_items: + if len(candidate_items) < minimum_items: + self.info( - f"Path only has {len(children)} items" + f"Path only has {len(candidate_items)} items" f", which does not meet the minimum threshold of {minimum_items} items." " Won't rotate this path." ) return - elif len(children) <= max_items: + + elif len(candidate_items) <= config.maximum_items: self.info( - f"Path only has {len(children)} items" - f", but needs more than {max_items} for rotation" + f"Path only has {len(candidate_items)} items" + f", but needs more than {config.maximum_items} for rotation" "; Won't rotate this path." ) return - self.info(f"Found {len(children)} items to examine") + self.info(f"Found {len(candidate_items)} items to examine") # - maximum_purge_count = len(children) - minimum_items - purge_count = len(children) - max_items + maximum_purge_count = len(candidate_items) - minimum_items + purge_count = len(candidate_items) - config.maximum_items self.info(f"Want to purge {purge_count} items") if purge_count > maximum_purge_count: @@ -206,16 +196,16 @@ class BackupRotator: ) purge_count = maximum_purge_count - children_to_purge = [] + items_to_purge = [] for purge_index in range(purge_count): # item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item( - config, children + config=config, items=candidate_items ) item_to_purge: Path - children.remove(item_to_purge) + candidate_items.remove(item_to_purge) self.info( f"Found next item to purge: ({purge_index + 1})" @@ -224,104 +214,101 @@ class BackupRotator: ) # - children_to_purge.append(item_to_purge) - + items_to_purge.append(item_to_purge) + # self.info("Removing items") - for child_to_purge in children_to_purge: + for item_to_purge in items_to_purge: - child_to_purge: Path + item_to_purge: Path - self.debug(f"Purging item: {child_to_purge.name}") + self.debug(f"Purging item: {item_to_purge.name}") - self._remove_item(config, child_to_purge) + self._remove_item(config=config, path=item_to_purge) - def _rotate_path_for_maximum_age(self, config, path: Path, max_age_days: int): + def _rotate_path_for_maximum_age(self, config: ConfigFile, path: Path): assert path.is_dir(), f"Path should be a directory: {path}" self.info( - f"Rotating path for max age of {max_age_days} days: {path}" + f"Rotating path for max age of {config.maximum_age} days: {path}" ) - children = self._gather_rotation_candidates(config, path) - minimum_items = self._determine_minimum_items(config) + candidate_items = self._gather_rotation_candidates(config=config, path=path) + minimum_items = self._determine_minimum_items(config=config) # Do we need to rotate anything out? - if len(children) < minimum_items: + if len(candidate_items) < minimum_items: self.info( - f"Path only has {len(children)} items" + f"Path only has {len(candidate_items)} items" f", which does not meet the minimum threshold of {minimum_items} items." f" Won't rotate this path." ) return self.info( - f"Examining {len(children)} items for deletion" + f"Examining {len(candidate_items)} items for deletion" ) - children_to_delete = [] - for child in children: + items_to_delete = [] + for item in candidate_items: - age_seconds = self._detect_item_age_seconds(config, child) - age_days = self._detect_item_age_days(config, child) + age_seconds = self._detect_item_age_seconds(config=config, item=item) + age_days = self._detect_item_age_days(config=config, item=item) age_formatted = Util.seconds_to_time_string(age_seconds) - if age_days > max_age_days: + if age_days > config.maximum_age: self.info( - f"[Old enough ] {child.name} ({age_formatted})" + f"[Old enough ] {item.name} ({age_formatted})" ) - children_to_delete.append(child) + items_to_delete.append(item) else: self.info( - f"[Not Old enough] {child.name} ({age_formatted})" + f"[Not Old enough] {item.name} ({age_formatted})" ) - if len(children_to_delete) > 0: + if len(items_to_delete) > 0: self.info("Removing old items ...") - for child_to_delete in children_to_delete: - self._remove_item(config, child_to_delete) + for item in items_to_delete: + self._remove_item(config, item) else: self.info("No old items to remove") - def _gather_rotation_candidates(self, config, path: Path): + def _gather_rotation_candidates(self, config: ConfigFile, path: Path) -> [Path]: self.debug(f"Begin gathering rotation candidates for: {path}") candidates: [Path] = [] - if "target-type" not in config.keys(): - raise Exception("Please provide the configuration key: target-type") - for item_name in path.iterdir(): item_path = path / item_name self.debug(f"Found an item: {item_name} ==> {item_path}") - if config["target-type"] == "file": + if config.target_type == "file": if not item_path.is_file(): self.debug(f"Not a file; Skipping: {item_name}") continue - elif config["target-type"] == "directory": + elif config.target_type == "directory": if not item_path.is_dir(): self.debug(f"Not a directory; Skipping: {item_name}") continue - + else: raise Exception( - "Configuration key \"target-type\" must be \"file\" or \"directory\"" + f"Unsupported target type: {config.target_type}" ) candidates.append(item_path) return candidates - def _pick_oldest_item(self, config, items) -> (Path, float, float, str): + def _pick_oldest_item(self, config: ConfigFile, items: [Path]) -> (Path, float, float, str): best_item = None best_ctime = None @@ -338,69 +325,65 @@ class BackupRotator: return best_item, best_ctime, age_seconds, age_string @staticmethod - def _detect_item_date(config, item: Path) -> datetime.datetime: + def _detect_item_date(config: ConfigFile, item: Path) -> datetime.datetime: - assert "date-detection" in config.keys(), ( - "Please provide config key: \"date-detection\"" - ) - detection = config["date-detection"] - - if detection == "file": + if config.date_detection == "file": ctime = datetime.datetime.fromtimestamp( item.stat().st_ctime, tz=datetime.timezone.utc ) else: raise AssertionError( - f"Invalid value for \"date-detection\"" - "; Should be one of [file]: {detection}" + f"Unsupported date-detection option: {config.date_detection}" ) return ctime - def _detect_item_age_seconds(self, config, item: Path) -> float: + def _detect_item_age_seconds(self, config: ConfigFile, item: Path) -> float: now = datetime.datetime.now() - ctime = self._detect_item_date(config, item) + ctime = self._detect_item_date(config=config, item=item) delta = now - ctime.now() return delta.seconds - def _detect_item_age_days(self, config, item: Path) -> int: + def _detect_item_age_days(self, config: ConfigFile, item: Path) -> int: - age_seconds = self._detect_item_age_seconds(config, item) + age_seconds = self._detect_item_age_seconds( + config=config, item=item + ) age_days = int(age_seconds / 86400) - + return age_days - def _remove_item(self, config, path: Path): + def _remove_item(self, config: ConfigFile, path: Path): if path.is_file(): - self._remove_file(config, path) + self._remove_file(config=config, file_path=path) elif path.is_dir(): - self._remove_directory(config, path) + self._remove_directory(config=config, dir_path=path) else: raise AssertionError( f"Don't know how to remove this item: {path}" ) - def _remove_file(self, config, file_path: Path): + def _remove_file(self, config: ConfigFile, file_path: Path): if not file_path.is_file(): raise Exception( f"Tried to remove a file, but this path isn't a file: {file_path}" ) - if self.__dry_run: + if self.__global_dry_run: self.info(f"Won't purge file during global-level dry run: {file_path}") - elif "dry-run" in config.keys() and config["dry-run"] is True: + elif config.dry_run is True: self.info(f"Won't purge file during config-level dry run: {file_path}") @@ -408,7 +391,7 @@ class BackupRotator: self.info(f"Purging file: {file_path}") file_path.unlink() - def _remove_directory(self, config, dir_path: Path): + def _remove_directory(self, config: ConfigFile, dir_path: Path): if not dir_path.is_dir(): raise Exception( @@ -416,11 +399,11 @@ class BackupRotator: f", but this path isn't a directory: {dir_path}" ) - if self.__dry_run: + if self.__global_dry_run: self.info(f"Won't purge directory during global-level dry run: {dir_path}") - elif "dry-run" in config.keys() and config["dry-run"] is True: + elif config.dry_run: self.info(f"Won't purge directory during config-level dry run: {dir_path}") @@ -433,9 +416,9 @@ class BackupRotator: minimum_items = 0 - if "minimum-items" in config.keys(): + if config.minimum_items is not None: - minimum_items = config["minimum-items"] + minimum_items = config.minimum_items self.info( f"Won't delete anything unless a minimum of {minimum_items} items were found" @@ -443,7 +426,7 @@ class BackupRotator: else: self.info( - "No value found for \"minimum-items\"" + "No minimum number of items specified" "; Will not enforce minimum item constraint." ) diff --git a/domain/config/Config.py b/domain/config/Config.py index 6a0a07f..10c5a44 100644 --- a/domain/config/Config.py +++ b/domain/config/Config.py @@ -1,120 +1,41 @@ +from domain.config.ConfigFile import ConfigFile +from domain.config.Scanner import Scanner from domain.Logger import Logger -from domain.Util import Util -# import os from pathlib import Path class Config: - __DEFAULT_VALID_EXTENSIONS = [ - "yaml", - "yml" - ] - - def __init__(self, logger: Logger): - + def __init__(self, logger: Logger, config_files_paths: [Path]): + self.__logger = logger - self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS + + self.__config_files_paths: [Path] = config_files_paths + self.__configs: {} = None + + self.__scanner = Scanner( + logger=self.__logger + ) + + self._consume_configs() - def debug(self, s): - self.__logger.debug(f"[{type(self).__name__}] {s}") + def _consume_configs(self, paths: [Path] = None): + + config_paths = self.__scanner.gather_valid_config_paths(paths=paths) + + for config_path in config_paths: + + config = ConfigFile( + logger=self.__logger, + path=config_path + ) + + self.__configs[config.key] = config - def info(self, s): - self.__logger.info(f"[{type(self).__name__}] {s}") - - def warn(self, s): - self.__logger.warning(f"[{type(self).__name__}] {s}") - - def error(self, s): - self.__logger.error(f"[{type(self).__name__}] {s}") - - def gather_valid_config_paths(self, paths: list = None) -> [Path]: - - assert paths is not None, "Config paths cannot be None" - assert len(paths) > 0, "Must provide at least one config file path" - - self.info("Gathering valid configs") - - file_paths = [] - config_paths = [] - not_config_paths = [] - - # First gather all files that are potential configs - for path_str in paths: - - path = Path(path_str) - - self.info(f"Inspecting path: {path}") - - if not path.exists(): - - self.error(f"Path doesn't exist: {path}") - - if path.is_file(): - - self.debug( - f"Path is a file; Adding directly to potential config candidates: {path}" - ) - file_paths.append(path) - - elif path.is_dir(): - - self.debug( - f"Path is a dir;" - " Scanning recursively for potential config candidate files: {path}" - ) - - for file_path in Util.get_dir_files_recursive(path=path): - self.info(f"> Candidate file: {file_path}") - file_paths.append(file_path) - - else: - raise AssertionError( - f"Don't know how to handle path that isn't a file or dir: {path}" - ) - - # Now, filter for files with valid YAML extensions - for file_path in file_paths: - - if self.check_file_extension(file_path=file_path, extensions=None): - config_paths.append(file_path) - else: - not_config_paths.append(file_path) - - self.info("Filtered out non-config files:") - if len(not_config_paths) > 0: - for not_config_path in not_config_paths: - self.info(f"> {not_config_path}") - else: - self.info("> [none]") - - self.info("Kept config-looking files:") - if len(config_paths) > 0: - for config_path in config_paths: - self.info(f"> {config_path}") - else: - self.info("> [none]") - - return config_paths - - def check_file_extension(self, file_path: Path, extensions: list = None) -> bool: - - if extensions is None: - extensions = self.__valid_extensions - - file_extension = file_path.suffix - - # Strip preceding dot from extension - if len(file_extension) > 0 and file_extension[0] == ".": - file_extension = file_extension[1:] - file_extension = file_extension.lower() - - for valid_extension in extensions: - if file_extension == valid_extension: - return True - - return False + @property + def config_files(self) -> [ConfigFile]: + return self.__configs.values() diff --git a/domain/config/ConfigFile.py b/domain/config/ConfigFile.py new file mode 100644 index 0000000..b5d069f --- /dev/null +++ b/domain/config/ConfigFile.py @@ -0,0 +1,271 @@ + + +from domain.Logger import Logger + + +from pathlib import Path +import yaml + + +class ConfigFile: + + __VALID_TARGET_TYPES = [ + "file", + "directory" + ] + + __VALID_DATE_DETECTION_TYPES = [ + "file" + ] + + def __init__( + self, logger: Logger, + path: Path, + ): + + self.__logger = logger + self.__path = path.absolute() + + # noinspection PyTypeChecker + self.__data: dict = None + + self.__dry_run: bool = True + + # noinspection PyTypeChecker + self.__target_type: str = None + + # noinspection PyTypeChecker + self.__date_detection: str = None + + self.__rotatable_paths: [Path] = [] + + self.__minimum_items: int = 0 + # noinspection PyTypeChecker + self.__maximum_items: int = None + # noinspection PyTypeChecker + self.__maximum_age: int = None + + self._load() + self._consume() + + def __str__(self): + + s = "" + + s += "*** Config File ***" + s += f"> Path: {self.__path}" + s += f"> Dry run: " + ("Yes" if self.__dry_run else "No") + s += f"> Minimum items: {self.__minimum_items}" + s += f"> Maximum items: {self.__maximum_items}" + s += f"> Maximum age (in days): {self.__maximum_age}" + s += f"> Target type: {self.__target_type}" + s += f"> Date detection: {self.__date_detection}" + s += f"> Rotatable paths: " + if len(self.__rotatable_paths) > 0: + for p in self.__rotatable_paths: + s += f">> {p}" + else: + s += ">> [none]" + + return s + + def _load(self): + + self.info(f"Loading config: {self.__path}") + + assert self.__path.is_file(), ( + f"Cannot load config file because it isn't a file: {self.__path}" + ) + + # Open the file + self.debug(f"Opening config file for load: {self.__path}") + f = open(str(self.__path)) + if not f: + raise Exception(f"Unable to open config file: {self.__path}") + + # Load data + self.__data = yaml.safe_load(f) + assert self.__data is not None, ( + f"Config file seems to be null or empty: {self.__path}" + ) + + # Consume to internal + self.info(f"Loaded config from path: {self.__path}") + + def _consume(self): + + try: + + if "options" in self.__data.keys(): + + self.info(f"Found options setting") + options = self.__data["options"] + assert isinstance(options, dict), "Options must be a dict" + + if "dry-run" in options.keys(): + + dry_run = self.__data["options"]["dry-run"] + self.info(f"Found dry run option: {dry_run}") + assert isinstance(dry_run, bool), "dry-run setting must be boolean" + self.__dry_run = dry_run + else: + self.warning(f"No dry-run option found; Will use default: {self.__dry_run}") + + if "minimum-items" in options.keys(): + + minimum_items = options["minimum-items"] + self.info(f"Found minimum-items option: {minimum_items}") + assert isinstance(minimum_items, int), ( + f"Option minimum-items must be int, but got: {minimum_items}" + ) + self.__minimum_items = minimum_items + else: + self.warning( + f"No minimum-items option found; Will use default: {self.__minimum_items}" + ) + + if "maximum-items" in options.keys(): + + maximum_items = options["maximum-items"] + self.info(f"Found maximum-items option: {maximum_items}") + assert isinstance(maximum_items, int), ( + f"Option maximum-items must be int, but got: {maximum_items}" + ) + assert maximum_items > 0, ( + f"Option maximum-items is zero, which doesn't make sense." + ) + self.__maximum_items = maximum_items + else: + self.warning( + f"No maximum-items option found; Will use default: {self.__maximum_items}" + ) + + if "maximum-age" in options.keys(): + + maximum_age = options["maximum-age"] + self.info(f"Found maximum-age option (max age in days): {maximum_age}") + assert isinstance(maximum_age, int), ( + f"Option maximum-age must be int, but got: {maximum_age}" + ) + assert maximum_age > 0, ( + f"Option maximum-age is zero, which doesn't make sense." + ) + self.__maximum_age = maximum_age + else: + self.warning( + f"No maximum-age option found; Will use default: {self.__maximum_age}" + ) + + assert "target-type" in options.keys(), ( + f"Option target-type is required" + ) + target_type = options["target-type"] + self.info(f"Found target-type option: {target_type}") + assert isinstance(target_type, str), ( + f"Option target-type must be str, but got: {target_type}" + ) + assert target_type in self.__VALID_TARGET_TYPES, ( + f"Option target-type must be one of: {self.__VALID_TARGET_TYPES}" + ) + self.__target_type = target_type + + if "date-detection" in options.keys(): + date_detection = options["date-detection"] + self.info(f"Found date-detection option: {date_detection}") + assert isinstance(date_detection, str), ( + f"Option date-detection must be str, but got: {date_detection}" + ) + assert date_detection in self.__VALID_DATE_DETECTION_TYPES, ( + f"Option date-detection must be one of: {self.__VALID_DATE_DETECTION_TYPES}" + ) + self.__date_detection = date_detection + else: + self.warning( + f"Option date-detection not found; Will use default: {self.__date_detection}" + ) + + assert "paths" in self.__data, ( + f"Could not find 'paths' key" + ) + rotatable_paths = self.__data["paths"] + if isinstance(rotatable_paths, str): + rotatable_paths = [rotatable_paths] + assert isinstance(rotatable_paths, list), ( + "Rotatable 'paths' key must be a string or list" + ) + self.__rotatable_paths = rotatable_paths + self.info(f"Found {len(self.__rotatable_paths)} rotatable paths") + + except KeyError as e: + + self.error( + f"Failed to load config due to KeyError" + f"\nFile: {self.__path}" + f"\nError: {str(e)}" + ) + raise e + + except AssertionError as e: + + self.error( + f"Failed to load config due to AssertionError" + f"\nFile: {self.__path}" + f"\nError: {str(e)}" + ) + raise e + + def debug(self, s): + self.__logger.debug(f"({self.__path.name}) {s}") + + def info(self, s): + self.__logger.info(f"({self.__path.name}) {s}") + + def warning(self, s): + self.__logger.warning(f"({self.__path.name}) {s}") + + def error(self, s): + self.__logger.error(f"({self.__path.name}) {s}") + + @property + def key(self) -> str: + return str(self.__path) + + @property + def path(self) -> Path: + return self.__path + + @property + def data(self) -> dict: + return self.__data + + @property + def dry_run(self) -> bool: + return self.__dry_run + + @dry_run.setter + def dry_run(self, b: bool): + self.__dry_run = b + + @property + def target_type(self) -> str: + return self.__target_type + + @property + def date_detection(self) -> str: + return self.__date_detection + + @property + def rotatable_paths(self) -> [Path]: + return self.__rotatable_paths + + @property + def minimum_items(self) -> int: + return self.__minimum_items + + @property + def maximum_items(self) -> int: + return self.__maximum_items + + @property + def maximum_age(self) -> int: + return self.__maximum_age diff --git a/domain/config/Scanner.py b/domain/config/Scanner.py new file mode 100644 index 0000000..7975895 --- /dev/null +++ b/domain/config/Scanner.py @@ -0,0 +1,120 @@ + + +from domain.Logger import Logger +from domain.Util import Util + + +# import os +from pathlib import Path + + +class Scanner: + + __DEFAULT_VALID_EXTENSIONS = [ + "yaml", + "yml" + ] + + def __init__(self, logger: Logger): + + self.__logger = logger + self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS + + def debug(self, s): + self.__logger.debug(f"[{type(self).__name__}] {s}") + + def info(self, s): + self.__logger.info(f"[{type(self).__name__}] {s}") + + def warn(self, s): + self.__logger.warning(f"[{type(self).__name__}] {s}") + + def error(self, s): + self.__logger.error(f"[{type(self).__name__}] {s}") + + def gather_valid_config_paths(self, paths: list = None) -> [Path]: + + assert paths is not None, "Config paths cannot be None" + assert len(paths) > 0, "Must provide at least one config file path" + + self.info("Gathering valid configs") + + file_paths = [] + config_paths = [] + not_config_paths = [] + + # First gather all files that are potential configs + for path_str in paths: + + path = Path(path_str) + + self.info(f"Inspecting path: {path}") + + if not path.exists(): + + self.error(f"Path doesn't exist: {path}") + + if path.is_file(): + + self.debug( + f"Path is a file; Adding directly to potential config candidates: {path}" + ) + file_paths.append(path) + + elif path.is_dir(): + + self.debug( + f"Path is a dir;" + " Scanning recursively for potential config candidate files: {path}" + ) + + for file_path in Util.get_dir_files_recursive(path=path): + self.info(f"> Candidate file: {file_path}") + file_paths.append(file_path) + + else: + raise AssertionError( + f"Don't know how to handle path that isn't a file or dir: {path}" + ) + + # Now, filter for files with valid YAML extensions + for file_path in file_paths: + + if self.check_file_extension(file_path=file_path, extensions=None): + config_paths.append(file_path) + else: + not_config_paths.append(file_path) + + self.info("Filtered out non-config files:") + if len(not_config_paths) > 0: + for not_config_path in not_config_paths: + self.info(f"> {not_config_path}") + else: + self.info("> [none]") + + self.info("Kept config-looking files:") + if len(config_paths) > 0: + for config_path in config_paths: + self.info(f"> {config_path}") + else: + self.info("> [none]") + + return config_paths + + def check_file_extension(self, file_path: Path, extensions: list = None) -> bool: + + if extensions is None: + extensions = self.__valid_extensions + + file_extension = file_path.suffix + + # Strip preceding dot from extension + if len(file_extension) > 0 and file_extension[0] == ".": + file_extension = file_extension[1:] + file_extension = file_extension.lower() + + for valid_extension in extensions: + if file_extension == valid_extension: + return True + + return False diff --git a/main.py b/main.py index 5166ebc..4f9c723 100755 --- a/main.py +++ b/main.py @@ -44,30 +44,33 @@ def main(): parser.add_argument( "--config", "-c", - dest="config_files", + dest="config_paths", default=[], action="append", type=str, - help="Specify a configuration file. Can be called multiple times." + help="Specify a configuration file or configuration directory. Can be called multiple times." ) parser.add_argument( "--dry-run", "-d", - dest="dry_run", + dest="global_dry_run", default=False, action="store_true", - help="Only perform an analysis; Don't delete anything." + help=( + "Only perform an analysis;" + " Don't delete anything no matter what configs say (configs can specify dry run, too)." + ) ) args = parser.parse_args() rotator = BackupRotator( + config_paths=args.config_paths, debug=args.debug, systemd=args.systemd, write_to_syslog=args.write_to_syslog, ) rotator.run( - configs=args.config_files, - dry_run=args.dry_run + global_dry_run=args.global_dry_run )