Compare commits
19 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
161f018cb9 | ||
|
565c1d31b9 | ||
|
1517f64424 | ||
|
cf3bd8eb85 | ||
|
2ef7aaf195 | ||
|
3c4ed5f792 | ||
|
403531d7f0 | ||
|
bd088ac545 | ||
|
d2af261477 | ||
|
5828a0363f | ||
|
b2261a3c49 | ||
|
6ceaa89dad | ||
|
2f706bacf6 | ||
|
dbbe3b88af | ||
|
b8aff6429f | ||
|
33c4233797 | ||
|
1564f4cf8a | ||
|
2e6d268de0 | ||
|
bfafd890d0 |
@ -7,7 +7,7 @@ Suppose you have a third party backup program regularly dropping backup files in
|
||||
|
||||
# License
|
||||
|
||||
Copyright 2023 Mike Peralta; All rights reserved
|
||||
Copyright 2024 Mike Peralta; All rights reserved
|
||||
|
||||
Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more)
|
||||
|
||||
|
@ -6,56 +6,75 @@ Mike's Backup Rotator
|
||||
|
||||
A simple script to help automatically rotate backup files
|
||||
|
||||
Copyright 2023 Mike Peralta; All rights reserved
|
||||
Copyright 2024 Mike Peralta; All rights reserved
|
||||
|
||||
Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more)
|
||||
|
||||
"""
|
||||
|
||||
|
||||
from domain.config.Config import Config
|
||||
from domain.config.ConfigFile import ConfigFile
|
||||
from domain.Logger import Logger
|
||||
from domain.Config import Config
|
||||
from domain.Util import Util
|
||||
|
||||
|
||||
import datetime
|
||||
import os
|
||||
# import pprint
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import sys
|
||||
import time
|
||||
import yaml
|
||||
|
||||
|
||||
class BackupRotator:
|
||||
|
||||
def __init__(self, debug:bool = False):
|
||||
def __init__(
|
||||
self,
|
||||
config_paths: [Path] = None,
|
||||
debug: bool = False,
|
||||
systemd: bool = False,
|
||||
write_to_syslog: bool = False,
|
||||
do_test_logs: bool = True,
|
||||
):
|
||||
self.__do_test_logs = do_test_logs
|
||||
|
||||
self.__logger = Logger(name=type(self).__name__, debug=debug)
|
||||
self.__config_helper = Config(logger=self.__logger)
|
||||
self.__logger = Logger(
|
||||
name=type(self).__name__,
|
||||
debug=debug,
|
||||
systemd=systemd,
|
||||
write_to_syslog=write_to_syslog,
|
||||
do_test_logs=do_test_logs,
|
||||
)
|
||||
|
||||
self.__dry_run = False
|
||||
self.__configs = []
|
||||
self.__config_paths = []
|
||||
self.__config = Config(
|
||||
logger=self.__logger,
|
||||
config_files_paths=config_paths
|
||||
)
|
||||
|
||||
self.__global_dry_run = True
|
||||
self.__calculated_actions = []
|
||||
|
||||
def run(self, configs, dry_run: bool = False):
|
||||
def run(self, global_dry_run: bool = True):
|
||||
|
||||
self.info("Begin")
|
||||
self.info("Begin rotating")
|
||||
|
||||
self.__dry_run = dry_run
|
||||
self.__config_paths = configs
|
||||
|
||||
self._consume_configs(self.__config_paths)
|
||||
self.__global_dry_run = global_dry_run
|
||||
if self.__global_dry_run:
|
||||
self.info(f"Running as a dry run, globally.")
|
||||
else:
|
||||
self.info(f"Won't run as a global dry run.")
|
||||
|
||||
# Rotate once per config
|
||||
for config_index in range(len(self.__configs)):
|
||||
config_file_index = -1
|
||||
for config_file in self.__config.config_files:
|
||||
|
||||
#
|
||||
config = self.__configs[config_index]
|
||||
config_file: ConfigFile
|
||||
config_file_index += 1
|
||||
|
||||
#
|
||||
self.info(f"Rotating for config {config_index + 1} of {len(self.__configs)} : {config['__path']}")
|
||||
self._do_rotate(config)
|
||||
self.info(
|
||||
f"Rotating for config {config_file_index + 1} of {len(self.__config.config_files)}"
|
||||
f" : {config_file.path}"
|
||||
f"\n{config_file}"
|
||||
)
|
||||
self._do_rotate(config_file)
|
||||
|
||||
@staticmethod
|
||||
def current_time():
|
||||
@ -66,312 +85,333 @@ class BackupRotator:
|
||||
|
||||
def debug(self, s):
|
||||
self.__logger.debug(s)
|
||||
|
||||
def info(self, s):
|
||||
self.__logger.info(s)
|
||||
|
||||
def warn(self, s):
|
||||
self.__logger.warn(s)
|
||||
self.__logger.warning(s)
|
||||
|
||||
def error(self, s):
|
||||
self.__logger.error(s)
|
||||
|
||||
def _consume_configs(self, paths: list=None):
|
||||
def _do_rotate(self, config: ConfigFile):
|
||||
|
||||
configs = self.__config_helper.gather_valid_configs(paths=paths)
|
||||
for config in configs:
|
||||
self._consume_config(path=config)
|
||||
self.info(
|
||||
f"Rotating for config: {config.path}"
|
||||
)
|
||||
if config.dry_run:
|
||||
self.info(
|
||||
f"Config {config.path.name} is set for a dry run (no deleting)."
|
||||
)
|
||||
else:
|
||||
self.info(
|
||||
f"Config {config.path.name} is not set for a dry run (will delete)."
|
||||
)
|
||||
|
||||
def _consume_config(self, path: str):
|
||||
|
||||
# Open the file
|
||||
f = open(path)
|
||||
if not f:
|
||||
raise Exception("Unable to open config file: " + path)
|
||||
|
||||
# Parse
|
||||
config = yaml.safe_load(f)
|
||||
|
||||
# Add its own path
|
||||
config["__path"] = path
|
||||
|
||||
# Consume to internal
|
||||
self.__configs.append(config)
|
||||
self.info(f"Consumed config from path: {path}")
|
||||
self._rotate_paths(config=config)
|
||||
|
||||
def _do_rotate(self, config):
|
||||
def _rotate_paths(self, config: ConfigFile):
|
||||
|
||||
paths = config.rotatable_paths
|
||||
self.info(f"Begin rotating {len(paths)} paths")
|
||||
|
||||
for path in paths:
|
||||
|
||||
path: Path
|
||||
|
||||
self._rotate_path(config=config, path=path)
|
||||
|
||||
self._rotate_paths(config)
|
||||
def _rotate_path(self, config: ConfigFile, path: Path):
|
||||
|
||||
assert path.is_dir(), (
|
||||
f"Path should be a directory: {path}"
|
||||
)
|
||||
|
||||
self.info(
|
||||
f"Rotating path: {path}"
|
||||
)
|
||||
|
||||
self._rotate_path_for_maximum_items(
|
||||
config=config,
|
||||
path=path,
|
||||
)
|
||||
|
||||
self._rotate_path_for_maximum_age(
|
||||
config=config,
|
||||
path=path,
|
||||
)
|
||||
|
||||
def _rotate_paths(self, config):
|
||||
def _rotate_path_for_maximum_items(self, config: ConfigFile, path: Path):
|
||||
|
||||
self.info("Begin rotating " + str(len(config["paths"])) + " paths")
|
||||
for path in config["paths"]:
|
||||
self._rotate_path(config, path)
|
||||
|
||||
def _rotate_path(self, config, path):
|
||||
assert path.is_dir(), f"Path should be a directory: {path}"
|
||||
|
||||
assert os.path.isdir(path), "Path should be a directory: {}".format(path)
|
||||
if config.maximum_items:
|
||||
self.info(
|
||||
f"Rotating path for a maximum of {config.maximum_items} items: {path}"
|
||||
)
|
||||
else:
|
||||
self.info(
|
||||
f"Not configured to rotate for maximum number of items."
|
||||
)
|
||||
return
|
||||
|
||||
self.info("Rotating path: {}".format(path))
|
||||
self.info(
|
||||
f"Will gather rotation candidates for maximum number of items."
|
||||
)
|
||||
|
||||
found_any_rotation_keys = False
|
||||
if "maximum-items" in config.keys():
|
||||
found_any_rotation_keys = True
|
||||
self._rotate_path_for_maximum_items(config=config, path=path, max_items=config["maximum-items"])
|
||||
if "maximum-age" in config.keys():
|
||||
found_any_rotation_keys = True
|
||||
self._rotate_path_for_maximum_age(config=config, path=path, max_age_days=config["maximum-age"])
|
||||
|
||||
assert found_any_rotation_keys is True, \
|
||||
"Config needs one of the following keys: \"maximum-items\""
|
||||
|
||||
def _rotate_path_for_maximum_items(self, config, path: str, max_items: int):
|
||||
|
||||
assert os.path.isdir(path), "Path should be a directory: {}".format(path)
|
||||
|
||||
self.info("Rotating path for a maximum of {} items: {}".format(
|
||||
max_items, path
|
||||
))
|
||||
|
||||
children = self._gather_rotation_candidates(config, path)
|
||||
|
||||
minimum_items = self._determine_minimum_items(config)
|
||||
candidate_items = self._gather_rotation_candidates(config=config, path=path)
|
||||
minimum_items = self._determine_minimum_items(config=config)
|
||||
|
||||
# Do we need to rotate anything out?
|
||||
if len(children) < minimum_items:
|
||||
self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
|
||||
len(children), minimum_items
|
||||
))
|
||||
if len(candidate_items) < minimum_items:
|
||||
|
||||
self.info(
|
||||
f"Path only has {len(candidate_items)} items"
|
||||
f", which does not meet the minimum threshold of {minimum_items} items."
|
||||
" Won't rotate this path."
|
||||
)
|
||||
return
|
||||
elif len(children) <= max_items:
|
||||
self.info("Path only has {} items, but needs more than {} for rotation; Won't rotate this path.".format(
|
||||
len(children), max_items
|
||||
))
|
||||
|
||||
elif len(candidate_items) <= config.maximum_items:
|
||||
self.info(
|
||||
f"Path only has {len(candidate_items)} items"
|
||||
f", but needs more than {config.maximum_items} for rotation"
|
||||
"; Won't rotate this path."
|
||||
)
|
||||
return
|
||||
self.info("Found {} items to examine".format(len(children)))
|
||||
|
||||
self.info(f"Found {len(candidate_items)} items to examine")
|
||||
|
||||
#
|
||||
maximum_purge_count = len(children) - minimum_items
|
||||
purge_count = len(children) - max_items
|
||||
self.info("Want to purge {} items".format(purge_count))
|
||||
|
||||
maximum_purge_count = len(candidate_items) - minimum_items
|
||||
purge_count = len(candidate_items) - config.maximum_items
|
||||
self.info(
|
||||
f"Want to purge {purge_count} items to stay under maximum of {config.maximum_items}"
|
||||
)
|
||||
|
||||
if purge_count > maximum_purge_count:
|
||||
self.info("Reducing purge count from {} to {} items to respect minimum items setting ({})".format(
|
||||
purge_count, maximum_purge_count, minimum_items
|
||||
))
|
||||
self.info(
|
||||
f"Reducing purge count from"
|
||||
f" {purge_count} to {maximum_purge_count} items"
|
||||
f" to respect minimum items setting ({minimum_items})"
|
||||
)
|
||||
purge_count = maximum_purge_count
|
||||
|
||||
children_to_purge = []
|
||||
items_to_purge = []
|
||||
for purge_index in range(purge_count):
|
||||
|
||||
#
|
||||
item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item(config, children)
|
||||
children.remove(item_to_purge)
|
||||
self.info("Found next item to purge: ({}) {} ({})".format(
|
||||
purge_index + 1,
|
||||
os.path.basename(item_to_purge),
|
||||
item_age
|
||||
))
|
||||
item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item(
|
||||
config=config, items=candidate_items
|
||||
)
|
||||
item_to_purge: Path
|
||||
|
||||
candidate_items.remove(item_to_purge)
|
||||
|
||||
self.info(
|
||||
f"Will purge: ({purge_index + 1})"
|
||||
f" {item_to_purge.name}"
|
||||
f" ({item_age})"
|
||||
)
|
||||
|
||||
#
|
||||
children_to_purge.append(item_to_purge)
|
||||
|
||||
items_to_purge.append(item_to_purge)
|
||||
|
||||
#
|
||||
self.info("Removing items")
|
||||
for child_to_purge in children_to_purge:
|
||||
child_basename = os.path.basename(child_to_purge)
|
||||
self._remove_item(config, child_to_purge)
|
||||
for item_to_purge in items_to_purge:
|
||||
|
||||
item_to_purge: Path
|
||||
|
||||
self.debug(f"Purging item: {item_to_purge.name}")
|
||||
|
||||
self._remove_item(config=config, path=item_to_purge)
|
||||
|
||||
def _rotate_path_for_maximum_age(self, config, path: str, max_age_days: int):
|
||||
def _rotate_path_for_maximum_age(self, config: ConfigFile, path: Path):
|
||||
|
||||
assert os.path.isdir(path), "Path should be a directory: {}".format(path)
|
||||
assert path.is_dir(), f"Path should be a directory: {path}"
|
||||
|
||||
self.info("Rotating path for max age of {} days: {}".format(max_age_days, path))
|
||||
|
||||
children = self._gather_rotation_candidates(config, path)
|
||||
minimum_items = self._determine_minimum_items(config)
|
||||
|
||||
# Do we need to rotate anything out?
|
||||
if len(children) < minimum_items:
|
||||
self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
|
||||
len(children), minimum_items
|
||||
))
|
||||
if config.maximum_age:
|
||||
self.info(
|
||||
f"Rotating path for max age of {config.maximum_age} days: {path}"
|
||||
)
|
||||
else:
|
||||
self.info(
|
||||
f"Not configured to rotate for a maximum number of days."
|
||||
)
|
||||
return
|
||||
|
||||
self.info("Examining {} items for deletion".format(len(children)))
|
||||
children_to_delete = []
|
||||
for child in children:
|
||||
|
||||
age_seconds = self._detect_item_age_seconds(config, child)
|
||||
age_days = self._detect_item_age_days(config, child)
|
||||
age_formatted = self.seconds_to_time_string(age_seconds)
|
||||
child_basename = os.path.basename(child)
|
||||
|
||||
if age_days > max_age_days:
|
||||
self.info("[Old enough ] {} ({})".format(
|
||||
child_basename, age_formatted
|
||||
))
|
||||
children_to_delete.append(child)
|
||||
else:
|
||||
self.info("[Not Old enough] {} ({})".format(
|
||||
child_basename, age_formatted
|
||||
))
|
||||
self.info(
|
||||
f"Will gather rotation candidates for maximum age, in days."
|
||||
)
|
||||
candidate_items = self._gather_rotation_candidates(config=config, path=path)
|
||||
minimum_items = self._determine_minimum_items(config=config)
|
||||
|
||||
if len(children_to_delete) > 0:
|
||||
# Do we need to rotate anything out?
|
||||
if len(candidate_items) < minimum_items:
|
||||
self.info(
|
||||
f"Path only has {len(candidate_items)} items"
|
||||
f", which does not meet the minimum threshold of {minimum_items} items."
|
||||
f" Won't rotate this path."
|
||||
)
|
||||
return
|
||||
|
||||
self.info(
|
||||
f"Examining {len(candidate_items)} items for deletion"
|
||||
)
|
||||
items_to_delete = []
|
||||
for item in candidate_items:
|
||||
|
||||
age_seconds = Util.detect_item_age_seconds(config=config, item=item)
|
||||
age_days = Util.detect_item_age_days(config=config, item=item)
|
||||
age_formatted = Util.seconds_to_time_string(age_seconds)
|
||||
|
||||
if age_days > config.maximum_age:
|
||||
self.info(
|
||||
f"[Old enough ] {item.name} ({age_formatted})"
|
||||
)
|
||||
items_to_delete.append(item)
|
||||
else:
|
||||
self.info(
|
||||
f"[Not Old enough] {item.name} ({age_formatted})"
|
||||
)
|
||||
|
||||
if len(items_to_delete) > 0:
|
||||
|
||||
self.info("Removing old items ...")
|
||||
for child_to_delete in children_to_delete:
|
||||
basename = os.path.basename(child_to_delete)
|
||||
self._remove_item(config, child_to_delete)
|
||||
|
||||
for item in items_to_delete:
|
||||
self._remove_item(config, item)
|
||||
|
||||
else:
|
||||
self.info("No old items to remove")
|
||||
|
||||
def _gather_rotation_candidates(self, config: ConfigFile, path: Path) -> [Path]:
|
||||
|
||||
self.debug(f"Begin gathering rotation candidates for: {path}")
|
||||
|
||||
@staticmethod
|
||||
def _gather_rotation_candidates(config, path):
|
||||
candidates: [Path] = []
|
||||
|
||||
candidates = []
|
||||
|
||||
if "target-type" not in config.keys():
|
||||
raise Exception("Please provide the configuration key: target-type")
|
||||
|
||||
for item_name in os.listdir(path):
|
||||
for item in path.iterdir():
|
||||
|
||||
item_path = os.path.join(path, item_name)
|
||||
self.debug(f"Found an item: {item.name}")
|
||||
|
||||
if config["target-type"] == "file":
|
||||
if not os.path.isfile(item_path):
|
||||
if config.target_type == "file":
|
||||
|
||||
if not item.is_file():
|
||||
self.debug(f"Not a file; Skipping: {item.name}")
|
||||
continue
|
||||
elif config["target-type"] == "directory":
|
||||
if not os.path.isdir(item_path):
|
||||
|
||||
elif config.target_type == "directory":
|
||||
|
||||
if not item.is_dir():
|
||||
self.debug(f"Not a directory; Skipping: {item.name}")
|
||||
continue
|
||||
|
||||
else:
|
||||
raise Exception("Configuration key \"target-type\" must be \"file\" or \"directory\"")
|
||||
raise Exception(
|
||||
f"Unsupported target type: {config.target_type}"
|
||||
)
|
||||
|
||||
candidates.append(item_path)
|
||||
candidates.append(item)
|
||||
|
||||
self.__logger.info(f"Returning {len(candidates)} potential candidates to remove.")
|
||||
|
||||
return candidates
|
||||
|
||||
def _pick_oldest_item(self, config, items):
|
||||
def _pick_oldest_item(self, config: ConfigFile, items: [Path]) -> (Path, float, float, str):
|
||||
|
||||
best_item = None
|
||||
best_ctime = None
|
||||
for item in items:
|
||||
|
||||
ctime = self._detect_item_date(config, item)
|
||||
try:
|
||||
ctime = Util.detect_item_creation_date(config, item)
|
||||
except FileNotFoundError as e:
|
||||
self.__logger.error(f"File disappeared while trying to check ctime: {item}")
|
||||
continue
|
||||
|
||||
if best_ctime is None or ctime < best_ctime:
|
||||
best_ctime = ctime
|
||||
best_item = item
|
||||
|
||||
age_seconds = self._detect_item_age_seconds(config, best_item)
|
||||
age_string = self.seconds_to_time_string(age_seconds)
|
||||
age_seconds = Util.detect_item_age_seconds(config, best_item)
|
||||
age_string = Util.seconds_to_time_string(age_seconds)
|
||||
|
||||
return best_item, best_ctime, age_seconds, age_string
|
||||
|
||||
@staticmethod
|
||||
def _detect_item_date(config, item):
|
||||
def _remove_item(self, config: ConfigFile, path: Path):
|
||||
|
||||
assert "date-detection" in config.keys(), "Please provide config key: \"date-detection\""
|
||||
detection = config["date-detection"]
|
||||
|
||||
if detection == "file":
|
||||
ctime = os.path.getctime(item)
|
||||
if path.is_file():
|
||||
|
||||
self._remove_file(config=config, file_path=path)
|
||||
|
||||
elif path.is_dir():
|
||||
|
||||
self._remove_directory(config=config, dir_path=path)
|
||||
|
||||
else:
|
||||
raise AssertionError(f"Invalid value for \"date-detection\"; Should be one of [file]: {detection}")
|
||||
|
||||
return ctime
|
||||
|
||||
def _detect_item_age_seconds(self, config, item):
|
||||
raise AssertionError(
|
||||
f"Don't know how to remove this item: {path}"
|
||||
)
|
||||
|
||||
def _remove_file(self, config: ConfigFile, file_path: Path):
|
||||
|
||||
now = time.time()
|
||||
ctime = self._detect_item_date(config, item)
|
||||
delta = now - ctime
|
||||
if not file_path.is_file():
|
||||
raise Exception(
|
||||
f"Tried to remove a file, but this path isn't a file: {file_path}"
|
||||
)
|
||||
|
||||
return delta
|
||||
|
||||
def _detect_item_age_days(self, config, item):
|
||||
|
||||
age_seconds = self._detect_item_age_seconds(config, item)
|
||||
age_days = int(age_seconds / 86400)
|
||||
|
||||
return age_days
|
||||
|
||||
def seconds_to_time_string(self, seconds: float):
|
||||
|
||||
if isinstance(seconds, float):
|
||||
pass
|
||||
elif isinstance(seconds, int):
|
||||
seconds = float * 1.0
|
||||
else:
|
||||
raise AssertionError("Seconds must be an int or float")
|
||||
|
||||
# Map
|
||||
map = {
|
||||
"year": 31536000.0,
|
||||
"month": 2592000.0,
|
||||
"week": 604800.0,
|
||||
"day": 86400.0,
|
||||
"hour": 3600.0,
|
||||
"minute": 60.0,
|
||||
"second": 1.0
|
||||
}
|
||||
|
||||
s_parts = []
|
||||
for unit_label in map.keys():
|
||||
unit_seconds = map[unit_label]
|
||||
if seconds >= unit_seconds:
|
||||
unit_count = int(seconds / unit_seconds)
|
||||
s_parts.append("{} {}{}".format(
|
||||
unit_count, unit_label,
|
||||
"" if unit_count == 1 else "s"
|
||||
))
|
||||
seconds -= unit_seconds * unit_count
|
||||
|
||||
s = ", ".join(s_parts)
|
||||
|
||||
return s
|
||||
|
||||
def _remove_item(self, config, path):
|
||||
|
||||
if os.path.isfile(path):
|
||||
self._remove_file(config, path)
|
||||
elif os.path.isdir(path):
|
||||
self._remove_directory(config, path)
|
||||
else:
|
||||
raise AssertionError("Don't know how to remove this item: {}".format(path))
|
||||
|
||||
def _remove_file(self, config, file_path):
|
||||
|
||||
if not os.path.isfile(file_path):
|
||||
raise Exception("Tried to remove a file, but this path isn't a file: " + str(file_path))
|
||||
|
||||
if self.__dry_run:
|
||||
self.info(f"Won't purge file during global-level dry run: {file_path}")
|
||||
elif "dry-run" in config.keys() and config["dry-run"] is True:
|
||||
self.info(f"Won't purge file during config-level dry run: {file_path}")
|
||||
if self.__global_dry_run:
|
||||
|
||||
self.info(f"(Global Dry Run) {file_path}")
|
||||
|
||||
elif config.dry_run is True:
|
||||
|
||||
self.info(f"(Config Dry Run) {file_path}")
|
||||
|
||||
else:
|
||||
self.info(f"Purging file: {file_path}")
|
||||
os.remove(file_path)
|
||||
file_path.unlink()
|
||||
|
||||
def _remove_directory(self, config, dir_path):
|
||||
def _remove_directory(self, config: ConfigFile, dir_path: Path):
|
||||
|
||||
if not os.path.isdir(dir_path):
|
||||
raise Exception("Tried to remove a directory, but this path isn't a directory: " + str(dir_path))
|
||||
if not dir_path.is_dir():
|
||||
raise Exception(
|
||||
f"Tried to remove a directory"
|
||||
f", but this path isn't a directory: {dir_path}"
|
||||
)
|
||||
|
||||
if self.__dry_run:
|
||||
self.info(f"Won't purge directory during global-level dry run: {dir_path}")
|
||||
elif "dry-run" in config.keys() and config["dry-run"] is True:
|
||||
self.info(f"Won't purge directory during config-level dry run: {dir_path}")
|
||||
if self.__global_dry_run:
|
||||
|
||||
self.info(f"(Global Dry Run) {dir_path}")
|
||||
|
||||
elif config.dry_run:
|
||||
|
||||
self.info(f"(Config Dry Run) {dir_path}")
|
||||
|
||||
else:
|
||||
|
||||
self.info(f"Purging directory: {dir_path}")
|
||||
shutil.rmtree(dir_path)
|
||||
|
||||
|
||||
def _determine_minimum_items(self, config):
|
||||
def _determine_minimum_items(self, config) -> int:
|
||||
|
||||
minimum_items = 0
|
||||
|
||||
if "minimum-items" in config.keys():
|
||||
minimum_items = config["minimum-items"]
|
||||
self.info("Won't delete anything unless a minimum of {} items were found".format(minimum_items))
|
||||
if config.minimum_items is not None:
|
||||
|
||||
minimum_items = config.minimum_items
|
||||
|
||||
self.info(
|
||||
f"Won't delete anything unless a minimum of {minimum_items} items were found"
|
||||
)
|
||||
|
||||
else:
|
||||
self.info("No value found for \"minimum-items\"; Will not enforce minimum item constraint.")
|
||||
self.info(
|
||||
"No minimum number of items specified"
|
||||
"; Will not enforce minimum item constraint."
|
||||
)
|
||||
|
||||
return minimum_items
|
||||
|
@ -4,23 +4,45 @@ from logging.handlers import SysLogHandler
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
class Logger:
|
||||
|
||||
def __init__(self, name: str, debug: bool=False):
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
debug: bool = False,
|
||||
write_to_syslog: bool = False,
|
||||
systemd: bool = False,
|
||||
do_test_logs: bool = True,
|
||||
):
|
||||
|
||||
self.__name = name
|
||||
self.__debug = debug
|
||||
self.__write_to_syslog = write_to_syslog
|
||||
self.__systemd = systemd
|
||||
self.__do_test_logs = do_test_logs
|
||||
|
||||
self._init_logger()
|
||||
|
||||
def _init_logger(self):
|
||||
|
||||
self.__logger = logging.getLogger(self.__name)
|
||||
|
||||
if debug:
|
||||
|
||||
if self.__debug:
|
||||
level = logging.DEBUG
|
||||
else:
|
||||
level = logging.INFO
|
||||
|
||||
self.__logger.setLevel(level)
|
||||
|
||||
formatter = logging.Formatter('[%(name)s][%(levelname)s] %(message)s')
|
||||
formatter_full = logging.Formatter('[%(asctime)s][%(name)s][%(levelname)s] %(message)s')
|
||||
formatter = logging.Formatter(
|
||||
fmt="[{name}][{levelname:<7}] {message}",
|
||||
style='{'
|
||||
)
|
||||
formatter_full = logging.Formatter(
|
||||
fmt="[{asctime}][{name}][{levelname:<7}] {message}",
|
||||
style='{'
|
||||
)
|
||||
|
||||
# Console output / stream handler (STDOUT)
|
||||
handler = logging.StreamHandler(
|
||||
@ -28,7 +50,9 @@ class Logger:
|
||||
)
|
||||
handler.setLevel(level)
|
||||
handler.addFilter(lambda entry: entry.levelno <= logging.INFO)
|
||||
handler.setFormatter(formatter_full)
|
||||
handler.setFormatter(
|
||||
formatter if self.__systemd else formatter_full
|
||||
)
|
||||
self.__logger.addHandler(handler)
|
||||
|
||||
# Console output / stream handler (STDERR)
|
||||
@ -36,29 +60,38 @@ class Logger:
|
||||
stream=sys.stderr
|
||||
)
|
||||
handler.setLevel(logging.WARNING)
|
||||
handler.setFormatter(formatter_full)
|
||||
handler.setFormatter(
|
||||
formatter if self.__systemd else formatter_full
|
||||
)
|
||||
self.__logger.addHandler(handler)
|
||||
|
||||
# Syslog handler
|
||||
handler = SysLogHandler(
|
||||
address="/dev/log"
|
||||
)
|
||||
handler.setLevel(level)
|
||||
handler.setFormatter(formatter)
|
||||
self.__logger.addHandler(handler)
|
||||
if self.__write_to_syslog:
|
||||
handler = SysLogHandler(
|
||||
address="/dev/log"
|
||||
)
|
||||
handler.setLevel(level)
|
||||
handler.setFormatter(formatter)
|
||||
self.__logger.addHandler(handler)
|
||||
|
||||
# This is annoying inside cron
|
||||
# self.debug("Test debug log")
|
||||
# self.info("Test info log")
|
||||
# self.warn("Test warn log")
|
||||
# self.error("Test error log")
|
||||
|
||||
if self.__do_test_logs:
|
||||
self.debug("Test debug log")
|
||||
self.info("Test info log")
|
||||
self.warn("Test warn log")
|
||||
self.error("Test error log")
|
||||
|
||||
def debug(self, s):
|
||||
self.__logger.debug(s)
|
||||
|
||||
def info(self, s):
|
||||
self.__logger.info(s)
|
||||
|
||||
def warn(self, s):
|
||||
self.__logger.warn(s)
|
||||
self.__logger.warning(s)
|
||||
|
||||
def warning(self, s):
|
||||
self.__logger.warning(s)
|
||||
|
||||
def error(self, s):
|
||||
self.__logger.error(s)
|
||||
|
130
domain/Util.py
Normal file
130
domain/Util.py
Normal file
@ -0,0 +1,130 @@
|
||||
|
||||
|
||||
from domain.config.ConfigFile import ConfigFile
|
||||
|
||||
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class Util:
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def get_dir_files_recursive(path: Path) -> [Path]:
|
||||
|
||||
files_paths = []
|
||||
for dir_path, dirs_names, filenames in path.walk():
|
||||
|
||||
for file_name in filenames:
|
||||
|
||||
file_path = dir_path / file_name
|
||||
|
||||
files_paths.append(file_path)
|
||||
|
||||
return files_paths
|
||||
|
||||
@staticmethod
|
||||
def detect_item_creation_date(config: ConfigFile, item: Path) -> datetime.datetime:
|
||||
|
||||
stat = None
|
||||
|
||||
if config.date_detection == "file":
|
||||
|
||||
# Try for the most accurate stat
|
||||
# First one that raises will just break the block, obv
|
||||
try:
|
||||
stat = item.stat().st_ctime
|
||||
# print("got ctime")
|
||||
stat = item.stat().st_mtime
|
||||
# print("got mtime")
|
||||
stat = item.stat().st_birthtime
|
||||
# print("got btime")
|
||||
except FileNotFoundError as e:
|
||||
raise e
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
else:
|
||||
raise AssertionError(
|
||||
f"Unsupported date-detection option: {config.date_detection}"
|
||||
)
|
||||
|
||||
stamp = datetime.datetime.fromtimestamp(
|
||||
stat
|
||||
)
|
||||
# print("Stat:", stat)
|
||||
# print("Stamp:", stamp)
|
||||
# print(item.name, "==>", stamp)
|
||||
|
||||
return stamp
|
||||
|
||||
@staticmethod
|
||||
def detect_item_age_seconds(config: ConfigFile, item: Path) -> float:
|
||||
|
||||
now = datetime.datetime.now()
|
||||
|
||||
ctime = Util.detect_item_creation_date(config=config, item=item)
|
||||
delta = now - ctime
|
||||
seconds = delta.seconds
|
||||
|
||||
# print(item.name, "==>", seconds, f"({ctime})")
|
||||
# print(">", "Now was:", now)
|
||||
# print(">", "ctime was:", ctime)
|
||||
# print(">", "Delta was:", delta)
|
||||
# print(">", "Seconds was:", delta.total_seconds())
|
||||
|
||||
return delta.total_seconds()
|
||||
|
||||
@staticmethod
|
||||
def detect_item_age_days(config: ConfigFile, item: Path) -> int:
|
||||
|
||||
age_seconds = Util.detect_item_age_seconds(
|
||||
config=config, item=item
|
||||
)
|
||||
age_days = int(age_seconds / 86400)
|
||||
|
||||
return age_days
|
||||
|
||||
@staticmethod
|
||||
def seconds_to_time_string(seconds: float):
|
||||
|
||||
if isinstance(seconds, float):
|
||||
pass
|
||||
elif isinstance(seconds, int):
|
||||
seconds = float(seconds)
|
||||
else:
|
||||
raise AssertionError("Seconds must be an int or float")
|
||||
|
||||
# Map
|
||||
dt_map = {
|
||||
"year": 31536000.0,
|
||||
"month": 2592000.0,
|
||||
"week": 604800.0,
|
||||
"day": 86400.0,
|
||||
"hour": 3600.0,
|
||||
"minute": 60.0,
|
||||
"second": 1.0
|
||||
}
|
||||
|
||||
s_parts = []
|
||||
for unit_label in dt_map.keys():
|
||||
|
||||
unit_seconds = dt_map[unit_label]
|
||||
|
||||
if seconds >= unit_seconds:
|
||||
|
||||
unit_count = int(seconds / unit_seconds)
|
||||
|
||||
unit_plural = "" if unit_count == 1 else "s"
|
||||
s_parts.append(
|
||||
f"{unit_count} {unit_label}{unit_plural}"
|
||||
)
|
||||
|
||||
seconds -= unit_seconds * unit_count
|
||||
|
||||
s = ", ".join(s_parts)
|
||||
|
||||
return s
|
43
domain/config/Config.py
Normal file
43
domain/config/Config.py
Normal file
@ -0,0 +1,43 @@
|
||||
|
||||
|
||||
from domain.config.ConfigFile import ConfigFile
|
||||
from domain.config.Scanner import Scanner
|
||||
from domain.Logger import Logger
|
||||
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class Config:
|
||||
|
||||
def __init__(self, logger: Logger, config_files_paths: [Path]):
|
||||
|
||||
self.__logger = logger
|
||||
|
||||
self.__config_files_paths: [Path] = config_files_paths
|
||||
self.__configs = {}
|
||||
|
||||
self.__scanner = Scanner(
|
||||
logger=self.__logger
|
||||
)
|
||||
|
||||
self._consume_configs()
|
||||
|
||||
def _consume_configs(self):
|
||||
|
||||
config_paths = self.__scanner.gather_valid_config_paths(
|
||||
paths=self.__config_files_paths
|
||||
)
|
||||
|
||||
for config_path in config_paths:
|
||||
|
||||
config = ConfigFile(
|
||||
logger=self.__logger,
|
||||
path=config_path
|
||||
)
|
||||
|
||||
self.__configs[config.key] = config
|
||||
|
||||
@property
|
||||
def config_files(self) -> [ConfigFile]:
|
||||
return self.__configs.values()
|
309
domain/config/ConfigFile.py
Normal file
309
domain/config/ConfigFile.py
Normal file
@ -0,0 +1,309 @@
|
||||
|
||||
|
||||
from domain.Logger import Logger
|
||||
|
||||
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
|
||||
|
||||
class ConfigFile:
    """
    Parses and validates one YAML rotation-config file.

    Loads the YAML at ``path`` on construction (``_load``), then validates
    and copies its settings into typed private attributes (``_consume``).
    Validation failures surface as AssertionError (or KeyError), logged
    with the file name for context.
    """

    # Allowed values for the "target-type" option.
    __VALID_TARGET_TYPES = [
        "file",
        "directory"
    ]

    # Allowed values for the "date-detection" option.
    __VALID_DATE_DETECTION_TYPES = [
        "file"
    ]

    # Defaults applied when the corresponding option is absent or null.
    __DEFAULT_MINIMUM_ITEMS = 0
    __DEFAULT_MAXIMUM_ITEMS = None
    __DEFAULT_MAXIMUM_AGE = None

    def __init__(
            self, logger: Logger,
            path: Path,
    ):
        """
        :param logger: shared project Logger; all messages are prefixed
            with this config file's name.
        :param path: path to the YAML config file (made absolute here).
        """
        self.__logger = logger
        self.__path = path.absolute()

        # Raw parsed YAML document (dict once loaded).
        # noinspection PyTypeChecker
        self.__data: dict = None

        # Default to dry-run (safe) until the file explicitly disables it.
        self.__dry_run: bool = True

        # noinspection PyTypeChecker
        self.__target_type: str = None

        # noinspection PyTypeChecker
        self.__date_detection: str = None

        self.__rotatable_paths: [Path] = []

        self.__minimum_items = self.__DEFAULT_MINIMUM_ITEMS
        # noinspection PyTypeChecker
        self.__maximum_items: int = self.__DEFAULT_MAXIMUM_ITEMS
        # Use the declared class default rather than a hard-coded None,
        # for consistency with minimum/maximum items above.
        # noinspection PyTypeChecker
        self.__maximum_age: int = self.__DEFAULT_MAXIMUM_AGE

        self._load()
        self._consume()

    def __str__(self):
        """Human-readable summary of every consumed setting."""

        s = ""

        s += "*** Config File ***"
        s += f"\n> Path: {self.__path}"
        s += f"\n> Dry run: " + ("Yes" if self.__dry_run else "No")
        s += f"\n> Minimum items: {self.__minimum_items}"
        s += f"\n> Maximum items: {self.__maximum_items}"
        s += f"\n> Maximum age (in days): {self.__maximum_age}"
        s += f"\n> Target type: {self.__target_type}"
        s += f"\n> Date detection: {self.__date_detection}"
        s += f"\n> Rotatable paths: "
        if len(self.__rotatable_paths) > 0:
            for p in self.__rotatable_paths:
                s += f"\n>> {p}"
        else:
            s += "\n>> [none]"

        return s

    def _load(self):
        """Read and parse the YAML file at self.__path into self.__data."""

        self.info(f"Loading config: {self.__path}")

        assert self.__path.is_file(), (
            f"Cannot load config file because it isn't a file: {self.__path}"
        )

        # Use a context manager so the handle is always closed.
        # (The previous `f = open(...)` leaked the handle, and its
        # `if not f:` guard was dead code — open() raises on failure,
        # it never returns a falsy object.)
        self.debug(f"Opening config file for load: {self.__path}")
        with open(str(self.__path)) as f:
            self.__data = yaml.safe_load(f)

        assert self.__data is not None, (
            f"Config file seems to be null or empty: {self.__path}"
        )

        self.info(f"Loaded config from path: {self.__path}")

    def _consume(self):
        """
        Validate self.__data and copy its settings into typed attributes.

        Required: an "options" dict containing "target-type",
        "date-detection", and at least one of "maximum-items" /
        "maximum-age"; plus a top-level "paths" (string or list).
        Raises AssertionError (or re-raises KeyError) on invalid input,
        after logging the offending file.
        """

        try:

            assert isinstance(self.__data, dict), (
                f"Config file should be a dict!"
            )

            if "options" in self.__data.keys():

                self.info(f"Found options setting")
                options = self.__data["options"]
                assert isinstance(options, dict), "Options must be a dict"

                if "dry-run" in options.keys():

                    dry_run = self.__data["options"]["dry-run"]
                    self.info(f"Found dry run option: {dry_run}")
                    assert isinstance(dry_run, bool), "dry-run setting must be boolean"
                    self.__dry_run = dry_run
                else:
                    self.warning(f"No dry-run option found; Will use default: {self.__dry_run}")

                if "minimum-items" in options.keys():

                    minimum_items = options["minimum-items"]
                    self.info(f"Found minimum-items option: {minimum_items}")
                    # A null value falls back to the class default.
                    if minimum_items is None:
                        minimum_items = self.__DEFAULT_MINIMUM_ITEMS
                    assert isinstance(minimum_items, int), (
                        f"Option minimum-items must be an integer,"
                        f" but got: {type(minimum_items).__name__} ({minimum_items})"
                    )
                    self.__minimum_items = minimum_items
                else:
                    self.warning(
                        f"No minimum-items option found; Will use default: {self.__minimum_items}"
                    )

                # At least one retention limit must be configured,
                # otherwise nothing would ever rotate.
                assert (
                    "maximum-items" in options.keys()
                    or
                    "maximum-age" in options.keys()
                ), (
                    "Options should include either maximum-items or maximum-age"
                )

                if "maximum-items" in options.keys():

                    maximum_items = options["maximum-items"]
                    self.info(f"Found maximum-items option: {maximum_items}")
                    assert maximum_items is None or isinstance(maximum_items, int), (
                        f"Option maximum-items must be integer, but got: {maximum_items}"
                    )
                    assert maximum_items is None or maximum_items > 0, (
                        f"Option maximum-items is zero, which doesn't make sense."
                    )
                    self.__maximum_items = maximum_items
                else:
                    self.warning(
                        f"No maximum-items option found; Will use default: {self.__maximum_items}"
                    )

                if "maximum-age" in options.keys():

                    maximum_age = options["maximum-age"]
                    self.info(f"Found maximum-age option (max age in days): {maximum_age}")
                    assert maximum_age is None or isinstance(maximum_age, int), (
                        f"Option maximum-age must be None or an integer,"
                        f" but got: {type(maximum_age).__name__} ({maximum_age})"
                    )
                    assert maximum_age is None or maximum_age > 0, (
                        f"Option maximum-age is zero, which doesn't make sense."
                    )
                    self.__maximum_age = maximum_age
                else:
                    self.warning(
                        f"No maximum-age option found; Will use default: {self.__maximum_age}"
                    )

                assert "target-type" in options.keys(), (
                    f"Option target-type is required"
                )
                target_type = options["target-type"]
                self.info(f"Found target-type option: {target_type}")
                assert isinstance(target_type, str), (
                    f"Option target-type must be str, but got: {target_type}"
                )
                assert target_type in self.__VALID_TARGET_TYPES, (
                    f"Option target-type must be one of: {self.__VALID_TARGET_TYPES}"
                )
                self.__target_type = target_type

                if "date-detection" in options.keys():
                    date_detection = options["date-detection"]
                    self.info(f"Found date-detection option: {date_detection}")
                    assert isinstance(date_detection, str), (
                        f"Option date-detection must be str, but got: {date_detection}"
                    )
                    assert date_detection in self.__VALID_DATE_DETECTION_TYPES, (
                        f"Option date-detection must be one of: {self.__VALID_DATE_DETECTION_TYPES}"
                    )
                    self.__date_detection = date_detection
                else:
                    self.error(
                        f"Option date-detection not found; Will use default: {self.__date_detection}"
                    )
                    raise AssertionError(
                        f"Option date-detection is required."
                    )

            else:
                self.error(f"No options key found!")
                raise AssertionError(f"No options key found!")

            assert "paths" in self.__data, (
                f"Could not find 'paths' key"
            )
            rotatable_paths = self.__data["paths"]
            # A single string is accepted and normalized to a one-item list.
            if isinstance(rotatable_paths, str):
                rotatable_paths = [rotatable_paths]
            assert isinstance(rotatable_paths, list), (
                "Rotatable 'paths' key must be a string or list"
            )
            # Normalize every entry to a pathlib.Path, in place.
            for i in range(len(rotatable_paths)):
                p = rotatable_paths[i]
                if isinstance(p, Path):
                    continue
                elif isinstance(p, str):
                    rotatable_paths[i] = Path(p)
                else:
                    raise AssertionError(
                        f"All rotatable paths must be strings or pathlib::Path objects"
                    )

            self.__rotatable_paths = rotatable_paths
            self.info(f"Found {len(self.__rotatable_paths)} rotatable paths")

        except KeyError as e:

            self.error(
                f"Failed to load config due to KeyError"
                f"\nFile: {self.__path}"
                f"\nError: {str(e)}"
            )
            raise e

        except AssertionError as e:

            self.error(
                f"Failed to load config due to AssertionError"
                f"\nFile: {self.__path}"
                f"\nError: {str(e)}"
            )
            raise e

    # Logging helpers: prefix every message with this config file's name.

    def debug(self, s):
        self.__logger.debug(f"({self.__path.name}) {s}")

    def info(self, s):
        self.__logger.info(f"({self.__path.name}) {s}")

    def warning(self, s):
        self.__logger.warning(f"({self.__path.name}) {s}")

    def error(self, s):
        self.__logger.error(f"({self.__path.name}) {s}")

    @property
    def key(self) -> str:
        # Absolute path string; used to index this config in collections.
        return str(self.__path)

    @property
    def path(self) -> Path:
        return self.__path

    @property
    def data(self) -> dict:
        # Raw parsed YAML document.
        return self.__data

    @property
    def dry_run(self) -> bool:
        return self.__dry_run

    @dry_run.setter
    def dry_run(self, b: bool):
        self.__dry_run = b

    @property
    def target_type(self) -> str:
        return self.__target_type

    @property
    def date_detection(self) -> str:
        return self.__date_detection

    @property
    def rotatable_paths(self) -> [Path]:
        return self.__rotatable_paths

    @property
    def minimum_items(self) -> int:
        return self.__minimum_items

    @property
    def maximum_items(self) -> int:
        return self.__maximum_items

    @property
    def maximum_age(self) -> int:
        return self.__maximum_age
|
@ -1,48 +1,38 @@
|
||||
|
||||
|
||||
from domain.Logger import Logger
|
||||
|
||||
import os
|
||||
from domain.Util import Util
|
||||
|
||||
|
||||
class Config:
|
||||
# import os
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class Scanner:
    """
    Scans caller-supplied paths for YAML config files.

    Each path may be a file (taken directly as a candidate) or a
    directory (scanned recursively via Util.get_dir_files_recursive);
    candidates are then filtered by file extension.
    """

    # Extensions (without the leading dot) accepted as config files.
    __DEFAULT_VALID_EXTENSIONS = [
        "yaml",
        "yml"
    ]

    def __init__(self, logger: Logger):

        self.__logger = logger
        self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS

    # Logging helpers: prefix every message with this class's name.

    def debug(self, s):
        self.__logger.debug(f"[{type(self).__name__}] {s}")

    def info(self, s):
        self.__logger.info(f"[{type(self).__name__}] {s}")

    def warn(self, s):
        self.__logger.warning(f"[{type(self).__name__}] {s}")

    def error(self, s):
        self.__logger.error(f"[{type(self).__name__}] {s}")

    def gather_valid_config_paths(self, paths: list = None) -> [Path]:
        """
        Expand ``paths`` (strings; files or directories) into the list of
        config-looking file Paths, filtered by valid YAML extensions.

        Raises AssertionError when paths is None/empty or when an entry
        is neither a file nor a directory.
        """

        assert paths is not None, "Config paths cannot be None"
        assert len(paths) > 0, "Must provide at least one config file path"

        self.info("Gathering valid configs")

        file_paths = []
        config_paths = []
        not_config_paths = []

        # First gather all files that are potential configs
        for path_str in paths:

            path = Path(path_str)

            self.info(f"Inspecting path: {path}")

            if not path.exists():

                self.error(f"Path doesn't exist: {path}")

            if path.is_file():

                self.debug(
                    f"Path is a file; Adding directly to potential config candidates: {path}"
                )
                file_paths.append(path)

            elif path.is_dir():

                # BUGFIX: the second string fragment was missing its f-prefix,
                # so the literal text "{path}" was logged instead of the path.
                self.debug(
                    f"Path is a dir;"
                    f" Scanning recursively for potential config candidate files: {path}"
                )

                for file_path in Util.get_dir_files_recursive(path=path):
                    self.info(f"> Candidate file: {file_path}")
                    file_paths.append(file_path)

            else:
                # Nonexistent paths also land here (neither file nor dir).
                raise AssertionError(
                    f"Don't know how to handle path that isn't a file or dir: {path}"
                )

        # Now, filter for files with valid YAML extensions
        for file_path in file_paths:

            if self.check_file_extension(file_path=file_path, extensions=None):
                config_paths.append(file_path)
            else:
                not_config_paths.append(file_path)

        self.info("Filtered out non-config files:")
        if len(not_config_paths) > 0:
            for not_config_path in not_config_paths:
                self.info(f"> {not_config_path}")
        else:
            self.info("> [none]")

        self.info("Kept config-looking files:")
        if len(config_paths) > 0:
            for config_path in config_paths:
                self.info(f"> {config_path}")
        else:
            self.info("> [none]")

        return config_paths

    def check_file_extension(self, file_path: Path, extensions: list = None) -> bool:
        """
        Return True when file_path's extension (case-insensitive, without
        the leading dot) is in ``extensions`` (default: the instance's
        valid YAML extensions).
        """

        if extensions is None:
            extensions = self.__valid_extensions

        file_extension = file_path.suffix

        # Strip preceding dot from extension
        if len(file_extension) > 0 and file_extension[0] == ".":
            file_extension = file_extension[1:]
        file_extension = file_extension.lower()

        for valid_extension in extensions:
            if file_extension == valid_extension:
                return True

        return False
|
||||
|
60
main.py
60
main.py
@ -21,30 +21,76 @@ def main():
|
||||
help="Verbose/Debug logging mode"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--systemd",
|
||||
default=False,
|
||||
dest="systemd",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Pass if this program will be spawned inside systemd"
|
||||
" or another system that already adds timestamps to log messages."
|
||||
)
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--syslog", "--write-to-syslog",
|
||||
default=False,
|
||||
dest="write_to_syslog",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Pass if you'd like this program to write to syslog."
|
||||
)
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--no-test-logs",
|
||||
default=False,
|
||||
dest="do_test_logs",
|
||||
action="store_false",
|
||||
help=(
|
||||
"Pass if you do not want to see test logs for all log levels."
|
||||
)
|
||||
)
|
||||
parser.add_argument(
|
||||
"--test-logs",
|
||||
default=True,
|
||||
dest="do_test_logs",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Pass if you want to see test logs for all log levels."
|
||||
)
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--config", "-c",
|
||||
dest="config_files",
|
||||
dest="config_paths",
|
||||
default=[],
|
||||
action="append",
|
||||
type=str,
|
||||
help="Specify a configuration file. Can be called multiple times."
|
||||
help="Specify a configuration file or configuration directory. Can be called multiple times."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run", "-d",
|
||||
dest="dry_run",
|
||||
dest="global_dry_run",
|
||||
default=False,
|
||||
action="store_true",
|
||||
help="Only perform an analysis; Don't delete anything."
|
||||
help=(
|
||||
"Only perform an analysis;"
|
||||
" Don't delete anything no matter what configs say (configs can specify dry run, too)."
|
||||
)
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
rotator = BackupRotator(
|
||||
debug=args.debug
|
||||
config_paths=args.config_paths,
|
||||
debug=args.debug,
|
||||
systemd=args.systemd,
|
||||
write_to_syslog=args.write_to_syslog,
|
||||
do_test_logs=args.do_test_logs,
|
||||
)
|
||||
rotator.run(
|
||||
configs=args.config_files,
|
||||
dry_run=args.dry_run
|
||||
global_dry_run=args.global_dry_run
|
||||
)
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user