41 Commits

Author SHA1 Message Date
403531d7f0 logging tweaks 2024-06-09 10:33:50 -07:00
bd088ac545 Probably fixed the mod time issue 2024-06-09 10:25:03 -07:00
d2af261477 ctime now working? 2024-06-09 09:57:03 -07:00
5828a0363f guidance 2024-06-09 09:54:10 -07:00
b2261a3c49 guidance 2024-06-09 09:50:12 -07:00
6ceaa89dad more config guidance 2024-06-09 09:43:21 -07:00
2f706bacf6 Bug squashing 2024-06-09 09:41:00 -07:00
dbbe3b88af Lots of refactoring, trying to get a proper Config class going 2024-06-09 09:22:48 -07:00
b8aff6429f log tweak 2024-06-09 07:54:19 -07:00
33c4233797 Lots of refactoring, trying to upgrade all str.format calls and switch to pathlib 2024-06-09 07:50:34 -07:00
1564f4cf8a improve logging a bit 2024-06-09 06:51:51 -07:00
2e6d268de0 Move a method into a new static Util class 2024-06-09 06:43:02 -07:00
bfafd890d0 tweak logging 2024-06-09 06:42:28 -07:00
84791c85a7 Start using an init-environment helper and bump pyenv version 2024-06-09 04:32:33 -07:00
cd14dc6158 log tweak 2024-04-21 02:07:40 -07:00
e4d5e1b595 Update pyenv version and relock pipenv 2023-08-18 01:53:51 -07:00
2e4638e448 Typo 2023-05-29 13:25:03 -07:00
916064c153 Bump python version and relock pipenv 2023-05-20 18:24:25 -07:00
ec894014c6 Uhm whoops 2023-03-27 20:27:35 -07:00
d66688eb3c Remove annoying log emissions 2023-03-27 20:17:45 -07:00
5e30c2f7da Further upgrades to Logging 2023-03-27 20:12:37 -07:00
c0769ad0b1 Working on logging 2023-03-27 19:59:45 -07:00
9d17178012 Adding dedicated Logger and Config classes 2023-03-27 19:40:52 -07:00
4f0b29cd3d Start a domain folder 2023-03-27 18:50:11 -07:00
9a2efa9f0a When consuming configs inside a dir, ignore nested dirs and files without valid YAML extensions 2023-03-27 18:44:27 -07:00
30bb98dff0 Update license info 2023-03-27 18:15:11 -07:00
429a4a6712 Hilarious oversights 2023-03-26 03:38:27 -07:00
acfbb90f91 hooks? 2023-03-26 02:26:02 -07:00
9f26c09453 hooks? 2023-03-26 02:25:56 -07:00
248f759d96 hooks? 2023-03-26 02:25:27 -07:00
0125e92a0a hooks? 2023-03-26 02:24:20 -07:00
a59c573174 hooks? 2023-03-26 02:24:05 -07:00
b3687abb62 hooks? 2023-03-26 02:23:07 -07:00
91b2b0d98a hooks? 2023-03-26 02:22:15 -07:00
cf9be50c2a hooks? 2023-03-26 02:21:58 -07:00
5ffe16cd31 hooks? 2023-03-26 02:20:39 -07:00
8e03950102 Bump python version 2023-03-26 02:17:04 -07:00
9aa66d8e50 Bump python version to 3.10 because old one was segfaulting 2022-07-15 03:26:00 -07:00
cfccf4aa70 Uhm specify a pyenv version? 2022-07-10 06:30:55 -07:00
e704930c71 Re-lock pipenv using more specific python version 2022-07-10 06:30:33 -07:00
86aed2d1f1 Bugfix: Bad var name caused wrong files to be deleted in check for max items 2022-02-13 16:06:14 +05:30
13 changed files with 1267 additions and 426 deletions

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.12.4

View File

@ -1,378 +0,0 @@
#!/usr/bin/env python3
"""
Mike's Backup Rotator
A simple script to help automatically rotate backup files
Copyright 2022 Mike Peralta; All rights reserved
Released under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more)
"""
import datetime
import os
import shutil
import sys
import syslog
import time
import yaml
class BackupRotator:
def __init__(self):
self.__dry_run = False
self.__configs = []
self.__config_paths = []
self.__calculated_actions = []
def run(self, configs, dry_run: bool = False):
self.log("Begin")
self.__dry_run = dry_run
self.__config_paths = configs
self._consume_configs(self.__config_paths)
# Rotate once per config
for config_index in range(len(self.__configs)):
#
config = self.__configs[config_index]
#
self.log("Rotating for config " + str(config_index + 1) + " of " + str(len(self.__configs)), config["__path"])
self._do_rotate(config)
@staticmethod
def current_time():
now = datetime.datetime.now()
now_s = now.strftime("%b-%d-%Y %I:%M%p")
return str(now_s)
def log(self, s, o=None):
now = self.current_time()
to_log = "[" + now + "][Backup Rotator] " + str(s)
if o is not None:
to_log += " " + str(o)
syslog.syslog(to_log)
print(to_log)
def _consume_configs(self, paths: list=None):
assert paths is not None, "Config paths cannot be None"
assert len(paths) > 0, "Must provide at least one config file path"
# Use each config path
for path in paths:
# If this is a single path
if os.path.isfile(path):
self._consume_config(path)
# If this is a directory
elif os.path.isdir(path):
# Iterate over each file inside
for file_name in os.listdir(path):
self._consume_config(os.path.join(path, file_name))
def _consume_config(self, path: str):
# Open the file
f = open(path)
if not f:
raise Exception("Unable to open config file: " + path)
# Parse
config = yaml.safe_load(f)
# Add its own path
config["__path"] = path
# Consume to internal
self.__configs.append(config)
self.log("Consumed config from path:", path)
def _do_rotate(self, config):
self._rotate_paths(config)
def _rotate_paths(self, config):
self.log("Begin rotating " + str(len(config["paths"])) + " paths")
for path in config["paths"]:
self._rotate_path(config, path)
def _rotate_path(self, config, path):
assert os.path.isdir(path), "Path should be a directory: {}".format(path)
self.log("Rotating path: {}".format(path))
found_any_rotation_keys = False
if "maximum-items" in config.keys():
found_any_rotation_keys = True
self._rotate_path_for_maximum_items(config=config, path=path, max_items=config["maximum-items"])
if "maximum-age" in config.keys():
found_any_rotation_keys = True
self._rotate_path_for_maximum_age(config=config, path=path, max_age_days=config["maximum-age"])
assert found_any_rotation_keys is True, \
"Config needs one of the following keys: \"maximum-items\""
def _rotate_path_for_maximum_items(self, config, path: str, max_items: int):
assert os.path.isdir(path), "Path should be a directory: {}".format(path)
self.log("Rotating path for a maximum of {} items: {}".format(
max_items, path
))
children = self._gather_rotation_candidates(config, path)
minimum_items = self._determine_minimum_items(config)
# Do we need to rotate anything out?
if len(children) < minimum_items:
self.log("Path only has {} items, which does not meet the minimum threshold of {} items. Won't rotate this path.".format(
len(children), minimum_items
))
elif len(children) <= max_items:
self.log("Path only has {} items, but needs more than {} for rotation; Won't rotate this path.".format(
len(children), max_items
))
return
self.log("Found {} items to examine".format(len(children)))
#
maximum_purge_count = len(children) - minimum_items
purge_count = len(children) - max_items
self.log("Want to purge {} items".format(purge_count))
if purge_count > maximum_purge_count:
self.log("Reducing purge count from {} to {} items to respect minimum items setting ({})".format(
purge_count, maximum_purge_count, minimum_items
))
purge_count = maximum_purge_count
children_to_purge = []
for purge_index in range(purge_count):
#
item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item(config, children)
children.remove(item_to_purge)
self.log("Found next item to purge: ({}) {} ({})".format(
purge_index + 1,
os.path.basename(item_to_purge),
item_age
))
#
children_to_purge.append(item_to_purge)
#
self.log("Removing items")
for child_to_purge in children_to_purge:
child_basename = os.path.basename(child_to_purge)
self._remove_item(config, item_to_purge)
def _rotate_path_for_maximum_age(self, config, path: str, max_age_days: int):
assert os.path.isdir(path), "Path should be a directory: {}".format(path)
self.log("Rotating path for max age of {} days: {}".format(max_age_days, path))
children = self._gather_rotation_candidates(config, path)
self.log("Examining {} items for deletion".format(len(children)))
children_to_delete = []
for child in children:
age_seconds = self._detect_item_age_seconds(config, child)
age_days = self._detect_item_age_days(config, child)
age_formatted = self.seconds_to_time_string(age_seconds)
child_basename = os.path.basename(child)
if age_days > max_age_days:
self.log("[Old enough ] {} ({})".format(
child_basename, age_formatted
))
children_to_delete.append(child)
else:
self.log("[Not Old enough] {} ({})".format(
child_basename, age_formatted
))
if len(children_to_delete) > 0:
self.log("Removing old items ...")
for child_to_delete in children_to_delete:
basename = os.path.basename(child_to_delete)
self._remove_item(config, child_to_delete)
else:
self.log("No old items to remove")
@staticmethod
def _gather_rotation_candidates(config, path):
candidates = []
if "target-type" not in config.keys():
raise Exception("Please provide the configuration key: target-type")
for item_name in os.listdir(path):
item_path = os.path.join(path, item_name)
if config["target-type"] == "file":
if not os.path.isfile(item_path):
continue
elif config["target-type"] == "directory":
if not os.path.isdir(item_path):
continue
else:
raise Exception("Configuration key \"target-type\" must be \"file\" or \"directory\"")
candidates.append(item_path)
return candidates
def _pick_oldest_item(self, config, items):
best_item = None
best_ctime = None
for item in items:
ctime = self._detect_item_date(config, item)
if best_ctime is None or ctime < best_ctime:
best_ctime = ctime
best_item = item
age_seconds = self._detect_item_age_seconds(config, best_item)
age_string = self.seconds_to_time_string(age_seconds)
return best_item, best_ctime, age_seconds, age_string
@staticmethod
def _detect_item_date(config, item):
assert "date-detection" in config.keys(), "Please provide config key: \"date-detection\""
detection = config["date-detection"]
if detection == "file":
ctime = os.path.getctime(item)
else:
raise AssertionError("Invalid value for \"date-detection\"; Should be one of {file}: {}".format(
detection
))
return ctime
def _detect_item_age_seconds(self, config, item):
now = time.time()
ctime = self._detect_item_date(config, item)
delta = now - ctime
return delta
def _detect_item_age_days(self, config, item):
age_seconds = self._detect_item_age_seconds(config, item)
age_days = int(age_seconds / 86400)
return age_days
def seconds_to_time_string(self, seconds: float):
if isinstance(seconds, float):
pass
elif isinstance(seconds, int):
seconds = float * 1.0
else:
raise AssertionError("Seconds must be an int or float")
# Map
map = {
"year": 31536000.0,
"month": 2592000.0,
"week": 604800.0,
"day": 86400.0,
"hour": 3600.0,
"minute": 60.0,
"second": 1.0
}
s_parts = []
for unit_label in map.keys():
unit_seconds = map[unit_label]
if seconds >= unit_seconds:
unit_count = int(seconds / unit_seconds)
s_parts.append("{} {}{}".format(
unit_count, unit_label,
"" if unit_count == 1 else "s"
))
seconds -= unit_seconds * unit_count
s = ", ".join(s_parts)
return s
def _remove_item(self, config, path):
if os.path.isfile(path):
self._remove_file(config, path)
elif os.path.isdir(path):
self._remove_directory(config, path)
else:
raise AssertionError("Don't know how to remove this item: {}".format(path))
def _remove_file(self, config, file_path):
if not os.path.isfile(file_path):
raise Exception("Tried to remove a file, but this path isn't a file: " + str(file_path))
if self.__dry_run:
self.log("Won't purge file during global-level dry run: ", file_path)
elif "dry-run" in config.keys() and config["dry-run"] is True:
self.log("Won't purge file during config-level dry run: ", file_path)
else:
self.log("Purging file:", file_path)
os.remove(file_path)
def _remove_directory(self, config, dir_path):
if not os.path.isdir(dir_path):
raise Exception("Tried to remove a directory, but this path isn't a directory: " + str(dir_path))
if self.__dry_run:
self.log("Won't purge directory during global-level dry run: ", dir_path)
elif "dry-run" in config.keys() and config["dry-run"] is True:
self.log("Won't purge directory during config-level dry run: ", dir_path)
else:
self.log("Purging directory:", dir_path)
shutil.rmtree(dir_path)
def _determine_minimum_items(self, config):
minimum_items = 0
if "minimum-items" in config.keys():
minimum_items = config["minimum-items"]
self.log("Won't delete anything unless a minimum of {} items were found".format(minimum_items))
else:
self.log("No value found for \"minimum-items\"; Will not enforce minimum item constraint.")
return minimum_items

View File

@ -4,10 +4,12 @@ verify_ssl = true
name = "pypi" name = "pypi"
[packages] [packages]
pyyaml = ">=5.4" #pyyaml = ">=5.4"
pyyaml = "*"
[dev-packages] [dev-packages]
[requires] [requires]
python_version = "3" python_version = "3.12"

91
Pipfile.lock generated
View File

@ -1,11 +1,11 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "ac6a3bf65ec43902d7a8907c3b0ae70e365127a1f93ecf12080f847eadb7dc35" "sha256": "32165f2eba4fd1d8db46f280f7cc3f2c9a5ed5ed87eb9ecc34f26b08ffdee5ac"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": { "requires": {
"python_version": "3" "python_version": "3.12"
}, },
"sources": [ "sources": [
{ {
@ -18,42 +18,61 @@
"default": { "default": {
"pyyaml": { "pyyaml": {
"hashes": [ "hashes": [
"sha256:0283c35a6a9fbf047493e3a0ce8d79ef5030852c51e9d911a27badfde0605293", "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5",
"sha256:055d937d65826939cb044fc8c9b08889e8c743fdc6a32b33e2390f66013e449b", "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc",
"sha256:07751360502caac1c067a8132d150cf3d61339af5691fe9e87803040dbc5db57", "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df",
"sha256:0b4624f379dab24d3725ffde76559cff63d9ec94e1736b556dacdfebe5ab6d4b", "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741",
"sha256:0ce82d761c532fe4ec3f87fc45688bdd3a4c1dc5e0b4a19814b9009a29baefd4", "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206",
"sha256:1e4747bc279b4f613a09eb64bba2ba602d8a6664c6ce6396a4d0cd413a50ce07", "sha256:18aeb1bf9a78867dc38b259769503436b7c72f7a1f1f4c93ff9a17de54319b27",
"sha256:213c60cd50106436cc818accf5baa1aba61c0189ff610f64f4a3e8c6726218ba", "sha256:1d4c7e777c441b20e32f52bd377e0c409713e8bb1386e1099c2415f26e479595",
"sha256:231710d57adfd809ef5d34183b8ed1eeae3f76459c18fb4a0b373ad56bedcdd9", "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62",
"sha256:277a0ef2981ca40581a47093e9e2d13b3f1fbbeffae064c1d21bfceba2030287", "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98",
"sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513", "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696",
"sha256:40527857252b61eacd1d9af500c3337ba8deb8fc298940291486c465c8b46ec0", "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290",
"sha256:473f9edb243cb1935ab5a084eb238d842fb8f404ed2193a915d1784b5a6b5fc0", "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9",
"sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92", "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d",
"sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f", "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6",
"sha256:68fb519c14306fec9720a2a5b45bc9f0c8d1b9c72adf45c37baedfcd949c35a2", "sha256:4fb147e7a67ef577a588a0e2c17b6db51dda102c71de36f8549b6816a96e1867",
"sha256:77f396e6ef4c73fdc33a9157446466f1cff553d979bd00ecb64385760c6babdc", "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47",
"sha256:819b3830a1543db06c4d4b865e70ded25be52a2e0631ccd2f6a47a2822f2fd7c", "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486",
"sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86", "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6",
"sha256:98c4d36e99714e55cfbaaee6dd5badbc9a1ec339ebfc3b1f52e293aee6bb71a4", "sha256:596106435fa6ad000c2991a98fa58eeb8656ef2325d7e158344fb33864ed87e3",
"sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c", "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007",
"sha256:9fa600030013c4de8165339db93d182b9431076eb98eb40ee068700c9c813e34", "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938",
"sha256:a80a78046a72361de73f8f395f1f1e49f956c6be882eed58505a15f3e430962b", "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0",
"sha256:b3d267842bf12586ba6c734f89d1f5b871df0273157918b0ccefa29deb05c21c", "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c",
"sha256:b5b9eccad747aabaaffbc6064800670f0c297e52c12754eb1d976c57e4f74dcb", "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735",
"sha256:c5687b8d43cf58545ade1fe3e055f70eac7a5a1a0bf42824308d868289a95737", "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d",
"sha256:cba8c411ef271aa037d7357a2bc8f9ee8b58b9965831d9e51baf703280dc73d3", "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28",
"sha256:d15a181d1ecd0d4270dc32edb46f7cb7733c7c508857278d3d378d14d606db2d", "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4",
"sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53", "sha256:9046c58c4395dff28dd494285c82ba00b546adfc7ef001486fbf0324bc174fba",
"sha256:d4eccecf9adf6fbcc6861a38015c2a64f38b9d94838ac1810a9023a0609e1b78", "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8",
"sha256:d67d839ede4ed1b28a4e8909735fc992a923cdb84e618544973d7dfc71540803", "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef",
"sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a", "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5",
"sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174", "sha256:afd7e57eddb1a54f0f1a974bc4391af8bcce0b444685d936840f125cf046d5bd",
"sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5" "sha256:b1275ad35a5d18c62a7220633c913e1b42d44b46ee12554e5fd39c70a243d6a3",
"sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0",
"sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515",
"sha256:baa90d3f661d43131ca170712d903e6295d1f7a0f595074f151c0aed377c9b9c",
"sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c",
"sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924",
"sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34",
"sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43",
"sha256:c8098ddcc2a85b61647b2590f825f3db38891662cfc2fc776415143f599bb859",
"sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673",
"sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54",
"sha256:d858aa552c999bc8a8d57426ed01e40bef403cd8ccdd0fc5f6f04a00414cac2a",
"sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b",
"sha256:f003ed9ad21d6a4713f0a9b5a7a0a79e08dd0f221aff4525a2be4c346ee60aab",
"sha256:f22ac1c3cac4dbc50079e965eba2c1058622631e526bd9afd45fedd49ba781fa",
"sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c",
"sha256:fca0e3a251908a499833aa292323f32437106001d436eca0e6e7833256674585",
"sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d",
"sha256:fd66fc5d0da6d9815ba2cebeb4205f95818ff4b79c3ebe268e75d961704af52f"
], ],
"index": "pypi", "index": "pypi",
"version": "==6.0" "markers": "python_version >= '3.6'",
"version": "==6.0.1"
} }
}, },
"develop": {} "develop": {}

View File

@ -5,6 +5,12 @@ This program functions somewhat similarly to a log rotator. It's purpose is to r
Suppose you have a third party backup program regularly dropping backup files into some directory. You could use this program to limit the number of files that remain in the directory at any given time. Suppose you have a third party backup program regularly dropping backup files into some directory. You could use this program to limit the number of files that remain in the directory at any given time.
# License
Copyright 2023 Mike Peralta; All rights reserved
Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more)
# Requirements # Requirements
* Python 3 * Python 3
@ -78,11 +84,25 @@ Specifies the method used when attempting to determine how old a backup file/dir
Currently, only *file* is supported Currently, only *file* is supported
### minimum-items < INTEGER >
Specifies the minimum number of backup files/dirs that must be present before rotating can happen. Should be an integer.
This option doesn't specify how much to rotate on its own, but when rotation is possible. It should probably be used with maximum-age or something other than maximum-items.
For example, when the *minimum-items* value is set to 5, and *target-type* is *file*, the program will not rotate any files until there are at least 5 in the target directory.
### maximum-items < INTEGER > ### maximum-items < INTEGER >
Specifies the maximum number of backup files/dirs that are allowed in a path before rotating will happen. Should be an integer. Specifies the maximum number of backup files/dirs that are allowed in a path before rotating will happen. Should be an integer.
For example, when the *maximum-items* value is set to 5, and *target-type* is *file*, the program will not rotate any files until there are at least 5 in the target directory. For example, when the *maximum-items* value is set to 500, and *target-type* is *file*, the program will not rotate any files until there are at least 500 in the target directory.
### maximum-age < INTEGER >
Specifies the maximum age (in days) of backup files/dirs that are allowed in a path before rotating will happen. Should be an integer.
For example, when the *maximum-age* value is set to 30, and *target-type* is *file*, the program will not rotate any files that are newer than 30 days.
### paths < Array of Paths > ### paths < Array of Paths >

408
domain/BackupRotator.py Executable file
View File

@ -0,0 +1,408 @@
#!/usr/bin/env python3
"""
Mike's Backup Rotator
A simple script to help automatically rotate backup files
Copyright 2024 Mike Peralta; All rights reserved
Releasing to the public under the GNU GENERAL PUBLIC LICENSE v3 (See LICENSE file for more)
"""
from domain.config.Config import Config
from domain.config.ConfigFile import ConfigFile
from domain.Logger import Logger
from domain.Util import Util
import datetime
from pathlib import Path
import shutil
class BackupRotator:
def __init__(
self,
config_paths: [Path] = None,
debug: bool = False,
systemd: bool = False,
write_to_syslog: bool = False
):
self.__logger = Logger(
name=type(self).__name__,
debug=debug,
systemd=systemd,
write_to_syslog=write_to_syslog,
)
self.__config = Config(
logger=self.__logger,
config_files_paths=config_paths
)
self.__global_dry_run = True
self.__calculated_actions = []
def run(self, global_dry_run: bool = True):
self.info("Begin rotating")
self.__global_dry_run = global_dry_run
if self.__global_dry_run:
self.info(f"Running as a dry run, globally.")
else:
self.info(f"Won't run as a global dry run.")
# Rotate once per config
config_file_index = -1
for config_file in self.__config.config_files:
config_file: ConfigFile
config_file_index += 1
self.info(
f"Rotating for config {config_file_index + 1} of {len(self.__config.config_files)}"
f" : {config_file.path}"
f"\n{config_file}"
)
self._do_rotate(config_file)
@staticmethod
def current_time():
now = datetime.datetime.now()
now_s = now.strftime("%b-%d-%Y %I:%M%p")
return str(now_s)
def debug(self, s):
self.__logger.debug(s)
def info(self, s):
self.__logger.info(s)
def warn(self, s):
self.__logger.warning(s)
def error(self, s):
self.__logger.error(s)
def _do_rotate(self, config: ConfigFile):
self.info(
f"Rotating for config: {config.path}"
)
if config.dry_run:
self.info(
f"Config {config.path.name} is set for a dry run (no deleting)."
)
else:
self.info(
f"Config {config.path.name} is not set for a dry run (will delete)."
)
self._rotate_paths(config=config)
def _rotate_paths(self, config: ConfigFile):
paths = config.rotatable_paths
self.info(f"Begin rotating {len(paths)} paths")
for path in paths:
path: Path
self._rotate_path(config=config, path=path)
def _rotate_path(self, config: ConfigFile, path: Path):
assert path.is_dir(), (
f"Path should be a directory: {path}"
)
self.info(
f"Rotating path: {path}"
)
self._rotate_path_for_maximum_items(
config=config,
path=path,
)
self._rotate_path_for_maximum_age(
config=config,
path=path,
)
def _rotate_path_for_maximum_items(self, config: ConfigFile, path: Path):
assert path.is_dir(), f"Path should be a directory: {path}"
if config.maximum_items:
self.info(
f"Rotating path for a maximum of {config.maximum_items} items: {path}"
)
else:
self.info(
f"Not configured to rotate for maximum number of items."
)
return
self.info(
f"Will gather rotation candidates for maximum number of items."
)
candidate_items = self._gather_rotation_candidates(config=config, path=path)
minimum_items = self._determine_minimum_items(config=config)
# Do we need to rotate anything out?
if len(candidate_items) < minimum_items:
self.info(
f"Path only has {len(candidate_items)} items"
f", which does not meet the minimum threshold of {minimum_items} items."
" Won't rotate this path."
)
return
elif len(candidate_items) <= config.maximum_items:
self.info(
f"Path only has {len(candidate_items)} items"
f", but needs more than {config.maximum_items} for rotation"
"; Won't rotate this path."
)
return
self.info(f"Found {len(candidate_items)} items to examine")
#
maximum_purge_count = len(candidate_items) - minimum_items
purge_count = len(candidate_items) - config.maximum_items
self.info(
f"Want to purge {purge_count} items to stay under maximum of {config.maximum_items}"
)
if purge_count > maximum_purge_count:
self.info(
f"Reducing purge count from"
f" {purge_count} to {maximum_purge_count} items"
f" to respect minimum items setting ({minimum_items})"
)
purge_count = maximum_purge_count
items_to_purge = []
for purge_index in range(purge_count):
#
item_to_purge, item_ctime, item_age_seconds, item_age = self._pick_oldest_item(
config=config, items=candidate_items
)
item_to_purge: Path
candidate_items.remove(item_to_purge)
self.info(
f"Will purge: ({purge_index + 1})"
f" {item_to_purge.name}"
f" ({item_age})"
)
#
items_to_purge.append(item_to_purge)
#
self.info("Removing items")
for item_to_purge in items_to_purge:
item_to_purge: Path
self.debug(f"Purging item: {item_to_purge.name}")
self._remove_item(config=config, path=item_to_purge)
def _rotate_path_for_maximum_age(self, config: ConfigFile, path: Path):
assert path.is_dir(), f"Path should be a directory: {path}"
if config.maximum_age:
self.info(
f"Rotating path for max age of {config.maximum_age} days: {path}"
)
else:
self.info(
f"Not configured to rotate for a maximum number of days."
)
return
self.info(
f"Will gather rotation candidates for maximum age, in days."
)
candidate_items = self._gather_rotation_candidates(config=config, path=path)
minimum_items = self._determine_minimum_items(config=config)
# Do we need to rotate anything out?
if len(candidate_items) < minimum_items:
self.info(
f"Path only has {len(candidate_items)} items"
f", which does not meet the minimum threshold of {minimum_items} items."
f" Won't rotate this path."
)
return
self.info(
f"Examining {len(candidate_items)} items for deletion"
)
items_to_delete = []
for item in candidate_items:
age_seconds = Util.detect_item_age_seconds(config=config, item=item)
age_days = Util.detect_item_age_days(config=config, item=item)
age_formatted = Util.seconds_to_time_string(age_seconds)
if age_days > config.maximum_age:
self.info(
f"[Old enough ] {item.name} ({age_formatted})"
)
items_to_delete.append(item)
else:
self.info(
f"[Not Old enough] {item.name} ({age_formatted})"
)
if len(items_to_delete) > 0:
self.info("Removing old items ...")
for item in items_to_delete:
self._remove_item(config, item)
else:
self.info("No old items to remove")
def _gather_rotation_candidates(self, config: ConfigFile, path: Path) -> [Path]:
self.debug(f"Begin gathering rotation candidates for: {path}")
candidates: [Path] = []
for item in path.iterdir():
self.debug(f"Found an item: {item.name}")
if config.target_type == "file":
if not item.is_file():
self.debug(f"Not a file; Skipping: {item.name}")
continue
elif config.target_type == "directory":
if not item.is_dir():
self.debug(f"Not a directory; Skipping: {item.name}")
continue
else:
raise Exception(
f"Unsupported target type: {config.target_type}"
)
candidates.append(item)
self.__logger.info(f"Returning {len(candidates)} potential candidates to remove.")
return candidates
def _pick_oldest_item(self, config: ConfigFile, items: [Path]) -> (Path, float, float, str):
best_item = None
best_ctime = None
for item in items:
ctime = Util.detect_item_creation_date(config, item)
if best_ctime is None or ctime < best_ctime:
best_ctime = ctime
best_item = item
age_seconds = Util.detect_item_age_seconds(config, best_item)
age_string = Util.seconds_to_time_string(age_seconds)
return best_item, best_ctime, age_seconds, age_string
def _remove_item(self, config: ConfigFile, path: Path):
if path.is_file():
self._remove_file(config=config, file_path=path)
elif path.is_dir():
self._remove_directory(config=config, dir_path=path)
else:
raise AssertionError(
f"Don't know how to remove this item: {path}"
)
def _remove_file(self, config: ConfigFile, file_path: Path):
if not file_path.is_file():
raise Exception(
f"Tried to remove a file, but this path isn't a file: {file_path}"
)
if self.__global_dry_run:
self.info(f"(Global Dry Run) {file_path}")
elif config.dry_run is True:
self.info(f"(Config Dry Run) {file_path}")
else:
self.info(f"Purging file: {file_path}")
file_path.unlink()
def _remove_directory(self, config: ConfigFile, dir_path: Path):
if not dir_path.is_dir():
raise Exception(
f"Tried to remove a directory"
f", but this path isn't a directory: {dir_path}"
)
if self.__global_dry_run:
self.info(f"(Global Dry Run) {dir_path}")
elif config.dry_run:
self.info(f"(Config Dry Run) {dir_path}")
else:
self.info(f"Purging directory: {dir_path}")
shutil.rmtree(dir_path)
def _determine_minimum_items(self, config) -> int:
minimum_items = 0
if config.minimum_items is not None:
minimum_items = config.minimum_items
self.info(
f"Won't delete anything unless a minimum of {minimum_items} items were found"
)
else:
self.info(
"No minimum number of items specified"
"; Will not enforce minimum item constraint."
)
return minimum_items

94
domain/Logger.py Normal file
View File

@ -0,0 +1,94 @@
import logging
from logging.handlers import SysLogHandler
import sys
class Logger:
def __init__(
self,
name: str,
debug: bool = False,
write_to_syslog: bool = False,
systemd: bool = False
):
self.__name = name
self.__debug = debug
self.__write_to_syslog = write_to_syslog
self.__systemd = systemd
self._init_logger()
def _init_logger(self):
self.__logger = logging.getLogger(self.__name)
if self.__debug:
level = logging.DEBUG
else:
level = logging.INFO
self.__logger.setLevel(level)
formatter = logging.Formatter(
fmt="[{name}][{levelname:<7}] {message}",
style='{'
)
formatter_full = logging.Formatter(
fmt="[{asctime}][{name}][{levelname:<7}] {message}",
style='{'
)
# Console output / stream handler (STDOUT)
handler = logging.StreamHandler(
stream=sys.stdout
)
handler.setLevel(level)
handler.addFilter(lambda entry: entry.levelno <= logging.INFO)
handler.setFormatter(
formatter if self.__systemd else formatter_full
)
self.__logger.addHandler(handler)
# Console output / stream handler (STDERR)
handler = logging.StreamHandler(
stream=sys.stderr
)
handler.setLevel(logging.WARNING)
handler.setFormatter(
formatter if self.__systemd else formatter_full
)
self.__logger.addHandler(handler)
# Syslog handler
if self.__write_to_syslog:
handler = SysLogHandler(
address="/dev/log"
)
handler.setLevel(level)
handler.setFormatter(formatter)
self.__logger.addHandler(handler)
# This is annoying inside cron
self.debug("Test debug log")
self.info("Test info log")
self.warn("Test warn log")
self.error("Test error log")
def debug(self, s):
self.__logger.debug(s)
def info(self, s):
self.__logger.info(s)
def warn(self, s):
self.__logger.warning(s)
def warning(self, s):
self.__logger.warning(s)
def error(self, s):
self.__logger.error(s)

128
domain/Util.py Normal file
View File

@ -0,0 +1,128 @@
from domain.config.ConfigFile import ConfigFile
import datetime
from pathlib import Path
class Util:
def __init__(self):
pass
@staticmethod
def get_dir_files_recursive(path: Path) -> [Path]:
files_paths = []
for dir_path, dirs_names, filenames in path.walk():
for file_name in filenames:
file_path = dir_path / file_name
files_paths.append(file_path)
return files_paths
@staticmethod
def detect_item_creation_date(config: ConfigFile, item: Path) -> datetime.datetime:
stat = None
if config.date_detection == "file":
# Try for the most accurate stat
# First one that raises will just break the block, obv
try:
stat = item.stat().st_ctime
# print("got ctime")
stat = item.stat().st_mtime
# print("got mtime")
stat = item.stat().st_birthtime
# print("got btime")
except AttributeError:
pass
else:
raise AssertionError(
f"Unsupported date-detection option: {config.date_detection}"
)
stamp = datetime.datetime.fromtimestamp(
stat
)
# print("Stat:", stat)
# print("Stamp:", stamp)
# print(item.name, "==>", stamp)
return stamp
@staticmethod
def detect_item_age_seconds(config: ConfigFile, item: Path) -> float:
now = datetime.datetime.now()
ctime = Util.detect_item_creation_date(config=config, item=item)
delta = now - ctime
seconds = delta.seconds
# print(item.name, "==>", seconds, f"({ctime})")
# print(">", "Now was:", now)
# print(">", "ctime was:", ctime)
# print(">", "Delta was:", delta)
# print(">", "Seconds was:", delta.total_seconds())
return delta.total_seconds()
@staticmethod
def detect_item_age_days(config: ConfigFile, item: Path) -> int:
age_seconds = Util.detect_item_age_seconds(
config=config, item=item
)
age_days = int(age_seconds / 86400)
return age_days
@staticmethod
def seconds_to_time_string(seconds: float):
if isinstance(seconds, float):
pass
elif isinstance(seconds, int):
seconds = float(seconds)
else:
raise AssertionError("Seconds must be an int or float")
# Map
dt_map = {
"year": 31536000.0,
"month": 2592000.0,
"week": 604800.0,
"day": 86400.0,
"hour": 3600.0,
"minute": 60.0,
"second": 1.0
}
s_parts = []
for unit_label in dt_map.keys():
unit_seconds = dt_map[unit_label]
if seconds >= unit_seconds:
unit_count = int(seconds / unit_seconds)
unit_plural = "" if unit_count == 1 else "s"
s_parts.append(
f"{unit_count} {unit_label}{unit_plural}"
)
seconds -= unit_seconds * unit_count
s = ", ".join(s_parts)
return s

43
domain/config/Config.py Normal file
View File

@ -0,0 +1,43 @@
from domain.config.ConfigFile import ConfigFile
from domain.config.Scanner import Scanner
from domain.Logger import Logger
from pathlib import Path
class Config:
def __init__(self, logger: Logger, config_files_paths: [Path]):
self.__logger = logger
self.__config_files_paths: [Path] = config_files_paths
self.__configs = {}
self.__scanner = Scanner(
logger=self.__logger
)
self._consume_configs()
def _consume_configs(self):
config_paths = self.__scanner.gather_valid_config_paths(
paths=self.__config_files_paths
)
for config_path in config_paths:
config = ConfigFile(
logger=self.__logger,
path=config_path
)
self.__configs[config.key] = config
@property
def config_files(self) -> [ConfigFile]:
return self.__configs.values()

301
domain/config/ConfigFile.py Normal file
View File

@ -0,0 +1,301 @@
from domain.Logger import Logger
from pathlib import Path
import yaml
class ConfigFile:
__VALID_TARGET_TYPES = [
"file",
"directory"
]
__VALID_DATE_DETECTION_TYPES = [
"file"
]
def __init__(
self, logger: Logger,
path: Path,
):
self.__logger = logger
self.__path = path.absolute()
# noinspection PyTypeChecker
self.__data: dict = None
self.__dry_run: bool = True
# noinspection PyTypeChecker
self.__target_type: str = None
# noinspection PyTypeChecker
self.__date_detection: str = None
self.__rotatable_paths: [Path] = []
self.__minimum_items: int = 0
# noinspection PyTypeChecker
self.__maximum_items: int = None
# noinspection PyTypeChecker
self.__maximum_age: int = None
self._load()
self._consume()
def __str__(self):
s = ""
s += "*** Config File ***"
s += f"\n> Path: {self.__path}"
s += f"\n> Dry run: " + ("Yes" if self.__dry_run else "No")
s += f"\n> Minimum items: {self.__minimum_items}"
s += f"\n> Maximum items: {self.__maximum_items}"
s += f"\n> Maximum age (in days): {self.__maximum_age}"
s += f"\n> Target type: {self.__target_type}"
s += f"\n> Date detection: {self.__date_detection}"
s += f"\n> Rotatable paths: "
if len(self.__rotatable_paths) > 0:
for p in self.__rotatable_paths:
s += f"\n>> {p}"
else:
s += "\n>> [none]"
return s
def _load(self):
self.info(f"Loading config: {self.__path}")
assert self.__path.is_file(), (
f"Cannot load config file because it isn't a file: {self.__path}"
)
# Open the file
self.debug(f"Opening config file for load: {self.__path}")
f = open(str(self.__path))
if not f:
raise Exception(f"Unable to open config file: {self.__path}")
# Load data
self.__data = yaml.safe_load(f)
assert self.__data is not None, (
f"Config file seems to be null or empty: {self.__path}"
)
# Consume to internal
self.info(f"Loaded config from path: {self.__path}")
def _consume(self):
try:
assert isinstance(self.__data, dict), (
f"Config file should be a dict!"
)
if "options" in self.__data.keys():
self.info(f"Found options setting")
options = self.__data["options"]
assert isinstance(options, dict), "Options must be a dict"
if "dry-run" in options.keys():
dry_run = self.__data["options"]["dry-run"]
self.info(f"Found dry run option: {dry_run}")
assert isinstance(dry_run, bool), "dry-run setting must be boolean"
self.__dry_run = dry_run
else:
self.warning(f"No dry-run option found; Will use default: {self.__dry_run}")
if "minimum-items" in options.keys():
minimum_items = options["minimum-items"]
self.info(f"Found minimum-items option: {minimum_items}")
assert isinstance(minimum_items, int), (
f"Option minimum-items must be int, but got: {minimum_items}"
)
self.__minimum_items = minimum_items
else:
self.warning(
f"No minimum-items option found; Will use default: {self.__minimum_items}"
)
assert (
"maximum-items" in options.keys()
or
"maximum-age" in options.keys()
), (
"Options should include either maximum-items or maximum-age"
)
if "maximum-items" in options.keys():
maximum_items = options["maximum-items"]
self.info(f"Found maximum-items option: {maximum_items}")
assert isinstance(maximum_items, int), (
f"Option maximum-items must be int, but got: {maximum_items}"
)
assert maximum_items > 0, (
f"Option maximum-items is zero, which doesn't make sense."
)
self.__maximum_items = maximum_items
else:
self.warning(
f"No maximum-items option found; Will use default: {self.__maximum_items}"
)
if "maximum-age" in options.keys():
maximum_age = options["maximum-age"]
self.info(f"Found maximum-age option (max age in days): {maximum_age}")
assert isinstance(maximum_age, int), (
f"Option maximum-age must be int, but got: {maximum_age}"
)
assert maximum_age > 0, (
f"Option maximum-age is zero, which doesn't make sense."
)
self.__maximum_age = maximum_age
else:
self.warning(
f"No maximum-age option found; Will use default: {self.__maximum_age}"
)
assert "target-type" in options.keys(), (
f"Option target-type is required"
)
target_type = options["target-type"]
self.info(f"Found target-type option: {target_type}")
assert isinstance(target_type, str), (
f"Option target-type must be str, but got: {target_type}"
)
assert target_type in self.__VALID_TARGET_TYPES, (
f"Option target-type must be one of: {self.__VALID_TARGET_TYPES}"
)
self.__target_type = target_type
if "date-detection" in options.keys():
date_detection = options["date-detection"]
self.info(f"Found date-detection option: {date_detection}")
assert isinstance(date_detection, str), (
f"Option date-detection must be str, but got: {date_detection}"
)
assert date_detection in self.__VALID_DATE_DETECTION_TYPES, (
f"Option date-detection must be one of: {self.__VALID_DATE_DETECTION_TYPES}"
)
self.__date_detection = date_detection
else:
self.error(
f"Option date-detection not found; Will use default: {self.__date_detection}"
)
raise AssertionError(
f"Option date-detection is required."
)
else:
self.error(f"No options key found!")
raise AssertionError(f"No options key found!")
assert "paths" in self.__data, (
f"Could not find 'paths' key"
)
rotatable_paths = self.__data["paths"]
if isinstance(rotatable_paths, str):
rotatable_paths = [rotatable_paths]
assert isinstance(rotatable_paths, list), (
"Rotatable 'paths' key must be a string or list"
)
for i in range(len(rotatable_paths)):
p = rotatable_paths[i]
if isinstance(p, Path):
continue
elif isinstance(p, str):
rotatable_paths[i] = Path(p)
else:
raise AssertionError(
f"All rotatable paths must be strings or pathlib::Path objects"
)
self.__rotatable_paths = rotatable_paths
self.info(f"Found {len(self.__rotatable_paths)} rotatable paths")
except KeyError as e:
self.error(
f"Failed to load config due to KeyError"
f"\nFile: {self.__path}"
f"\nError: {str(e)}"
)
raise e
except AssertionError as e:
self.error(
f"Failed to load config due to AssertionError"
f"\nFile: {self.__path}"
f"\nError: {str(e)}"
)
raise e
def debug(self, s):
self.__logger.debug(f"({self.__path.name}) {s}")
def info(self, s):
self.__logger.info(f"({self.__path.name}) {s}")
def warning(self, s):
self.__logger.warning(f"({self.__path.name}) {s}")
def error(self, s):
self.__logger.error(f"({self.__path.name}) {s}")
@property
def key(self) -> str:
return str(self.__path)
@property
def path(self) -> Path:
return self.__path
@property
def data(self) -> dict:
return self.__data
@property
def dry_run(self) -> bool:
return self.__dry_run
@dry_run.setter
def dry_run(self, b: bool):
self.__dry_run = b
@property
def target_type(self) -> str:
return self.__target_type
@property
def date_detection(self) -> str:
return self.__date_detection
@property
def rotatable_paths(self) -> [Path]:
return self.__rotatable_paths
@property
def minimum_items(self) -> int:
return self.__minimum_items
@property
def maximum_items(self) -> int:
return self.__maximum_items
@property
def maximum_age(self) -> int:
return self.__maximum_age

120
domain/config/Scanner.py Normal file
View File

@ -0,0 +1,120 @@
from domain.Logger import Logger
from domain.Util import Util
# import os
from pathlib import Path
class Scanner:
__DEFAULT_VALID_EXTENSIONS = [
"yaml",
"yml"
]
def __init__(self, logger: Logger):
self.__logger = logger
self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS
def debug(self, s):
self.__logger.debug(f"[{type(self).__name__}] {s}")
def info(self, s):
self.__logger.info(f"[{type(self).__name__}] {s}")
def warn(self, s):
self.__logger.warning(f"[{type(self).__name__}] {s}")
def error(self, s):
self.__logger.error(f"[{type(self).__name__}] {s}")
def gather_valid_config_paths(self, paths: list = None) -> [Path]:
assert paths is not None, "Config paths cannot be None"
assert len(paths) > 0, "Must provide at least one config file path"
self.info("Gathering valid configs")
file_paths = []
config_paths = []
not_config_paths = []
# First gather all files that are potential configs
for path_str in paths:
path = Path(path_str)
self.info(f"Inspecting path: {path}")
if not path.exists():
self.error(f"Path doesn't exist: {path}")
if path.is_file():
self.debug(
f"Path is a file; Adding directly to potential config candidates: {path}"
)
file_paths.append(path)
elif path.is_dir():
self.debug(
f"Path is a dir;"
" Scanning recursively for potential config candidate files: {path}"
)
for file_path in Util.get_dir_files_recursive(path=path):
self.info(f"> Candidate file: {file_path}")
file_paths.append(file_path)
else:
raise AssertionError(
f"Don't know how to handle path that isn't a file or dir: {path}"
)
# Now, filter for files with valid YAML extensions
for file_path in file_paths:
if self.check_file_extension(file_path=file_path, extensions=None):
config_paths.append(file_path)
else:
not_config_paths.append(file_path)
self.info("Filtered out non-config files:")
if len(not_config_paths) > 0:
for not_config_path in not_config_paths:
self.info(f"> {not_config_path}")
else:
self.info("> [none]")
self.info("Kept config-looking files:")
if len(config_paths) > 0:
for config_path in config_paths:
self.info(f"> {config_path}")
else:
self.info("> [none]")
return config_paths
def check_file_extension(self, file_path: Path, extensions: list = None) -> bool:
if extensions is None:
extensions = self.__valid_extensions
file_extension = file_path.suffix
# Strip preceding dot from extension
if len(file_extension) > 0 and file_extension[0] == ".":
file_extension = file_extension[1:]
file_extension = file_extension.lower()
for valid_extension in extensions:
if file_extension == valid_extension:
return True
return False

47
init-environment Executable file
View File

@ -0,0 +1,47 @@
#!/bin/bash
log()
{
echo "[Mike's Backup Rotator - Init Env] $1"
}
complain()
{
echo "[Mike's Backup Rotator - Init Env] $1" 1>&2
}
die()
{
complain "Fatal: $1"
exit 1
}
SCRIPT_PATH=$(readlink -f "$0")
SCRIPT_DIR=$(dirname "$SCRIPT_PATH")
SCRIPT_NAME=$(basename "$SCRIPT_PATH")
log "Begin ${SCRIPT_NAME}"
log "Script path: ${SCRIPT_PATH}"
log "Script dir: ${SCRIPT_DIR}"
log "PATH: ${PATH}"
log "PWD before switching: $(pwd)"
cd "${SCRIPT_DIR}" || die "Failed to switch to project directory: ${SCRIPT_DIR}"
log "PWD after switching: $(pwd)"
log "Printing environment:"
printenv
log "Ensuring python installation with pyenv"
pyenv versions
pyenv install --skip-existing || die "Failed to ensure python installation with pyenv"
log "Installing/upgrading pip and pipenv"
pip install --upgrade pip pipenv || die "Failed to install/upgrade pip and pipenv"
log "Removing old pip environment"
pipenv --rm # Don't die because this will return an error if the env didn't already exist
# Install/sync
log "Syncing pip dependencies"
pipenv sync || die "Failed to sync pip environment with pipenv"

54
main.py
View File

@ -1,40 +1,76 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from BackupRotator import BackupRotator from domain.BackupRotator import BackupRotator
import argparse import argparse
#
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Mike's Backup Rotator. Helps automatically remove old backup files or folders." description="Mike's Backup Rotator. Helps automatically remove old backup files or folders."
) )
parser.add_argument(
"--debug", "--verbose",
dest="debug",
default=False,
action="store_true",
help="Verbose/Debug logging mode"
)
parser.add_argument(
"--systemd",
default=False,
dest="systemd",
action="store_true",
help=(
"Pass if this program will be spawned inside systemd"
" or another system that already adds timestamps to log messages."
)
)
parser.add_argument(
"--syslog", "--write-to-syslog",
default=False,
dest="write_to_syslog",
action="store_true",
help=(
"Pass if you'd like this program to write to syslog."
)
)
parser.add_argument( parser.add_argument(
"--config", "-c", "--config", "-c",
dest="config_files", dest="config_paths",
default=[], default=[],
action="append", action="append",
type=str, type=str,
help="Specify a configuration file. Can be called multiple times." help="Specify a configuration file or configuration directory. Can be called multiple times."
) )
parser.add_argument( parser.add_argument(
"--dry-run", "-d", "--dry-run", "-d",
dest="dry_run", dest="global_dry_run",
default=False, default=False,
action="store_true", action="store_true",
help="Only perform an analysis; Don't delete anything." help=(
"Only perform an analysis;"
" Don't delete anything no matter what configs say (configs can specify dry run, too)."
)
) )
args = parser.parse_args() args = parser.parse_args()
rotator = BackupRotator() rotator = BackupRotator(
config_paths=args.config_paths,
debug=args.debug,
systemd=args.systemd,
write_to_syslog=args.write_to_syslog,
)
rotator.run( rotator.run(
configs=args.config_files, global_dry_run=args.global_dry_run
dry_run=args.dry_run
) )