From 5d2e93ca410adfbac041dc98012fc86c1b142419 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 1 Feb 2022 04:05:48 +0530 Subject: [PATCH] Upgrade: Can now delete based on age --- BackupRotator.py | 208 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 159 insertions(+), 49 deletions(-) diff --git a/BackupRotator.py b/BackupRotator.py index 16b4c4d..357d83e 100755 --- a/BackupRotator.py +++ b/BackupRotator.py @@ -6,7 +6,7 @@ Mike's Backup Rotator A simple script to help automatically rotate backup files -Copyright 2019 Mike Peralta; All rights reserved +Copyright 2022 Mike Peralta; All rights reserved Released under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more) @@ -17,6 +17,7 @@ import os import shutil import sys import syslog +import time import yaml @@ -36,7 +37,7 @@ class BackupRotator: self.__dry_run = dry_run self.__config_paths = configs - self.consume_configs(self.__config_paths) + self._consume_configs(self.__config_paths) # Rotate once per config for config_index in range(len(self.__configs)): @@ -46,7 +47,7 @@ class BackupRotator: # self.log("Rotating for config " + str(config_index + 1) + " of " + str(len(self.__configs)), config["__path"]) - self.do_rotate(config) + self._do_rotate(config) @staticmethod def current_time(): @@ -67,7 +68,7 @@ class BackupRotator: print(to_log) - def consume_configs(self, paths: list=None): + def _consume_configs(self, paths: list=None): assert paths is not None, "Config paths cannot be None" assert len(paths) > 0, "Must provide at least one config file path" @@ -77,16 +78,16 @@ class BackupRotator: # If this is a single path if os.path.isfile(path): - self.consume_config(path) + self._consume_config(path) # If this is a directory elif os.path.isdir(path): # Iterate over each file inside for file_name in os.listdir(path): - self.consume_config(os.path.join(path, file_name)) + self._consume_config(os.path.join(path, file_name)) - def consume_config(self, path: str): + def _consume_config(self, path: str): # Open the file f = open(path) @@ -103,60 +104,103 @@ class BackupRotator: self.__configs.append(config) self.log("Consumed config from path:", path) - def do_rotate(self, config): + def _do_rotate(self, config): - self.rotate_paths(config) + self._rotate_paths(config) - def rotate_paths(self, config): + def _rotate_paths(self, config): self.log("Begin rotating " + str(len(config["paths"])) + " paths") for path in config["paths"]: - self.rotate_path(config, path) + self._rotate_path(config, path) - def rotate_path(self, config, path): + def _rotate_path(self, config, path): - self.log("Rotating path", path) + assert os.path.isdir(path), "Path should be a directory: {}".format(path) - if "maximum-items" not in config: - raise Exception("Please provide config key: \"maximum-items\"") - max_items = config["maximum-items"] + self.log("Rotating path: {}".format(path)) - if not os.path.isdir(path): - raise Exception("Path should be a directory:" + str(path)) + found_any_rotation_keys = False + if "maximum-items" in config.keys(): + found_any_rotation_keys = True + self._rotate_path_for_maximum_items(config=config, path=path, max_items=config["maximum-items"]) + if "maximum-age" in config.keys(): + found_any_rotation_keys = True + self._rotate_path_for_maximum_age(config=config, path=path, max_age_days=config["maximum-age"]) + + assert found_any_rotation_keys is True, \ + "Config needs one of the following keys: \"maximum-items\"" + + def _rotate_path_for_maximum_items(self, config, path: str, max_items: int): - children = self.gather_rotation_candidates(config, path) + assert os.path.isdir(path), "Path should be a directory: {}".format(path) + + self.log("Rotating path for maximum items: {}".format(path)) + + children = self._gather_rotation_candidates(config, path) # Do we need to rotate anything out? if len(children) <= max_items: self.log( - "Path only has " + str(len(children)) + " items," - + " but needs more than " + str(max_items) + " for rotation" - + "; Won't rotate this path." + "Path only has {} items, but needs more than {} for rotation; Won't rotate this path.".format( + len(children), max_items + ) ) return # purge_count = len(children) - max_items - self.log( - "Need to purge " + str(purge_count) + " items" - ) + self.log("Need to purge {} items".format(purge_count)) for purge_index in range(purge_count): # - item_to_purge = self.pick_item_to_purge(config, children) + item_to_purge, item_ctime = self._pick_oldest_item(config, children) children.remove(item_to_purge) + self.log("Found next item to purge: ({}) {} (ctime: {})".format( + purge_index + 1, + os.path.basename(item_to_purge), item_ctime + )) # - if os.path.isfile(item_to_purge): - self.remove_file(config, item_to_purge) - elif os.path.isdir(item_to_purge): - self.remove_directory(config, item_to_purge) - else: - raise Exception("Don't know how to remove this item: " + str(item_to_purge)) + self._remove_item(config, item_to_purge) + def _rotate_path_for_maximum_age(self, config, path: str, max_age_days: int): + + assert os.path.isdir(path), "Path should be a directory: {}".format(path) + + self.log("Rotating path for max age of {} days: {}".format(max_age_days, path)) + + children = self._gather_rotation_candidates(config, path) + + self.log("Examining {} items for deletion") + children_to_delete = [] + for child in children: + + age_seconds = self._detect_item_age_seconds(config, child) + age_days = self._detect_item_age_days(config, child) + age_formatted = self.seconds_to_time_string(age_seconds) + child_basename = os.path.basename(child) + + if age_days > max_age_days: + self.log("Old enough to delete: {} ({})".format( + child_basename, age_formatted + )) + children_to_delete.append(child) + else: + self.log("Not old enough to delete: {} ({})".format( + child_basename, age_formatted + )) + + self.log("Removing old items ...") + for child_to_delete in children_to_delete: + basename = os.path.basename(child_to_delete) + self.log("> {}".format(basename)) + self._remove_item(config, child_to_delete) + + @staticmethod - def gather_rotation_candidates(config, path): + def _gather_rotation_candidates(config, path): candidates = [] @@ -180,28 +224,94 @@ class BackupRotator: return candidates - @staticmethod - def pick_item_to_purge(config, items): + def _pick_oldest_item(self, config, items): - if "date-detection" not in config.keys(): - raise Exception("Please provide config key: \"date-detection\"") - - detection = config["date-detection"] best_item = None best_ctime = None for item in items: - if detection == "file": - ctime = os.path.getctime(item) - if best_ctime is None or ctime < best_ctime: - best_ctime = ctime - best_item = item - else: - raise Exception("Invalid value for \"date-detection\": " + str(detection)) + ctime = self._detect_item_date(config, item) + if best_ctime is None or ctime < best_ctime: + best_ctime = ctime + best_item = item - return best_item + return best_item, best_ctime - def remove_file(self, config, file_path): + @staticmethod + def _detect_item_date(config, item): + + assert "date-detection" in config.keys(), "Please provide config key: \"date-detection\"" + detection = config["date-detection"] + + if detection == "file": + ctime = os.path.getctime(item) + else: + raise AssertionError("Invalid value for \"date-detection\"; Should be one of {file}: {}".format( + detection + )) + + return ctime + + def _detect_item_age_seconds(self, config, item): + + now = time.time() + ctime = self._detect_item_date(config, item) + delta = now - ctime + + return delta + + def _detect_item_age_days(self, config, item): + + age_seconds = self._detect_item_age_seconds(config, item) + age_days = int(age_seconds / 86400) + + return age_days + + def seconds_to_time_string(self, seconds: float): + + if isinstance(seconds, float): + pass + elif isinstance(seconds, int): + seconds = float * 1.0 + else: + raise AssertionError("Seconds must be an int or float") + + # Map + map = { + "year": 31536000.0, + "month": 2592000.0, + "week": 604800.0, + "day": 86400.0, + "hour": 3600.0, + "minute": 60.0, + "second": 1.0 + } + + s_parts = [] + for unit_label in map.keys(): + unit_seconds = map[unit_label] + if seconds >= unit_seconds: + unit_count = int(seconds / unit_seconds) + s_parts.append("{} {}{}".format( + unit_count, unit_label, + "" if unit_count == 1 else "s" + )) + seconds -= unit_seconds * unit_count + + s = ", ".join(s_parts) + + return s + + def _remove_item(self, config, path): + + if os.path.isfile(path): + self._remove_file(config, path) + elif os.path.isdir(path): + self._remove_directory(config, path) + else: + raise AssertionError("Don't know how to remove this item: {}".format(path)) + + def _remove_file(self, config, file_path): if not os.path.isfile(file_path): raise Exception("Tried to remove a file, but this path isn't a file: " + str(file_path)) @@ -214,7 +324,7 @@ class BackupRotator: self.log("Purging file:", file_path) os.remove(file_path) - def remove_directory(self, config, dir_path): + def _remove_directory(self, config, dir_path): if not os.path.isdir(dir_path): raise Exception("Tried to remove a directory, but this path isn't a directory: " + str(dir_path))