#!/usr/bin/env python3
"""Rotate backup directories by purging their oldest items.

Usage: pass ``--config <path>`` pointing at a YAML file that provides the
keys ``paths``, ``maximum-items``, ``target-type`` and ``date-detection``.
"""

import datetime
import os
import shutil
import sys

import yaml


class BackupRotator:
    """Purge the oldest items from each configured path.

    The configuration is a YAML mapping read from the file given with
    ``--config``. Keys read by this class:

    * ``paths``: iterable of directories to rotate.
    * ``maximum-items``: number of items to keep in each directory.
    * ``target-type``: ``"file"`` or ``"directory"``; which kind of child
      counts as a rotation candidate.
    * ``date-detection``: only ``"file"`` is supported; items are aged by
      ``os.path.getctime``.
    """

    def __init__(self):
        self.__config = None
        self.__config_path = None

    def run(self):
        """Parse arguments, load the config, and rotate every path."""
        self.log("Begin")

        self.consume_arguments()
        self.consume_config(self.__config_path)

        self.rotate_paths()

    @staticmethod
    def current_time():
        """Return the current local time formatted for log lines."""
        now = datetime.datetime.now()
        return now.strftime("%b-%d-%Y %I:%M%p")

    def log(self, s, o=None):
        """Print ``s`` (and optionally ``o``) with a timestamped prefix."""
        to_log = "[" + self.current_time() + "][Backup Rotator] " + str(s)
        if o is not None:
            to_log += " " + str(o)

        print(to_log)

    def consume_arguments(self):
        """Scan ``sys.argv`` for ``--config <path>`` and remember the path.

        Raises:
            Exception: if ``--config`` is the last argument.
        """
        # A while loop (rather than ``for i in range(...)``) lets us really
        # skip the companion value instead of re-reading it as a flag.
        i = 1
        while i < len(sys.argv):
            if sys.argv[i] == "--config":
                i, self.__config_path = self.consume_argument_companion(i)
                print("Found config path:", self.__config_path)
            i += 1

    @staticmethod
    def consume_argument_companion(arg_index):
        """Return ``(index, value)`` of the argument following ``arg_index``.

        Raises:
            Exception: if there is no argument after ``arg_index``.
        """
        companion_index = arg_index + 1
        if companion_index >= len(sys.argv):
            # Single string so the message does not render as a tuple.
            raise Exception(
                "Expected argument after " + str(sys.argv[arg_index])
            )

        return companion_index, sys.argv[companion_index]

    def consume_config(self, path=None):
        """Load the YAML configuration found at ``path``.

        Raises:
            Exception: if ``path`` is None, or the file does not contain a
                YAML mapping.
        """
        if path is None:
            raise Exception("Auto-finding of config file not implemented")

        # safe_load: plain ``yaml.load`` without a Loader raises on
        # PyYAML >= 6 and can construct arbitrary objects on older versions.
        with open(path) as f:
            config = yaml.safe_load(f)

        if not isinstance(config, dict):
            raise Exception(
                "Config file must contain a mapping of keys: " + str(path)
            )

        self.__config = config
        self.log("Consumed config from path: ", path)

    def rotate_paths(self):
        """Rotate every directory listed under the ``paths`` config key.

        Raises:
            Exception: if the ``paths`` key is missing.
        """
        if "paths" not in self.__config:
            raise Exception("Please provide config key: \"paths\"")

        # An empty ``paths:`` entry loads as None; treat it as no paths.
        paths = self.__config["paths"] or []

        self.log("Begin rotating " + str(len(paths)) + " paths")
        for path in paths:
            self.rotate_path(path)

    def rotate_path(self, path):
        """Purge the oldest candidates in ``path`` beyond ``maximum-items``.

        Raises:
            Exception: if ``maximum-items`` is missing, ``path`` is not a
                directory, or an item can be neither unlinked nor removed
                as a directory tree.
        """
        self.log("Rotating path", path)

        if "maximum-items" not in self.__config:
            raise Exception("Please provide config key: \"maximum-items\"")
        max_items = self.__config["maximum-items"]

        if not os.path.isdir(path):
            raise Exception("Path should be a directory:" + str(path))

        children = self.gather_rotation_candidates(path)

        # Do we need to rotate anything out?
        if len(children) <= max_items:
            self.log(
                "Path only has " + str(len(children)) + " items,"
                + " but needs " + str(max_items) + " for rotation"
                + "; Won't rotate this path."
            )
            return

        purge_count = len(children) - max_items
        self.log(
            "Need to purge " + str(purge_count) + " items"
        )

        for _ in range(purge_count):
            item_to_purge = self.pick_item_to_purge(children)
            children.remove(item_to_purge)
            self.log("Purging item:", item_to_purge)
            # Check symlinks first: shutil.rmtree refuses to operate on a
            # symlink, even one pointing at a directory, so unlink instead.
            if os.path.islink(item_to_purge) or os.path.isfile(item_to_purge):
                os.remove(item_to_purge)
            elif os.path.isdir(item_to_purge):
                shutil.rmtree(item_to_purge)
            else:
                raise Exception(
                    "Don't know how to remove this item: "
                    + str(item_to_purge)
                )

    def gather_rotation_candidates(self, path):
        """Return the children of ``path`` that match ``target-type``.

        Returns:
            A list of paths (``path`` joined with each child name), in
            ``os.listdir`` order.

        Raises:
            Exception: if ``target-type`` is missing or is neither
                ``"file"`` nor ``"directory"``.
        """
        if "target-type" not in self.__config:
            raise Exception(
                "Please provide the configuration key: target-type"
            )

        # Validate once up front so a bad value is reported even when the
        # directory happens to be empty.
        target_type = self.__config["target-type"]
        if target_type == "file":
            matches = os.path.isfile
        elif target_type == "directory":
            matches = os.path.isdir
        else:
            raise Exception(
                "Configuration key \"target-type\" must be"
                " \"file\" or \"directory\""
            )

        candidates = []
        for item_name in os.listdir(path):
            item_path = os.path.join(path, item_name)
            if matches(item_path):
                candidates.append(item_path)

        return candidates

    def pick_item_to_purge(self, items):
        """Return the item in ``items`` with the smallest ctime.

        Ties go to the earliest item in ``items``; an empty ``items``
        yields None.

        Raises:
            Exception: if ``date-detection`` is missing or is not
                ``"file"``.
        """
        if "date-detection" not in self.__config:
            raise Exception("Please provide config key: \"date-detection\"")

        detection = self.__config["date-detection"]
        if detection != "file":
            raise Exception(
                "Invalid value for \"date-detection\": " + str(detection)
            )

        # NOTE: per the os.path docs, getctime is the metadata-change time
        # on Unix and the creation time on Windows.
        return min(items, key=os.path.getctime, default=None)


def main():
    """Script entry point: build a rotator and run it."""
    rotator = BackupRotator()
    rotator.run()


if __name__ == "__main__":
    main()