Compare commits

..

No commits in common. "master" and "v1.3.5" have entirely different histories.

8 changed files with 317 additions and 925 deletions

View File

@ -7,7 +7,7 @@ Suppose you have a third party backup program regularly dropping backup files in
# License # License
Copyright 2024 Mike Peralta; All rights reserved Copyright 2023 Mike Peralta; All rights reserved
Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more) Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more)

View File

@ -6,75 +6,56 @@ Mike's Backup Rotator
A simple script to help automatically rotate backup files A simple script to help automatically rotate backup files
Copyright 2024 Mike Peralta; All rights reserved Copyright 2023 Mike Peralta; All rights reserved
Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more) Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more)
""" """
from domain.config.Config import Config
from domain.config.ConfigFile import ConfigFile
from domain.Logger import Logger from domain.Logger import Logger
from domain.Util import Util from domain.Config import Config
import datetime import datetime
from pathlib import Path import os
# import pprint
import shutil import shutil
import sys
import time
import yaml
class BackupRotator: class BackupRotator:
def __init__( def __init__(self, debug:bool = False):
self,
config_paths: [Path] = None,
debug: bool = False,
systemd: bool = False,
write_to_syslog: bool = False,
do_test_logs: bool = True,
):
self.__do_test_logs = do_test_logs
self.__logger = Logger( self.__logger = Logger(name=type(self).__name__, debug=debug)
name=type(self).__name__, self.__config_helper = Config(logger=self.__logger)
debug=debug,
systemd=systemd,
write_to_syslog=write_to_syslog,
do_test_logs=do_test_logs,
)
self.__config = Config( self.__dry_run = False
logger=self.__logger, self.__configs = []
config_files_paths=config_paths self.__config_paths = []
)
self.__global_dry_run = True
self.__calculated_actions = [] self.__calculated_actions = []
def run(self, global_dry_run: bool = True): def run(self, configs, dry_run: bool = False):
self.info("Begin rotating") self.info("Begin")
self.__global_dry_run = global_dry_run self.__dry_run = dry_run
if self.__global_dry_run: self.__config_paths = configs
self.info(f"Running as a dry run, globally.")
else: self._consume_configs(self.__config_paths)
self.info(f"Won't run as a global dry run.")
# Rotate once per config # Rotate once per config
config_file_index = -1 for config_index in range(len(self.__configs)):
for config_file in self.__config.config_files:
config_file: ConfigFile #
config_file_index += 1 config = self.__configs[config_index]
self.info( #
f"Rotating for config {config_file_index + 1} of {len(self.__config.config_files)}" self.info(f"Rotating for config {config_index + 1} of {len(self.__configs)} : {config['__path']}")
f" : {config_file.path}" self._do_rotate(config)
f"\n{config_file}"
)
self._do_rotate(config_file)
@staticmethod @staticmethod
def current_time(): def current_time():
@ -85,333 +66,312 @@ class BackupRotator:
def debug(self, s): def debug(self, s):
self.__logger.debug(s) self.__logger.debug(s)
def info(self, s): def info(self, s):
self.__logger.info(s) self.__logger.info(s)
def warn(self, s): def warn(self, s):
self.__logger.warning(s) self.__logger.warn(s)
def error(self, s): def error(self, s):
self.__logger.error(s) self.__logger.error(s)
def _do_rotate(self, config: ConfigFile): def _consume_configs(self, paths: list=None):
self.info( configs = self.__config_helper.gather_valid_configs(paths=paths)
f"Rotating for config: {config.path}" for config in configs:
) self._consume_config(path=config)
if config.dry_run:
self.info(
f"Config {config.path.name} is set for a dry run (no deleting)."
)
else:
self.info(
f"Config {config.path.name} is not set for a dry run (will delete)."
)
self._rotate_paths(config=config) def _consume_config(self, path: str):
def _rotate_paths(self, config: ConfigFile): # Open the file
f = open(path)
if not f:
raise Exception("Unable to open config file: " + path)
paths = config.rotatable_paths # Parse
self.info(f"Begin rotating {len(paths)} paths") config = yaml.safe_load(f)
for path in paths: # Add its own path
config["__path"] = path
path: Path # Consume to internal
self.__configs.append(config)
self.info(f"Consumed config from path: {path}")
self._rotate_path(config=config, path=path) def _do_rotate(self, config):
def _rotate_path(self, config: ConfigFile, path: Path): self._rotate_paths(config)
assert path.is_dir(), ( def _rotate_paths(self, config):
f"Path should be a directory: {path}"
)
self.info( self.info("Begin rotating " + str(len(config["paths"])) + " paths")
f"Rotating path: {path}" for path in config["paths"]:
) self._rotate_path(config, path)
self._rotate_path_for_maximum_items( def _rotate_path(self, config, path):
config=config,
path=path,
)
self._rotate_path_for_maximum_age( assert os.path.isdir(path), "Path should be a directory: {}".format(path)
config=config,
path=path,
)
def _rotate_path_for_maximum_items(self, config: ConfigFile, path: Path): self.info("Rotating path: {}".format(path))
assert path.is_dir(), f"Path should be a directory: {path}" found_any_rotation_keys = False
if "maximum-items" in config.keys():
found_any_rotation_keys = True
self._rotate_path_for_maximum_items(config=config, path=path, max_items=config["maximum-items"])
if "maximum-age" in config.keys():
found_any_rotation_keys = True
self._rotate_path_for_maximum_age(config=config, path=path, max_age_days=config["maximum-age"])
if config.maximum_items: assert found_any_rotation_keys is True, \
self.info( "Config needs one of the following keys: \"maximum-items\""
f"Rotating path for a maximum of {config.maximum_items} items: {path}"
)
else:
self.info(
f"Not configured to rotate for maximum number of items."
)
return
self.info( def _rotate_path_for_maximum_items(self, config, path: str, max_items: int):
f"Will gather rotation candidates for maximum number of items."
)
candidate_items = self._gather_rotation_candidates(config=config, path=path) assert os.path.isdir(path), "Path should be a directory: {}".format(path)
minimum_items = self._determine_minimum_items(config=config)
self.info("Rotating path for a maximum of {} items: {}".format(
max_items, path
))
children = self._gather_rotation_candidates(config, path)
minimum_items = self._determine_minimum_items(config)
# Do we need to rotate anything out? # Do we need to rotate anything out?
if len(candidate_items) < minimum_items: if len(children) < minimum_items:
self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
self.info( len(children), minimum_items
f"Path only has {len(candidate_items)} items" ))
f", which does not meet the minimum threshold of {minimum_items} items."
" Won't rotate this path."
)
return return
elif len(children) <= max_items:
elif len(candidate_items) <= config.maximum_items: self.info("Path only has {} items, but needs more than {} for rotation; Won't rotate this path.".format(
self.info( len(children), max_items
f"Path only has {len(candidate_items)} items" ))
f", but needs more than {config.maximum_items} for rotation"
"; Won't rotate this path."
)
return return
self.info("Found {} items to examine".format(len(children)))
self.info(f"Found {len(candidate_items)} items to examine")
# #
maximum_purge_count = len(candidate_items) - minimum_items maximum_purge_count = len(children) - minimum_items
purge_count = len(candidate_items) - config.maximum_items purge_count = len(children) - max_items
self.info( self.info("Want to purge {} items".format(purge_count))
f"Want to purge {purge_count} items to stay under maximum of {config.maximum_items}"
)
if purge_count > maximum_purge_count: if purge_count > maximum_purge_count:
self.info( self.info("Reducing purge count from {} to {} items to respect minimum items setting ({})".format(
f"Reducing purge count from" purge_count, maximum_purge_count, minimum_items
f" {purge_count} to {maximum_purge_count} items" ))
f" to respect minimum items setting ({minimum_items})"
)
purge_count = maximum_purge_count purge_count = maximum_purge_count
items_to_purge = [] children_to_purge = []
for purge_index in range(purge_count): for purge_index in range(purge_count):
# #
item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item( item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item(config, children)
config=config, items=candidate_items children.remove(item_to_purge)
) self.info("Found next item to purge: ({}) {} ({})".format(
item_to_purge: Path purge_index + 1,
os.path.basename(item_to_purge),
candidate_items.remove(item_to_purge) item_age
))
self.info(
f"Will purge: ({purge_index + 1})"
f" {item_to_purge.name}"
f" ({item_age})"
)
# #
items_to_purge.append(item_to_purge) children_to_purge.append(item_to_purge)
# #
self.info("Removing items") self.info("Removing items")
for item_to_purge in items_to_purge: for child_to_purge in children_to_purge:
child_basename = os.path.basename(child_to_purge)
self._remove_item(config, child_to_purge)
item_to_purge: Path def _rotate_path_for_maximum_age(self, config, path: str, max_age_days: int):
self.debug(f"Purging item: {item_to_purge.name}") assert os.path.isdir(path), "Path should be a directory: {}".format(path)
self._remove_item(config=config, path=item_to_purge) self.info("Rotating path for max age of {} days: {}".format(max_age_days, path))
def _rotate_path_for_maximum_age(self, config: ConfigFile, path: Path): children = self._gather_rotation_candidates(config, path)
minimum_items = self._determine_minimum_items(config)
assert path.is_dir(), f"Path should be a directory: {path}"
if config.maximum_age:
self.info(
f"Rotating path for max age of {config.maximum_age} days: {path}"
)
else:
self.info(
f"Not configured to rotate for a maximum number of days."
)
return
self.info(
f"Will gather rotation candidates for maximum age, in days."
)
candidate_items = self._gather_rotation_candidates(config=config, path=path)
minimum_items = self._determine_minimum_items(config=config)
# Do we need to rotate anything out? # Do we need to rotate anything out?
if len(candidate_items) < minimum_items: if len(children) < minimum_items:
self.info( self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
f"Path only has {len(candidate_items)} items" len(children), minimum_items
f", which does not meet the minimum threshold of {minimum_items} items." ))
f" Won't rotate this path."
)
return return
self.info( self.info("Examining {} items for deletion".format(len(children)))
f"Examining {len(candidate_items)} items for deletion" children_to_delete = []
) for child in children:
items_to_delete = []
for item in candidate_items:
age_seconds = Util.detect_item_age_seconds(config=config, item=item) age_seconds = self._detect_item_age_seconds(config, child)
age_days = Util.detect_item_age_days(config=config, item=item) age_days = self._detect_item_age_days(config, child)
age_formatted = Util.seconds_to_time_string(age_seconds) age_formatted = self.seconds_to_time_string(age_seconds)
child_basename = os.path.basename(child)
if age_days > config.maximum_age: if age_days > max_age_days:
self.info( self.info("[Old enough ] {} ({})".format(
f"[Old enough ] {item.name} ({age_formatted})" child_basename, age_formatted
) ))
items_to_delete.append(item) children_to_delete.append(child)
else: else:
self.info( self.info("[Not Old enough] {} ({})".format(
f"[Not Old enough] {item.name} ({age_formatted})" child_basename, age_formatted
) ))
if len(items_to_delete) > 0:
if len(children_to_delete) > 0:
self.info("Removing old items ...") self.info("Removing old items ...")
for child_to_delete in children_to_delete:
for item in items_to_delete: basename = os.path.basename(child_to_delete)
self._remove_item(config, item) self._remove_item(config, child_to_delete)
else: else:
self.info("No old items to remove") self.info("No old items to remove")
def _gather_rotation_candidates(self, config: ConfigFile, path: Path) -> [Path]:
self.debug(f"Begin gathering rotation candidates for: {path}") @staticmethod
def _gather_rotation_candidates(config, path):
candidates: [Path] = [] candidates = []
for item in path.iterdir(): if "target-type" not in config.keys():
raise Exception("Please provide the configuration key: target-type")
self.debug(f"Found an item: {item.name}") for item_name in os.listdir(path):
if config.target_type == "file": item_path = os.path.join(path, item_name)
if not item.is_file(): if config["target-type"] == "file":
self.debug(f"Not a file; Skipping: {item.name}") if not os.path.isfile(item_path):
continue continue
elif config["target-type"] == "directory":
elif config.target_type == "directory": if not os.path.isdir(item_path):
if not item.is_dir():
self.debug(f"Not a directory; Skipping: {item.name}")
continue continue
else: else:
raise Exception( raise Exception("Configuration key \"target-type\" must be \"file\" or \"directory\"")
f"Unsupported target type: {config.target_type}"
)
candidates.append(item) candidates.append(item_path)
self.__logger.info(f"Returning {len(candidates)} potential candidates to remove.")
return candidates return candidates
def _pick_oldest_item(self, config: ConfigFile, items: [Path]) -> (Path, float, float, str): def _pick_oldest_item(self, config, items):
best_item = None best_item = None
best_ctime = None best_ctime = None
for item in items: for item in items:
try: ctime = self._detect_item_date(config, item)
ctime = Util.detect_item_creation_date(config, item)
except FileNotFoundError as e:
self.__logger.error(f"File disappeared while trying to check ctime: {item}")
continue
if best_ctime is None or ctime < best_ctime: if best_ctime is None or ctime < best_ctime:
best_ctime = ctime best_ctime = ctime
best_item = item best_item = item
age_seconds = Util.detect_item_age_seconds(config, best_item) age_seconds = self._detect_item_age_seconds(config, best_item)
age_string = Util.seconds_to_time_string(age_seconds) age_string = self.seconds_to_time_string(age_seconds)
return best_item, best_ctime, age_seconds, age_string return best_item, best_ctime, age_seconds, age_string
def _remove_item(self, config: ConfigFile, path: Path): @staticmethod
def _detect_item_date(config, item):
if path.is_file(): assert "date-detection" in config.keys(), "Please provide config key: \"date-detection\""
detection = config["date-detection"]
self._remove_file(config=config, file_path=path)
elif path.is_dir():
self._remove_directory(config=config, dir_path=path)
if detection == "file":
ctime = os.path.getctime(item)
else: else:
raise AssertionError( raise AssertionError(f"Invalid value for \"date-detection\"; Should be one of [file]: {detection}")
f"Don't know how to remove this item: {path}"
)
def _remove_file(self, config: ConfigFile, file_path: Path): return ctime
if not file_path.is_file(): def _detect_item_age_seconds(self, config, item):
raise Exception(
f"Tried to remove a file, but this path isn't a file: {file_path}"
)
if self.__global_dry_run: now = time.time()
ctime = self._detect_item_date(config, item)
delta = now - ctime
self.info(f"(Global Dry Run) {file_path}") return delta
elif config.dry_run is True: def _detect_item_age_days(self, config, item):
self.info(f"(Config Dry Run) {file_path}") age_seconds = self._detect_item_age_seconds(config, item)
age_days = int(age_seconds / 86400)
return age_days
def seconds_to_time_string(self, seconds: float):
if isinstance(seconds, float):
pass
elif isinstance(seconds, int):
seconds = float * 1.0
else:
raise AssertionError("Seconds must be an int or float")
# Map
map = {
"year": 31536000.0,
"month": 2592000.0,
"week": 604800.0,
"day": 86400.0,
"hour": 3600.0,
"minute": 60.0,
"second": 1.0
}
s_parts = []
for unit_label in map.keys():
unit_seconds = map[unit_label]
if seconds >= unit_seconds:
unit_count = int(seconds / unit_seconds)
s_parts.append("{} {}{}".format(
unit_count, unit_label,
"" if unit_count == 1 else "s"
))
seconds -= unit_seconds * unit_count
s = ", ".join(s_parts)
return s
def _remove_item(self, config, path):
if os.path.isfile(path):
self._remove_file(config, path)
elif os.path.isdir(path):
self._remove_directory(config, path)
else:
raise AssertionError("Don't know how to remove this item: {}".format(path))
def _remove_file(self, config, file_path):
if not os.path.isfile(file_path):
raise Exception("Tried to remove a file, but this path isn't a file: " + str(file_path))
if self.__dry_run:
self.info(f"Won't purge file during global-level dry run: {file_path}")
elif "dry-run" in config.keys() and config["dry-run"] is True:
self.info(f"Won't purge file during config-level dry run: {file_path}")
else: else:
self.info(f"Purging file: {file_path}") self.info(f"Purging file: {file_path}")
file_path.unlink() os.remove(file_path)
def _remove_directory(self, config: ConfigFile, dir_path: Path): def _remove_directory(self, config, dir_path):
if not dir_path.is_dir(): if not os.path.isdir(dir_path):
raise Exception( raise Exception("Tried to remove a directory, but this path isn't a directory: " + str(dir_path))
f"Tried to remove a directory"
f", but this path isn't a directory: {dir_path}"
)
if self.__global_dry_run:
self.info(f"(Global Dry Run) {dir_path}")
elif config.dry_run:
self.info(f"(Config Dry Run) {dir_path}")
if self.__dry_run:
self.info(f"Won't purge directory during global-level dry run: {dir_path}")
elif "dry-run" in config.keys() and config["dry-run"] is True:
self.info(f"Won't purge directory during config-level dry run: {dir_path}")
else: else:
self.info(f"Purging directory: {dir_path}") self.info(f"Purging directory: {dir_path}")
shutil.rmtree(dir_path) shutil.rmtree(dir_path)
def _determine_minimum_items(self, config) -> int:
def _determine_minimum_items(self, config):
minimum_items = 0 minimum_items = 0
if config.minimum_items is not None: if "minimum-items" in config.keys():
minimum_items = config["minimum-items"]
minimum_items = config.minimum_items self.info("Won't delete anything unless a minimum of {} items were found".format(minimum_items))
self.info(
f"Won't delete anything unless a minimum of {minimum_items} items were found"
)
else: else:
self.info( self.info("No value found for \"minimum-items\"; Will not enforce minimum item constraint.")
"No minimum number of items specified"
"; Will not enforce minimum item constraint."
)
return minimum_items return minimum_items

View File

@ -1,38 +1,48 @@
from domain.Logger import Logger from domain.Logger import Logger
from domain.Util import Util
import os
# import os class Config:
from pathlib import Path
class Scanner:
__DEFAULT_VALID_EXTENSIONS = [ __DEFAULT_VALID_EXTENSIONS = [
"yaml", "yaml",
"yml" "yml"
] ]
def __init__(self, logger: Logger): def __init__(self, logger):
self.__logger = logger self.__logger = logger
self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS
def debug(self, s): def debug(self, s):
self.__logger.debug(f"[{type(self).__name__}] {s}") self.__logger.debug(f"[{type(self).__name__}] {s}")
def info(self, s): def info(self, s):
self.__logger.info(f"[{type(self).__name__}] {s}") self.__logger.info(f"[{type(self).__name__}] {s}")
def warn(self, s): def warn(self, s):
self.__logger.warning(f"[{type(self).__name__}] {s}") self.__logger.warn(f"[{type(self).__name__}] {s}")
def error(self, s): def error(self, s):
self.__logger.error(f"[{type(self).__name__}] {s}") self.__logger.error(f"[{type(self).__name__}] {s}")
def gather_valid_config_paths(self, paths: list = None) -> [Path]: @staticmethod
def get_dir_files_recursive(path: str):
files_paths = []
for dir_path, dirnames, filenames in os.walk(path):
for file_name in filenames:
file_path = os.path.join(dir_path, file_name)
files_paths.append(file_path)
# print("Uhm yeah", dir_path, "--", dirnames, "--", file_name)
# print("==>", file_path)
return files_paths
def gather_valid_configs(self, paths: list=None):
assert paths is not None, "Config paths cannot be None" assert paths is not None, "Config paths cannot be None"
assert len(paths) > 0, "Must provide at least one config file path" assert len(paths) > 0, "Must provide at least one config file path"
@ -40,81 +50,64 @@ class Scanner:
self.info("Gathering valid configs") self.info("Gathering valid configs")
file_paths = [] file_paths = []
config_paths = [] configs = []
not_config_paths = [] not_configs = []
# First gather all files that are potential configs # First gather all files that are potential configs
for path_str in paths: for path in paths:
path = Path(path_str)
self.info(f"Inspecting path: {path}") self.info(f"Inspecting path: {path}")
if not path.exists(): if os.path.isfile(path):
self.debug(f"Path is a file; Adding directly to potential config candidates: {path}")
self.error(f"Path doesn't exist: {path}")
if path.is_file():
self.debug(
f"Path is a file; Adding directly to potential config candidates: {path}"
)
file_paths.append(path) file_paths.append(path)
elif path.is_dir(): elif os.path.isdir(path):
self.debug(f"Path is a dir; Scanning recursively for potential config candidate files: {path}")
self.debug( for file_path in Config.get_dir_files_recursive(path=path):
f"Path is a dir;"
" Scanning recursively for potential config candidate files: {path}"
)
for file_path in Util.get_dir_files_recursive(path=path):
self.info(f"> Candidate file: {file_path}") self.info(f"> Candidate file: {file_path}")
file_paths.append(file_path) file_paths.append(file_path)
else: else:
raise AssertionError( raise AssertionError(f"Don't know how to handle path that isn't a file or dir: {path}")
f"Don't know how to handle path that isn't a file or dir: {path}"
)
# Now, filter for files with valid YAML extensions # Now, filter for files with valid YAML extensions
for file_path in file_paths: for file_path in file_paths:
if self.check_file_extension(file_path=file_path, extensions=None): if self.check_file_extension(file_path=file_path, extensions=None):
config_paths.append(file_path) configs.append(file_path)
else: else:
not_config_paths.append(file_path) not_configs.append(file_path)
self.info("Filtered out non-config files:") self.info("Filtered out non-config files:")
if len(not_config_paths) > 0: if len(not_configs) > 0:
for not_config_path in not_config_paths: for not_config in not_configs:
self.info(f"> {not_config_path}") self.info(f"> {not_config}")
else: else:
self.info("> [none]") self.info("> [none]")
self.info("Kept config-looking files:") self.info("Kept config-looking files:")
if len(config_paths) > 0: if len(configs) > 0:
for config_path in config_paths: for config in configs:
self.info(f"> {config_path}") self.info(f"> {config}")
else: else:
self.info("> [none]") self.info("> [none]")
return config_paths return configs
def check_file_extension(self, file_path: Path, extensions: list = None) -> bool: def check_file_extension(self, file_path, extensions: list=None):
if extensions is None: if extensions is None:
extensions = self.__valid_extensions extensions = self.__valid_extensions
file_extension = file_path.suffix file_name, file_extension = os.path.splitext(file_path)
# Strip preceding dot from extension
if len(file_extension) > 0 and file_extension[0] == ".": if len(file_extension) > 0 and file_extension[0] == ".":
file_extension = file_extension[1:] file_extension = file_extension[1:]
file_extension = file_extension.lower() file_extension = file_extension.lower()
for valid_extension in extensions: for valid_extension in extensions:
#print(file_name, "---", file_extension, "---", valid_extension)
if file_extension == valid_extension: if file_extension == valid_extension:
return True return True
return False return False

View File

@ -4,45 +4,23 @@ from logging.handlers import SysLogHandler
import sys import sys
class Logger: class Logger:
def __init__( def __init__(self, name: str, debug: bool=False):
self,
name: str,
debug: bool = False,
write_to_syslog: bool = False,
systemd: bool = False,
do_test_logs: bool = True,
):
self.__name = name self.__name = name
self.__debug = debug
self.__write_to_syslog = write_to_syslog
self.__systemd = systemd
self.__do_test_logs = do_test_logs
self._init_logger()
def _init_logger(self):
self.__logger = logging.getLogger(self.__name) self.__logger = logging.getLogger(self.__name)
if self.__debug: if debug:
level = logging.DEBUG level = logging.DEBUG
else: else:
level = logging.INFO level = logging.INFO
self.__logger.setLevel(level) self.__logger.setLevel(level)
formatter = logging.Formatter( formatter = logging.Formatter('[%(name)s][%(levelname)s] %(message)s')
fmt="[{name}][{levelname:<7}] {message}", formatter_full = logging.Formatter('[%(asctime)s][%(name)s][%(levelname)s] %(message)s')
style='{'
)
formatter_full = logging.Formatter(
fmt="[{asctime}][{name}][{levelname:<7}] {message}",
style='{'
)
# Console output / stream handler (STDOUT) # Console output / stream handler (STDOUT)
handler = logging.StreamHandler( handler = logging.StreamHandler(
@ -50,9 +28,7 @@ class Logger:
) )
handler.setLevel(level) handler.setLevel(level)
handler.addFilter(lambda entry: entry.levelno <= logging.INFO) handler.addFilter(lambda entry: entry.levelno <= logging.INFO)
handler.setFormatter( handler.setFormatter(formatter_full)
formatter if self.__systemd else formatter_full
)
self.__logger.addHandler(handler) self.__logger.addHandler(handler)
# Console output / stream handler (STDERR) # Console output / stream handler (STDERR)
@ -60,38 +36,29 @@ class Logger:
stream=sys.stderr stream=sys.stderr
) )
handler.setLevel(logging.WARNING) handler.setLevel(logging.WARNING)
handler.setFormatter( handler.setFormatter(formatter_full)
formatter if self.__systemd else formatter_full
)
self.__logger.addHandler(handler) self.__logger.addHandler(handler)
# Syslog handler # Syslog handler
if self.__write_to_syslog: handler = SysLogHandler(
handler = SysLogHandler( address="/dev/log"
address="/dev/log" )
) handler.setLevel(level)
handler.setLevel(level) handler.setFormatter(formatter)
handler.setFormatter(formatter) self.__logger.addHandler(handler)
self.__logger.addHandler(handler)
# This is annoying inside cron # This is annoying inside cron
if self.__do_test_logs: # self.debug("Test debug log")
self.debug("Test debug log") # self.info("Test info log")
self.info("Test info log") # self.warn("Test warn log")
self.warn("Test warn log") # self.error("Test error log")
self.error("Test error log")
def debug(self, s): def debug(self, s):
self.__logger.debug(s) self.__logger.debug(s)
def info(self, s): def info(self, s):
self.__logger.info(s) self.__logger.info(s)
def warn(self, s): def warn(self, s):
self.__logger.warning(s) self.__logger.warn(s)
def warning(self, s):
self.__logger.warning(s)
def error(self, s): def error(self, s):
self.__logger.error(s) self.__logger.error(s)

View File

@ -1,130 +0,0 @@
from domain.config.ConfigFile import ConfigFile
import datetime
from pathlib import Path
class Util:
def __init__(self):
pass
@staticmethod
def get_dir_files_recursive(path: Path) -> [Path]:
files_paths = []
for dir_path, dirs_names, filenames in path.walk():
for file_name in filenames:
file_path = dir_path / file_name
files_paths.append(file_path)
return files_paths
@staticmethod
def detect_item_creation_date(config: ConfigFile, item: Path) -> datetime.datetime:
stat = None
if config.date_detection == "file":
# Try for the most accurate stat
# First one that raises will just break the block, obv
try:
stat = item.stat().st_ctime
# print("got ctime")
stat = item.stat().st_mtime
# print("got mtime")
stat = item.stat().st_birthtime
# print("got btime")
except FileNotFoundError as e:
raise e
except AttributeError:
pass
else:
raise AssertionError(
f"Unsupported date-detection option: {config.date_detection}"
)
stamp = datetime.datetime.fromtimestamp(
stat
)
# print("Stat:", stat)
# print("Stamp:", stamp)
# print(item.name, "==>", stamp)
return stamp
@staticmethod
def detect_item_age_seconds(config: ConfigFile, item: Path) -> float:
now = datetime.datetime.now()
ctime = Util.detect_item_creation_date(config=config, item=item)
delta = now - ctime
seconds = delta.seconds
# print(item.name, "==>", seconds, f"({ctime})")
# print(">", "Now was:", now)
# print(">", "ctime was:", ctime)
# print(">", "Delta was:", delta)
# print(">", "Seconds was:", delta.total_seconds())
return delta.total_seconds()
@staticmethod
def detect_item_age_days(config: ConfigFile, item: Path) -> int:
age_seconds = Util.detect_item_age_seconds(
config=config, item=item
)
age_days = int(age_seconds / 86400)
return age_days
@staticmethod
def seconds_to_time_string(seconds: float):
if isinstance(seconds, float):
pass
elif isinstance(seconds, int):
seconds = float(seconds)
else:
raise AssertionError("Seconds must be an int or float")
# Map
dt_map = {
"year": 31536000.0,
"month": 2592000.0,
"week": 604800.0,
"day": 86400.0,
"hour": 3600.0,
"minute": 60.0,
"second": 1.0
}
s_parts = []
for unit_label in dt_map.keys():
unit_seconds = dt_map[unit_label]
if seconds >= unit_seconds:
unit_count = int(seconds / unit_seconds)
unit_plural = "" if unit_count == 1 else "s"
s_parts.append(
f"{unit_count} {unit_label}{unit_plural}"
)
seconds -= unit_seconds * unit_count
s = ", ".join(s_parts)
return s

View File

@ -1,43 +0,0 @@
from domain.config.ConfigFile import ConfigFile
from domain.config.Scanner import Scanner
from domain.Logger import Logger
from pathlib import Path
class Config:
def __init__(self, logger: Logger, config_files_paths: [Path]):
self.__logger = logger
self.__config_files_paths: [Path] = config_files_paths
self.__configs = {}
self.__scanner = Scanner(
logger=self.__logger
)
self._consume_configs()
def _consume_configs(self):
config_paths = self.__scanner.gather_valid_config_paths(
paths=self.__config_files_paths
)
for config_path in config_paths:
config = ConfigFile(
logger=self.__logger,
path=config_path
)
self.__configs[config.key] = config
@property
def config_files(self) -> [ConfigFile]:
return self.__configs.values()

View File

@ -1,309 +0,0 @@
from domain.Logger import Logger
from pathlib import Path
import yaml
class ConfigFile:
__VALID_TARGET_TYPES = [
"file",
"directory"
]
__VALID_DATE_DETECTION_TYPES = [
"file"
]
__DEFAULT_MINIMUM_ITEMS = 0
__DEFAULT_MAXIMUM_ITEMS = None
__DEFAULT_MAXIMUM_AGE = None
def __init__(
self, logger: Logger,
path: Path,
):
self.__logger = logger
self.__path = path.absolute()
# noinspection PyTypeChecker
self.__data: dict = None
self.__dry_run: bool = True
# noinspection PyTypeChecker
self.__target_type: str = None
# noinspection PyTypeChecker
self.__date_detection: str = None
self.__rotatable_paths: [Path] = []
self.__minimum_items = self.__DEFAULT_MINIMUM_ITEMS
# noinspection PyTypeChecker
self.__maximum_items: int = self.__DEFAULT_MAXIMUM_ITEMS
# noinspection PyTypeChecker
self.__maximum_age: int = None
self._load()
self._consume()
def __str__(self):
s = ""
s += "*** Config File ***"
s += f"\n> Path: {self.__path}"
s += f"\n> Dry run: " + ("Yes" if self.__dry_run else "No")
s += f"\n> Minimum items: {self.__minimum_items}"
s += f"\n> Maximum items: {self.__maximum_items}"
s += f"\n> Maximum age (in days): {self.__maximum_age}"
s += f"\n> Target type: {self.__target_type}"
s += f"\n> Date detection: {self.__date_detection}"
s += f"\n> Rotatable paths: "
if len(self.__rotatable_paths) > 0:
for p in self.__rotatable_paths:
s += f"\n>> {p}"
else:
s += "\n>> [none]"
return s
def _load(self):
self.info(f"Loading config: {self.__path}")
assert self.__path.is_file(), (
f"Cannot load config file because it isn't a file: {self.__path}"
)
# Open the file
self.debug(f"Opening config file for load: {self.__path}")
f = open(str(self.__path))
if not f:
raise Exception(f"Unable to open config file: {self.__path}")
# Load data
self.__data = yaml.safe_load(f)
assert self.__data is not None, (
f"Config file seems to be null or empty: {self.__path}"
)
# Consume to internal
self.info(f"Loaded config from path: {self.__path}")
def _consume(self):
try:
assert isinstance(self.__data, dict), (
f"Config file should be a dict!"
)
if "options" in self.__data.keys():
self.info(f"Found options setting")
options = self.__data["options"]
assert isinstance(options, dict), "Options must be a dict"
if "dry-run" in options.keys():
dry_run = self.__data["options"]["dry-run"]
self.info(f"Found dry run option: {dry_run}")
assert isinstance(dry_run, bool), "dry-run setting must be boolean"
self.__dry_run = dry_run
else:
self.warning(f"No dry-run option found; Will use default: {self.__dry_run}")
if "minimum-items" in options.keys():
minimum_items = options["minimum-items"]
self.info(f"Found minimum-items option: {minimum_items}")
if minimum_items is None:
minimum_items = self.__DEFAULT_MINIMUM_ITEMS
assert isinstance(minimum_items, int), (
f"Option minimum-items must be an integer,"
f" but got: {type(minimum_items).__name__} ({minimum_items})"
)
self.__minimum_items = minimum_items
else:
self.warning(
f"No minimum-items option found; Will use default: {self.__minimum_items}"
)
assert (
"maximum-items" in options.keys()
or
"maximum-age" in options.keys()
), (
"Options should include either maximum-items or maximum-age"
)
if "maximum-items" in options.keys():
maximum_items = options["maximum-items"]
self.info(f"Found maximum-items option: {maximum_items}")
assert maximum_items is None or isinstance(maximum_items, int), (
f"Option maximum-items must be integer, but got: {maximum_items}"
)
assert maximum_items is None or maximum_items > 0, (
f"Option maximum-items is zero, which doesn't make sense."
)
self.__maximum_items = maximum_items
else:
self.warning(
f"No maximum-items option found; Will use default: {self.__maximum_items}"
)
if "maximum-age" in options.keys():
maximum_age = options["maximum-age"]
self.info(f"Found maximum-age option (max age in days): {maximum_age}")
assert maximum_age is None or isinstance(maximum_age, int), (
f"Option maximum-age must be None or an integer,"
f" but got: {type(maximum_age).__name__} ({maximum_age})"
)
assert maximum_age is None or maximum_age > 0, (
f"Option maximum-age is zero, which doesn't make sense."
)
self.__maximum_age = maximum_age
else:
self.warning(
f"No maximum-age option found; Will use default: {self.__maximum_age}"
)
assert "target-type" in options.keys(), (
f"Option target-type is required"
)
target_type = options["target-type"]
self.info(f"Found target-type option: {target_type}")
assert isinstance(target_type, str), (
f"Option target-type must be str, but got: {target_type}"
)
assert target_type in self.__VALID_TARGET_TYPES, (
f"Option target-type must be one of: {self.__VALID_TARGET_TYPES}"
)
self.__target_type = target_type
if "date-detection" in options.keys():
date_detection = options["date-detection"]
self.info(f"Found date-detection option: {date_detection}")
assert isinstance(date_detection, str), (
f"Option date-detection must be str, but got: {date_detection}"
)
assert date_detection in self.__VALID_DATE_DETECTION_TYPES, (
f"Option date-detection must be one of: {self.__VALID_DATE_DETECTION_TYPES}"
)
self.__date_detection = date_detection
else:
self.error(
f"Option date-detection not found; Will use default: {self.__date_detection}"
)
raise AssertionError(
f"Option date-detection is required."
)
else:
self.error(f"No options key found!")
raise AssertionError(f"No options key found!")
assert "paths" in self.__data, (
f"Could not find 'paths' key"
)
rotatable_paths = self.__data["paths"]
if isinstance(rotatable_paths, str):
rotatable_paths = [rotatable_paths]
assert isinstance(rotatable_paths, list), (
"Rotatable 'paths' key must be a string or list"
)
for i in range(len(rotatable_paths)):
p = rotatable_paths[i]
if isinstance(p, Path):
continue
elif isinstance(p, str):
rotatable_paths[i] = Path(p)
else:
raise AssertionError(
f"All rotatable paths must be strings or pathlib::Path objects"
)
self.__rotatable_paths = rotatable_paths
self.info(f"Found {len(self.__rotatable_paths)} rotatable paths")
except KeyError as e:
self.error(
f"Failed to load config due to KeyError"
f"\nFile: {self.__path}"
f"\nError: {str(e)}"
)
raise e
except AssertionError as e:
self.error(
f"Failed to load config due to AssertionError"
f"\nFile: {self.__path}"
f"\nError: {str(e)}"
)
raise e
def debug(self, s):
self.__logger.debug(f"({self.__path.name}) {s}")
def info(self, s):
self.__logger.info(f"({self.__path.name}) {s}")
def warning(self, s):
self.__logger.warning(f"({self.__path.name}) {s}")
def error(self, s):
self.__logger.error(f"({self.__path.name}) {s}")
@property
def key(self) -> str:
return str(self.__path)
@property
def path(self) -> Path:
return self.__path
@property
def data(self) -> dict:
return self.__data
@property
def dry_run(self) -> bool:
return self.__dry_run
@dry_run.setter
def dry_run(self, b: bool):
self.__dry_run = b
@property
def target_type(self) -> str:
return self.__target_type
@property
def date_detection(self) -> str:
return self.__date_detection
@property
def rotatable_paths(self) -> [Path]:
return self.__rotatable_paths
@property
def minimum_items(self) -> int:
return self.__minimum_items
@property
def maximum_items(self) -> int:
return self.__maximum_items
@property
def maximum_age(self) -> int:
return self.__maximum_age

60
main.py
View File

@ -21,76 +21,30 @@ def main():
help="Verbose/Debug logging mode" help="Verbose/Debug logging mode"
) )
parser.add_argument(
"--systemd",
default=False,
dest="systemd",
action="store_true",
help=(
"Pass if this program will be spawned inside systemd"
" or another system that already adds timestamps to log messages."
)
)
parser.add_argument(
"--syslog", "--write-to-syslog",
default=False,
dest="write_to_syslog",
action="store_true",
help=(
"Pass if you'd like this program to write to syslog."
)
)
parser.add_argument(
"--no-test-logs",
default=False,
dest="do_test_logs",
action="store_false",
help=(
"Pass if you do not want to see test logs for all log levels."
)
)
parser.add_argument(
"--test-logs",
default=True,
dest="do_test_logs",
action="store_true",
help=(
"Pass if you want to see test logs for all log levels."
)
)
parser.add_argument( parser.add_argument(
"--config", "-c", "--config", "-c",
dest="config_paths", dest="config_files",
default=[], default=[],
action="append", action="append",
type=str, type=str,
help="Specify a configuration file or configuration directory. Can be called multiple times." help="Specify a configuration file. Can be called multiple times."
) )
parser.add_argument( parser.add_argument(
"--dry-run", "-d", "--dry-run", "-d",
dest="global_dry_run", dest="dry_run",
default=False, default=False,
action="store_true", action="store_true",
help=( help="Only perform an analysis; Don't delete anything."
"Only perform an analysis;"
" Don't delete anything no matter what configs say (configs can specify dry run, too)."
)
) )
args = parser.parse_args() args = parser.parse_args()
rotator = BackupRotator( rotator = BackupRotator(
config_paths=args.config_paths, debug=args.debug
debug=args.debug,
systemd=args.systemd,
write_to_syslog=args.write_to_syslog,
do_test_logs=args.do_test_logs,
) )
rotator.run( rotator.run(
global_dry_run=args.global_dry_run configs=args.config_files,
dry_run=args.dry_run
) )