Adding dedicated Logger and Config classes

This commit is contained in:
root 2023-03-27 19:40:52 -07:00
parent 4f0b29cd3d
commit 9d17178012
3 changed files with 176 additions and 84 deletions

View File

@ -12,6 +12,11 @@ Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE fil
""" """
from domain.Logger import Logger
from domain.Config import Config
import datetime import datetime
import os import os
# import pprint # import pprint
@ -24,23 +29,18 @@ import yaml
class BackupRotator: class BackupRotator:
__DEFAULT_VALID_EXTENSIONS = [
"yaml",
"yml"
]
def __init__(self): def __init__(self):
self.__logger = Logger(type(self).__name__)
self.__dry_run = False self.__dry_run = False
self.__configs = [] self.__configs = []
self.__config_paths = [] self.__config_paths = []
self.__calculated_actions = [] self.__calculated_actions = []
self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS
def run(self, configs, dry_run: bool = False): def run(self, configs, dry_run: bool = False):
self.log("Begin") self.info("Begin")
self.__dry_run = dry_run self.__dry_run = dry_run
self.__config_paths = configs self.__config_paths = configs
@ -54,7 +54,7 @@ class BackupRotator:
config = self.__configs[config_index] config = self.__configs[config_index]
# #
self.log("Rotating for config " + str(config_index + 1) + " of " + str(len(self.__configs)), config["__path"]) self.info("Rotating for config " + str(config_index + 1) + " of " + str(len(self.__configs)), config["__path"])
self._do_rotate(config) self._do_rotate(config)
@staticmethod @staticmethod
@ -64,42 +64,22 @@ class BackupRotator:
now_s = now.strftime("%b-%d-%Y %I:%M%p") now_s = now.strftime("%b-%d-%Y %I:%M%p")
return str(now_s) return str(now_s)
def log(self, s, o=None): def info(self, s):
self.__logger.info(s)
now = self.current_time() def warn(self, s):
self.__logger.warn(s)
to_log = "[" + now + "][Backup Rotator] " + str(s) def error(self, s):
if o is not None: self.__logger.error(s)
to_log += " " + str(o)
syslog.syslog(to_log)
print(to_log)
def _consume_configs(self, paths: list=None): def _consume_configs(self, paths: list=None):
assert paths is not None, "Config paths cannot be None" configs = Config().gather_valid_configs(paths=paths)
assert len(paths) > 0, "Must provide at least one config file path" print("Configs:")
print(configs)
return
for config in configs:
self._consume_config(path=config)
# Use each config path
for path in paths:
# If this is a single file
if os.path.isfile(path):
self._consume_config(path)
# If this is a directory
elif os.path.isdir(path):
# Iterate over each file inside
for file_name in os.listdir(path):
one_file = os.path.join(path, file_name)
if os.path.isfile(one_file) and self._check_file_extension(file_path=one_file, extensions=None):
self._consume_config(one_file)
def _consume_config(self, path: str): def _consume_config(self, path: str):
# Open the file # Open the file
@ -115,7 +95,7 @@ class BackupRotator:
# Consume to internal # Consume to internal
self.__configs.append(config) self.__configs.append(config)
self.log("Consumed config from path:", path) self.info("Consumed config from path:", path)
def _do_rotate(self, config): def _do_rotate(self, config):
@ -123,7 +103,7 @@ class BackupRotator:
def _rotate_paths(self, config): def _rotate_paths(self, config):
self.log("Begin rotating " + str(len(config["paths"])) + " paths") self.info("Begin rotating " + str(len(config["paths"])) + " paths")
for path in config["paths"]: for path in config["paths"]:
self._rotate_path(config, path) self._rotate_path(config, path)
@ -131,7 +111,7 @@ class BackupRotator:
assert os.path.isdir(path), "Path should be a directory: {}".format(path) assert os.path.isdir(path), "Path should be a directory: {}".format(path)
self.log("Rotating path: {}".format(path)) self.info("Rotating path: {}".format(path))
found_any_rotation_keys = False found_any_rotation_keys = False
if "maximum-items" in config.keys(): if "maximum-items" in config.keys():
@ -148,7 +128,7 @@ class BackupRotator:
assert os.path.isdir(path), "Path should be a directory: {}".format(path) assert os.path.isdir(path), "Path should be a directory: {}".format(path)
self.log("Rotating path for a maximum of {} items: {}".format( self.info("Rotating path for a maximum of {} items: {}".format(
max_items, path max_items, path
)) ))
@ -158,24 +138,24 @@ class BackupRotator:
# Do we need to rotate anything out? # Do we need to rotate anything out?
if len(children) < minimum_items: if len(children) < minimum_items:
self.log("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format( self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
len(children), minimum_items len(children), minimum_items
)) ))
return return
elif len(children) <= max_items: elif len(children) <= max_items:
self.log("Path only has {} items, but needs more than {} for rotation; Won't rotate this path.".format( self.info("Path only has {} items, but needs more than {} for rotation; Won't rotate this path.".format(
len(children), max_items len(children), max_items
)) ))
return return
self.log("Found {} items to examine".format(len(children))) self.info("Found {} items to examine".format(len(children)))
# #
maximum_purge_count = len(children) - minimum_items maximum_purge_count = len(children) - minimum_items
purge_count = len(children) - max_items purge_count = len(children) - max_items
self.log("Want to purge {} items".format(purge_count)) self.info("Want to purge {} items".format(purge_count))
if purge_count > maximum_purge_count: if purge_count > maximum_purge_count:
self.log("Reducing purge count from {} to {} items to respect minimum items setting ({})".format( self.info("Reducing purge count from {} to {} items to respect minimum items setting ({})".format(
purge_count, maximum_purge_count, minimum_items purge_count, maximum_purge_count, minimum_items
)) ))
purge_count = maximum_purge_count purge_count = maximum_purge_count
@ -186,7 +166,7 @@ class BackupRotator:
# #
item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item(config, children) item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item(config, children)
children.remove(item_to_purge) children.remove(item_to_purge)
self.log("Found next item to purge: ({}) {} ({})".format( self.info("Found next item to purge: ({}) {} ({})".format(
purge_index + 1, purge_index + 1,
os.path.basename(item_to_purge), os.path.basename(item_to_purge),
item_age item_age
@ -196,7 +176,7 @@ class BackupRotator:
children_to_purge.append(item_to_purge) children_to_purge.append(item_to_purge)
# #
self.log("Removing items") self.info("Removing items")
for child_to_purge in children_to_purge: for child_to_purge in children_to_purge:
child_basename = os.path.basename(child_to_purge) child_basename = os.path.basename(child_to_purge)
self._remove_item(config, child_to_purge) self._remove_item(config, child_to_purge)
@ -205,19 +185,19 @@ class BackupRotator:
assert os.path.isdir(path), "Path should be a directory: {}".format(path) assert os.path.isdir(path), "Path should be a directory: {}".format(path)
self.log("Rotating path for max age of {} days: {}".format(max_age_days, path)) self.info("Rotating path for max age of {} days: {}".format(max_age_days, path))
children = self._gather_rotation_candidates(config, path) children = self._gather_rotation_candidates(config, path)
minimum_items = self._determine_minimum_items(config) minimum_items = self._determine_minimum_items(config)
# Do we need to rotate anything out? # Do we need to rotate anything out?
if len(children) < minimum_items: if len(children) < minimum_items:
self.log("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format( self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
len(children), minimum_items len(children), minimum_items
)) ))
return return
self.log("Examining {} items for deletion".format(len(children))) self.info("Examining {} items for deletion".format(len(children)))
children_to_delete = [] children_to_delete = []
for child in children: for child in children:
@ -227,22 +207,22 @@ class BackupRotator:
child_basename = os.path.basename(child) child_basename = os.path.basename(child)
if age_days > max_age_days: if age_days > max_age_days:
self.log("[Old enough ] {} ({})".format( self.info("[Old enough ] {} ({})".format(
child_basename, age_formatted child_basename, age_formatted
)) ))
children_to_delete.append(child) children_to_delete.append(child)
else: else:
self.log("[Not Old enough] {} ({})".format( self.info("[Not Old enough] {} ({})".format(
child_basename, age_formatted child_basename, age_formatted
)) ))
if len(children_to_delete) > 0: if len(children_to_delete) > 0:
self.log("Removing old items ...") self.info("Removing old items ...")
for child_to_delete in children_to_delete: for child_to_delete in children_to_delete:
basename = os.path.basename(child_to_delete) basename = os.path.basename(child_to_delete)
self._remove_item(config, child_to_delete) self._remove_item(config, child_to_delete)
else: else:
self.log("No old items to remove") self.info("No old items to remove")
@staticmethod @staticmethod
@ -366,11 +346,11 @@ class BackupRotator:
raise Exception("Tried to remove a file, but this path isn't a file: " + str(file_path)) raise Exception("Tried to remove a file, but this path isn't a file: " + str(file_path))
if self.__dry_run: if self.__dry_run:
self.log("Won't purge file during global-level dry run: ", file_path) self.info("Won't purge file during global-level dry run: ", file_path)
elif "dry-run" in config.keys() and config["dry-run"] is True: elif "dry-run" in config.keys() and config["dry-run"] is True:
self.log("Won't purge file during config-level dry run: ", file_path) self.info("Won't purge file during config-level dry run: ", file_path)
else: else:
self.log("Purging file:", file_path) self.info("Purging file:", file_path)
os.remove(file_path) os.remove(file_path)
def _remove_directory(self, config, dir_path): def _remove_directory(self, config, dir_path):
@ -379,11 +359,11 @@ class BackupRotator:
raise Exception("Tried to remove a directory, but this path isn't a directory: " + str(dir_path)) raise Exception("Tried to remove a directory, but this path isn't a directory: " + str(dir_path))
if self.__dry_run: if self.__dry_run:
self.log("Won't purge directory during global-level dry run: ", dir_path) self.info("Won't purge directory during global-level dry run: ", dir_path)
elif "dry-run" in config.keys() and config["dry-run"] is True: elif "dry-run" in config.keys() and config["dry-run"] is True:
self.log("Won't purge directory during config-level dry run: ", dir_path) self.info("Won't purge directory during config-level dry run: ", dir_path)
else: else:
self.log("Purging directory:", dir_path) self.info("Purging directory:", dir_path)
shutil.rmtree(dir_path) shutil.rmtree(dir_path)
@ -393,25 +373,8 @@ class BackupRotator:
if "minimum-items" in config.keys(): if "minimum-items" in config.keys():
minimum_items = config["minimum-items"] minimum_items = config["minimum-items"]
self.log("Won't delete anything unless a minimum of {} items were found".format(minimum_items)) self.info("Won't delete anything unless a minimum of {} items were found".format(minimum_items))
else: else:
self.log("No value found for \"minimum-items\"; Will not enforce minimum item constraint.") self.info("No value found for \"minimum-items\"; Will not enforce minimum item constraint.")
return minimum_items return minimum_items
def _check_file_extension(self, file_path, extensions: list=None):
if extensions is None:
extensions = self.__valid_extensions
file_name, file_extension = os.path.splitext(file_path)
if len(file_extension) > 0 and file_extension[0] == ".":
file_extension = file_extension[1:]
file_extension = file_extension.lower()
for valid_extension in extensions:
#print(file_name, "---", file_extension, "---", valid_extension)
if file_extension == valid_extension:
return True
return False

111
domain/Config.py Normal file
View File

@ -0,0 +1,111 @@
from domain.Logger import Logger
import os
class Config:
__DEFAULT_VALID_EXTENSIONS = [
"yaml",
"yml"
]
def __init__(self):
self.__logger = Logger(type(self).__name__)
self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS
def info(self, s):
self.__logger.info(s)
def warn(self, s):
self.__logger.warn(s)
def error(self, s):
self.__logger.error(s)
@staticmethod
def get_dir_files_recursive(path: str):
files_paths = []
for dir_path, dirnames, filenames in os.walk(path):
for file_name in filenames:
file_path = os.path.join(dir_path, file_name)
files_paths.append(file_path)
# print("Uhm yeah", dir_path, "--", dirnames, "--", file_name)
# print("==>", file_path)
return files_paths
def gather_valid_configs(self, paths: list=None):
assert paths is not None, "Config paths cannot be None"
assert len(paths) > 0, "Must provide at least one config file path"
self.__logger.info("Gathering valid configs")
file_paths = []
configs = []
not_configs = []
# First gather all files that are potential configs
for path in paths:
self.__logger.info(f"Inspecting path: {path}")
if os.path.isfile(path):
self.__logger.info(f"Path is a file; Adding directly to potential config candidates: {path}")
file_paths.append(path)
elif os.path.isdir(path):
self.__logger.info(f"Path is a dir; Scanning recursively for potential config candidate files: {path}")
for file_path in Config.get_dir_files_recursive(path=path):
self.__logger.info(f"> Candidate file: {file_path}")
file_paths.append(file_path)
else:
raise AssertionError(f"Don't know how to handle path that isn't a file or dir: {path}")
# Now, filter for files with valid YAML extensions
for file_path in file_paths:
if self.check_file_extension(file_path=file_path, extensions=None):
configs.append(file_path)
else:
not_configs.append(file_path)
self.__logger.info("Filtered out non-config files:")
if len(not_configs) > 0:
for not_config in not_configs:
self.__logger.info(f"> {not_config}")
else:
self.__logger.info("> [none]")
self.__logger.info("Kept config-looking files:")
if len(configs) > 0:
for config in configs:
self.__logger.info(f"> {config}")
else:
self.__logger.info("> [none]")
return configs
def check_file_extension(self, file_path, extensions: list=None):
if extensions is None:
extensions = self.__valid_extensions
file_name, file_extension = os.path.splitext(file_path)
if len(file_extension) > 0 and file_extension[0] == ".":
file_extension = file_extension[1:]
file_extension = file_extension.lower()
for valid_extension in extensions:
#print(file_name, "---", file_extension, "---", valid_extension)
if file_extension == valid_extension:
return True
return False

18
domain/Logger.py Normal file
View File

@ -0,0 +1,18 @@
import logging
class Logger:
def __init__(self, name: str):
self.__name = name
def debug(self, s):
print(self.__name, s)
def info(self, s):
print(self.__name, s)
def warn(self, s):
print(self.__name, s)
def error(self, s):
print(self.__name, s)