mikes-backup-rotator/domain/BackupRotator.py

378 lines
10 KiB
Python
Raw Normal View History

2019-08-30 16:14:18 -07:00
#!/usr/bin/env python3
2019-02-27 15:10:35 -08:00
"""
Mike's Backup Rotator
A simple script to help automatically rotate backup files
2023-03-27 18:15:11 -07:00
Copyright 2023 Mike Peralta; All rights reserved
2023-03-27 18:15:11 -07:00
Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more)
"""
2019-02-27 15:10:35 -08:00
from domain.Logger import Logger
from domain.Config import Config
import datetime
import os
# import pprint
2019-02-27 16:51:43 -08:00
import shutil
import sys
2022-01-31 14:35:48 -08:00
import time
import yaml
2019-02-27 15:10:35 -08:00
class BackupRotator:
    """Rotate (purge) old backup files or directories as described by YAML configs.

    Each config is a mapping read from a YAML file. The keys this class reads are:

    * ``paths``          - list of directories whose direct children are rotated
    * ``target-type``    - ``"file"`` or ``"directory"``; which children are candidates
    * ``date-detection`` - how an item's date is determined; only ``"file"`` is supported
    * ``maximum-items``  - (optional) keep at most this many candidates per path
    * ``maximum-age``    - (optional) purge candidates older than this many days
    * ``minimum-items``  - (optional) never purge below this many candidates
    * ``dry-run``        - (optional) when exactly ``True``, log instead of deleting

    At least one of ``maximum-items`` / ``maximum-age`` must be present.
    """

    def __init__(self, debug:bool = False):
        """Create the rotator along with its logger and config helper.

        :param debug: Forwarded to the project ``Logger``.
        """
        self.__logger = Logger(name=type(self).__name__, debug=debug)
        self.__config_helper = Config(logger=self.__logger)
        self.__dry_run = False
        self.__configs = []
        self.__config_paths = []
        self.__calculated_actions = []

    def run(self, configs, dry_run: bool = False):
        """Load every config and rotate once per config.

        :param configs: Config path(s), handed to ``Config.gather_valid_configs``.
        :param dry_run: Global dry run; when true nothing is deleted, only logged.
        """
        self.info("Begin")

        self.__dry_run = dry_run
        self.__config_paths = configs

        # Start from a clean slate so calling run() twice does not re-process
        # the configs consumed by an earlier call.
        self.__configs = []
        self._consume_configs(self.__config_paths)

        # Rotate once per config
        config_count = len(self.__configs)
        for config_number, config in enumerate(self.__configs, start=1):
            self.info(f"Rotating for config {config_number} of {config_count} : {config['__path']}")
            self._do_rotate(config)

    @staticmethod
    def current_time():
        """Return the current local time formatted like ``Jan-31-2022 02:35PM``."""
        return datetime.datetime.now().strftime("%b-%d-%Y %I:%M%p")

    def debug(self, s):
        """Log ``s`` at debug level."""
        self.__logger.debug(s)

    def info(self, s):
        """Log ``s`` at info level."""
        self.__logger.info(s)

    def warn(self, s):
        """Log ``s`` at warning level."""
        self.__logger.warn(s)

    def error(self, s):
        """Log ``s`` at error level."""
        self.__logger.error(s)

    def _consume_configs(self, paths: list=None):
        """Resolve ``paths`` to config files via the config helper and load each one."""
        # NOTE(review): gather_valid_configs is assumed to yield file paths,
        # since each result is passed on as ``path`` - confirm against Config.
        for config_path in self.__config_helper.gather_valid_configs(paths=paths):
            self._consume_config(path=config_path)

    def _consume_config(self, path: str):
        """Parse one YAML config file and append it to the internal config list.

        The parsed mapping gains a ``"__path"`` key holding its own file path.

        :raises OSError: If the file cannot be opened.
        :raises Exception: If the file does not parse to a mapping.
        """
        # ``with`` guarantees the handle is closed; open() itself raises on failure.
        with open(path) as f:
            config = yaml.safe_load(f)

        # An empty file parses to None and a scalar/list is not a usable config.
        if not isinstance(config, dict):
            raise Exception("Config file did not parse to a mapping: " + str(path))

        # Add its own path
        config["__path"] = path

        # Consume to internal
        self.__configs.append(config)
        self.info(f"Consumed config from path: {path}")

    def _do_rotate(self, config):
        """Perform all rotation work for a single config."""
        self._rotate_paths(config)

    def _rotate_paths(self, config):
        """Rotate every directory listed under the config's ``paths`` key."""
        self.info("Begin rotating " + str(len(config["paths"])) + " paths")
        for path in config["paths"]:
            self._rotate_path(config, path)

    def _rotate_path(self, config, path):
        """Apply each rotation rule present in ``config`` to one directory.

        :raises AssertionError: If ``path`` is not a directory, or the config
            has neither ``maximum-items`` nor ``maximum-age``.
        """
        # Explicit raise (not ``assert``) so the check survives ``python -O``.
        if not os.path.isdir(path):
            raise AssertionError("Path should be a directory: {}".format(path))

        self.info("Rotating path: {}".format(path))

        found_any_rotation_keys = False
        if "maximum-items" in config:
            found_any_rotation_keys = True
            self._rotate_path_for_maximum_items(config=config, path=path, max_items=config["maximum-items"])
        if "maximum-age" in config:
            found_any_rotation_keys = True
            self._rotate_path_for_maximum_age(config=config, path=path, max_age_days=config["maximum-age"])

        if not found_any_rotation_keys:
            raise AssertionError(
                "Config needs one of the following keys: \"maximum-items\", \"maximum-age\""
            )

    def _rotate_path_for_maximum_items(self, config, path: str, max_items: int):
        """Purge the oldest candidates in ``path`` until at most ``max_items`` remain.

        Never purges below the config's ``minimum-items`` (0 when unset).

        :raises AssertionError: If ``path`` is not a directory.
        """
        if not os.path.isdir(path):
            raise AssertionError("Path should be a directory: {}".format(path))

        self.info("Rotating path for a maximum of {} items: {}".format(
            max_items, path
        ))

        children = self._gather_rotation_candidates(config, path)
        minimum_items = self._determine_minimum_items(config)

        # Do we need to rotate anything out?
        if len(children) < minimum_items:
            self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
                len(children), minimum_items
            ))
            return
        elif len(children) <= max_items:
            self.info("Path only has {} items, but needs more than {} for rotation; Won't rotate this path.".format(
                len(children), max_items
            ))
            return

        self.info("Found {} items to examine".format(len(children)))

        maximum_purge_count = len(children) - minimum_items
        purge_count = len(children) - max_items
        self.info("Want to purge {} items".format(purge_count))
        if purge_count > maximum_purge_count:
            self.info("Reducing purge count from {} to {} items to respect minimum items setting ({})".format(
                purge_count, maximum_purge_count, minimum_items
            ))
            purge_count = maximum_purge_count

        # Stat every candidate once and sort oldest-first. The sort is stable,
        # so ties keep listing order, the same choice the repeated
        # "pick the oldest" scan would have made.
        dated_children = sorted(
            ((self._detect_item_date(config, child), child) for child in children),
            key=lambda dated: dated[0]
        )
        now = time.time()
        children_to_purge = []
        for purge_index, (item_ctime, item_to_purge) in enumerate(dated_children[:purge_count]):
            self.info("Found next item to purge: ({}) {} ({})".format(
                purge_index + 1,
                os.path.basename(item_to_purge),
                self.seconds_to_time_string(now - item_ctime)
            ))
            children_to_purge.append(item_to_purge)

        self.info("Removing items")
        for child_to_purge in children_to_purge:
            self._remove_item(config, child_to_purge)

    def _rotate_path_for_maximum_age(self, config, path: str, max_age_days: int):
        """Purge candidates in ``path`` whose age in whole days exceeds ``max_age_days``.

        Like the maximum-items rotation, this never purges below the config's
        ``minimum-items``; when the cap applies, the oldest items go first.

        :raises AssertionError: If ``path`` is not a directory.
        """
        if not os.path.isdir(path):
            raise AssertionError("Path should be a directory: {}".format(path))

        self.info("Rotating path for max age of {} days: {}".format(max_age_days, path))

        children = self._gather_rotation_candidates(config, path)
        minimum_items = self._determine_minimum_items(config)

        # Do we need to rotate anything out?
        if len(children) < minimum_items:
            self.info("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
                len(children), minimum_items
            ))
            return

        self.info("Examining {} items for deletion".format(len(children)))
        old_children = []  # (age_seconds, path) pairs
        for child in children:
            # One stat per item; days are derived from the same sample so the
            # logged age and the decision can never disagree.
            age_seconds = self._detect_item_age_seconds(config, child)
            age_days = int(age_seconds / 86400)
            age_formatted = self.seconds_to_time_string(age_seconds)
            child_basename = os.path.basename(child)

            if age_days > max_age_days:
                self.info("[Old enough    ] {} ({})".format(
                    child_basename, age_formatted
                ))
                old_children.append((age_seconds, child))
            else:
                self.info("[Not Old enough] {} ({})".format(
                    child_basename, age_formatted
                ))

        # Respect the minimum-items floor, consistent with maximum-items rotation.
        maximum_purge_count = len(children) - minimum_items
        if len(old_children) > maximum_purge_count:
            self.info("Reducing purge count from {} to {} items to respect minimum items setting ({})".format(
                len(old_children), maximum_purge_count, minimum_items
            ))
            old_children.sort(key=lambda aged: aged[0], reverse=True)
            old_children = old_children[:maximum_purge_count]

        if old_children:
            self.info("Removing old items ...")
            for _, child_to_delete in old_children:
                self._remove_item(config, child_to_delete)
        else:
            self.info("No old items to remove")

    @staticmethod
    def _gather_rotation_candidates(config, path):
        """Return full paths of the direct children of ``path`` matching ``target-type``.

        :raises Exception: If ``target-type`` is missing or is neither
            ``"file"`` nor ``"directory"``.
        """
        if "target-type" not in config:
            raise Exception("Please provide the configuration key: target-type")

        # Validate up front so a bad value is reported even for an empty directory.
        target_type = config["target-type"]
        if target_type == "file":
            is_candidate = os.path.isfile
        elif target_type == "directory":
            is_candidate = os.path.isdir
        else:
            raise Exception("Configuration key \"target-type\" must be \"file\" or \"directory\"")

        candidates = []
        for item_name in os.listdir(path):
            item_path = os.path.join(path, item_name)
            if is_candidate(item_path):
                candidates.append(item_path)

        return candidates

    def _pick_oldest_item(self, config, items):
        """Return ``(item, ctime, age_seconds, age_string)`` for the oldest of ``items``.

        On a tie the earliest item in ``items`` wins.

        :raises ValueError: If ``items`` is empty.
        """
        if not items:
            raise ValueError("Cannot pick the oldest item from an empty collection")

        best_item = None
        best_ctime = None
        for item in items:
            ctime = self._detect_item_date(config, item)
            if best_ctime is None or ctime < best_ctime:
                best_ctime = ctime
                best_item = item

        age_seconds = self._detect_item_age_seconds(config, best_item)
        age_string = self.seconds_to_time_string(age_seconds)

        return best_item, best_ctime, age_seconds, age_string

    @staticmethod
    def _detect_item_date(config, item):
        """Return the item's timestamp (seconds since the epoch) per ``date-detection``.

        Only ``"file"`` is supported, which uses ``os.path.getctime``.

        :raises AssertionError: If ``date-detection`` is missing or unsupported.
        """
        if "date-detection" not in config:
            raise AssertionError("Please provide config key: \"date-detection\"")

        detection = config["date-detection"]
        if detection != "file":
            raise AssertionError(f"Invalid value for \"date-detection\"; Should be one of [file]: {detection}")

        return os.path.getctime(item)

    def _detect_item_age_seconds(self, config, item):
        """Return how many seconds ago the item's detected date was."""
        return time.time() - self._detect_item_date(config, item)

    def _detect_item_age_days(self, config, item):
        """Return the item's age in whole days (truncated toward zero)."""
        return int(self._detect_item_age_seconds(config, item) / 86400)

    def seconds_to_time_string(self, seconds: float):
        """Format a duration in seconds as e.g. ``"1 day, 2 hours, 5 seconds"``.

        Months count as 30 days and years as 365 days. Durations under one
        second (including negative ones) come back as ``"0 seconds"``.

        :raises AssertionError: If ``seconds`` is neither an int nor a float.
        """
        if isinstance(seconds, int):
            seconds = float(seconds)
        elif not isinstance(seconds, float):
            raise AssertionError("Seconds must be an int or float")

        # Largest unit first so each unit consumes what the bigger ones left over.
        units = (
            ("year", 31536000.0),
            ("month", 2592000.0),
            ("week", 604800.0),
            ("day", 86400.0),
            ("hour", 3600.0),
            ("minute", 60.0),
            ("second", 1.0),
        )

        s_parts = []
        for unit_label, unit_seconds in units:
            if seconds >= unit_seconds:
                unit_count = int(seconds / unit_seconds)
                s_parts.append("{} {}{}".format(
                    unit_count, unit_label,
                    "" if unit_count == 1 else "s"
                ))
                seconds -= unit_seconds * unit_count

        if not s_parts:
            return "0 seconds"

        return ", ".join(s_parts)

    def _dry_run_level(self, config):
        """Return ``"global"``, ``"config"`` or ``None`` depending on which dry run applies."""
        if self.__dry_run:
            return "global"
        if config.get("dry-run") is True:
            return "config"
        return None

    def _remove_item(self, config, path):
        """Remove ``path``, dispatching on whether it is a file or a directory.

        :raises AssertionError: If ``path`` is neither a file nor a directory.
        """
        if os.path.isfile(path):
            self._remove_file(config, path)
        elif os.path.isdir(path):
            self._remove_directory(config, path)
        else:
            raise AssertionError("Don't know how to remove this item: {}".format(path))

    def _remove_file(self, config, file_path):
        """Delete one file, unless a global or config-level dry run is active.

        :raises Exception: If ``file_path`` is not a file.
        """
        if not os.path.isfile(file_path):
            raise Exception("Tried to remove a file, but this path isn't a file: " + str(file_path))

        dry_run_level = self._dry_run_level(config)
        if dry_run_level is not None:
            self.info(f"Won't purge file during {dry_run_level}-level dry run: {file_path}")
            return

        self.info(f"Purging file: {file_path}")
        os.remove(file_path)

    def _remove_directory(self, config, dir_path):
        """Recursively delete one directory, unless a dry run is active.

        :raises Exception: If ``dir_path`` is not a directory.
        """
        if not os.path.isdir(dir_path):
            raise Exception("Tried to remove a directory, but this path isn't a directory: " + str(dir_path))

        dry_run_level = self._dry_run_level(config)
        if dry_run_level is not None:
            self.info(f"Won't purge directory during {dry_run_level}-level dry run: {dir_path}")
            return

        self.info(f"Purging directory: {dir_path}")
        shutil.rmtree(dir_path)

    def _determine_minimum_items(self, config):
        """Return the config's ``minimum-items`` value, or 0 when it is not set."""
        minimum_items = 0

        if "minimum-items" in config:
            minimum_items = config["minimum-items"]
            self.info("Won't delete anything unless a minimum of {} items were found".format(minimum_items))
        else:
            self.info("No value found for \"minimum-items\"; Will not enforce minimum item constraint.")

        return minimum_items