Appease Pycharm's incessant complaints

This commit is contained in:
Mike 2019-08-04 18:04:45 -07:00
parent ee2e1af737
commit 99a3bca3ba

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# #
import datetime import datetime
import os import os
@ -7,52 +8,48 @@ import shlex
import subprocess import subprocess
import sys import sys
# #
class MikesBackup: class MikesBackup:
# #
_log_dir = None __log_dir = None
_remote_host = None __remote_host = None
_remote_user = None __remote_user = None
_destination_dir_base = None __destination_dir_base = None
# #
_ssh_key = None __ssh_key = None
_quiet_ssh = True __quiet_ssh = True
# #
_source_dirs = [] __source_dirs = []
_source_dir_excludes = [] __source_dir_excludes = []
# #
_force_full = False __force_full = False
_force_differential = False __force_differential = False
# #
def __init__(self): def __init__(self):
self.ParseArgs() self.parse_args()
# #
def eprint(*args, **kwargs): def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs) print(*args, file=sys.stderr, **kwargs)
# #
def log(): def parse_args(self):
pass
#
def ParseArgs(self):
# Simple mapped args # Simple mapped args
args_map = { args_map = {
"--log-dir" : "_log_dir", "--log-dir": "__log_dir",
"--remote-host" : "_remote_host", "--remote-host": "__remote_host",
"--remote-user" : "_remote_user", "--remote-user": "__remote_user",
"--destination-dir" : "_destination_dir_base", "--destination-dir": "__destination_dir_base",
"--ssh-key" : "_ssh_key" "--ssh-key": "__ssh_key"
} }
# #
print() print()
print ("Parsing arguments") print("Parsing arguments")
a = 1 a = 1
while a < len(sys.argv): while a < len(sys.argv):
@ -65,7 +62,7 @@ class MikesBackup:
if arg == arg_name: if arg == arg_name:
valid_arg = True valid_arg = True
self_var_name = args_map[arg_name] self_var_name = args_map[arg_name]
self_var_value = sys.argv[ a + 1 ] self_var_value = sys.argv[a + 1]
self.__dict__[self_var_name] = self_var_value self.__dict__[self_var_name] = self_var_value
print("Found argument \"", arg_name, "\" ==>", self_var_value) print("Found argument \"", arg_name, "\" ==>", self_var_value)
a = a + 1 a = a + 1
@ -74,67 +71,71 @@ class MikesBackup:
valid_arg = True valid_arg = True
elif arg == "--full": elif arg == "--full":
valid_arg = True valid_arg = True
self._force_full = True self.__force_full = True
elif arg == "--diff" or arg == "--differential": elif arg == "--diff" or arg == "--differential":
valid_arg = True valid_arg = True
self._force_differential = True self.__force_differential = True
elif arg == "--source-dir": elif arg == "--source-dir":
valid_arg = True valid_arg = True
self._source_dirs.append( sys.argv[ a + 1 ] ) self.__source_dirs.append(sys.argv[a + 1])
print("Found source dir:", sys.argv[ a + 1 ]) print("Found source dir:", sys.argv[a + 1])
a = a + 1 a = a + 1
elif arg == "--exclude": elif arg == "--exclude":
valid_arg = True valid_arg = True
self._source_dir_excludes.append( sys.argv[ a + 1 ] ) self.__source_dir_excludes.append(sys.argv[a + 1])
print("Found exclude dir:", sys.argv[ a + 1 ]) print("Found exclude dir:", sys.argv[a + 1])
a = a + 1 a = a + 1
a = a + 1 a = a + 1
# #
if not valid_arg: if not valid_arg:
raise Exception("Invalid argument:", arg); raise Exception("Invalid argument:", arg)
# @staticmethod
def GetDatetimeForFilename(self): def get_datetime_for_filename():
# #
return datetime.datetime.now().strftime('%Y-%b-%d_%I%M%p') return datetime.datetime.now().strftime('%Y-%b-%d_%I%M%p')
# #
def IsUsingSSH(self): def is_using_ssh(self):
# #
if self._remote_host != None or self._remote_user != None or self._ssh_key != None: if (
self.__remote_host is not None
or self.__remote_user is not None
or self.__ssh_key is not None
):
return True return True
return False return False
# #
def DemandSSHConfig(self): def demand_ssh_config(self):
# #
if self.IsUsingSSH(): if self.is_using_ssh():
if self._remote_host == None: if self.__remote_host is None:
raise Exception("Please provide remote host") raise Exception("Please provide remote host")
if self._remote_user == None: if self.__remote_user is None:
raise Exception("Please provide remote user") raise Exception("Please provide remote user")
# #
def DemandDestinationDirectoryConfig(self): def demand_destination_directory_config(self):
# #
if self._destination_dir_base == None: if self.__destination_dir_base is None:
raise Exception("Please provide backup destination directory") raise Exception("Please provide backup destination directory")
# #
def DoesDestinationDirectoryExist(self, destination_path): def does_destination_directory_exist(self, destination_path):
# #
print("Trying to determine if destination path exists:", destination_path) print("Trying to determine if destination path exists:", destination_path)
# Local? # Local?
if not self.IsUsingSSH(): if not self.is_using_ssh():
print("Checking for local destination path") print("Checking for local destination path")
if os.path.isdir(destination_path): if os.path.isdir(destination_path):
print("Local destination path exists") print("Local destination path exists")
@ -150,7 +151,7 @@ class MikesBackup:
] ]
# #
code, stdout, stderr = self.ExecuteRemoteSSHCommand(command) code, stdout, stderr = self.execute_remote_ssh_command(command)
if code == 0: if code == 0:
print("Remote destination dir was found: " + destination_path) print("Remote destination dir was found: " + destination_path)
return True return True
@ -160,208 +161,208 @@ class MikesBackup:
return False return False
# #
def DemandDestinationBaseBackupDirectory(self): def demand_destination_base_backup_directory(self):
# #
self.DemandDestinationDirectoryConfig() self.demand_destination_directory_config()
# #
destination_path = self._destination_dir_base destination_path = self.__destination_dir_base
# #
if self.DoesDestinationDirectoryExist(destination_path) == False: if self.does_destination_directory_exist(destination_path) is False:
raise Exception("Backup destination directory doesn't exist: " + destination_path) raise Exception("Backup destination directory doesn't exist: " + destination_path)
# #
def DoesFullBackupDestinationDirectoryExist(self): def does_full_backup_destination_directory_exist(self):
# #
dir_path = self.MakeFullBackupDestinationPath() dir_path = self.make_full_backup_destination_path()
# #
print("Trying to determine if Full backup destination directory exists:", dir_path) print("Trying to determine if Full backup destination directory exists:", dir_path)
return self.DoesDestinationDirectoryExist(dir_path) return self.does_destination_directory_exist(dir_path)
# #
def GetSourceDirectories(self): def get_source_directories(self):
# #
if len(self._source_dirs) == 0: if len(self.__source_dirs) == 0:
raise Exception("No source directories specified") raise Exception("No source directories specified")
return self._source_dirs return self.__source_dirs
# #
def DoBackup(self): def do_backup(self):
# #
print() print()
print("Enter: DoBackup") print("Enter: do_backup")
# Remote base dir must exist # Remote base dir must exist
self.DemandDestinationBaseBackupDirectory() self.demand_destination_base_backup_directory()
# Forced full or differential by args? # Forced full or differential by args?
if self._force_full == True or self._force_differential == True: if self.__force_full is True or self.__force_differential is True:
if self._force_full == True: if self.__force_full is True:
print("Forcing full backup") print("Forcing full backup")
self.DoFullBackup() self.do_full_backup()
else: else:
print("Forcing differential backup") print("Forcing differential backup")
self.DoDifferentialBackup() self.do_differential_backup()
return return
# Automatically choose full or differential # Automatically choose full or differential
if self.DoesFullBackupDestinationDirectoryExist(): if self.does_full_backup_destination_directory_exist():
print("Automatically choosing differential backup, because full backup destination directory already exists") print("Automatically choosing differential backup, because full backup destination directory already exists")
self.DoDifferentialBackup() self.do_differential_backup()
else: else:
print("Automatically choosing full backup, because full backup destination directory wasn't found") print("Automatically choosing full backup, because full backup destination directory wasn't found")
self.DoFullBackup() self.do_full_backup()
# #
def DoFullBackup(self): def do_full_backup(self):
# Start args # Start args
args = [] args = []
# Get destination directory # Get destination directory
destination_dir = self.MakeFullBackupDestinationPath() destination_dir = self.make_full_backup_destination_path()
# Append source directories # Append source directories
args.extend(self.GetSourceDirectories()) args.extend(self.get_source_directories())
# Append remote destination directory # Append remote destination directory
#args.append( self._remote_user + "@" + self._remote_host + ":" + remote_dir) # args.append( self.__remote_user + "@" + self.__remote_host + ":" + remote_dir)
args.append( self.MakeRsyncRemoteDestinationPart(destination_dir) ) args.append(self.make_rsync_remote_destination_part(destination_dir))
#print("Args", str(args)) # print("Args", str(args))
print("Destination dir:", destination_dir) print("Destination dir:", destination_dir)
self.ExecuteRsync(args) self.execute_rsync(args)
# #
def DoDifferentialBackup(self): def do_differential_backup(self):
# Start args # Start args
args = [] args = []
# Get directories # Get directories
link_dest_dir = self.MakeFullBackupDestinationPath() link_dest_dir = self.make_full_backup_destination_path()
destination_dir = self.MakeRemoteDifferentialBackupPath() destination_dir = self.make_remote_differential_backup_path()
self.EnsureDestinationDirectory(destination_dir) self.ensure_destination_directory(destination_dir)
# Add link dest arg # Add link dest arg
args.append("--link-dest") args.append("--link-dest")
args.append(link_dest_dir) args.append(link_dest_dir)
# Append source directories # Append source directories
args.extend(self.GetSourceDirectories()) args.extend(self.get_source_directories())
# Append remote destination directory # Append remote destination directory
#args.append( self._remote_user + "@" + self._remote_host + ":" + remote_dir) # args.append( self.__remote_user + "@" + self.__remote_host + ":" + remote_dir)
args.append( self.MakeRsyncRemoteDestinationPart(destination_dir) ) args.append(self.make_rsync_remote_destination_part(destination_dir))
#print("Args", str(args)) # print("Args", str(args))
print("Link destination dir:", link_dest_dir) print("Link destination dir:", link_dest_dir)
print("Destination dir:", destination_dir) print("Destination dir:", destination_dir)
self.ExecuteRsync(args) self.execute_rsync(args)
# #
def MakeLogDirectoryPath(self): def make_log_directory_path(self):
# #
log_dir = self._log_dir log_dir = self.__log_dir
if log_dir == None: if log_dir is None:
print("No log directory specified; Defaulting to current working directory") print("No log directory specified; Defaulting to current working directory")
log_dir = "." log_dir = "."
return log_dir return log_dir
# #
def MakeLogPath(self): def make_log_path(self):
# Log dir # Log dir
log_dir = self.MakeLogDirectoryPath() log_dir = self.make_log_directory_path()
# Path # Path
log_path = os.path.join(log_dir, self.GetDatetimeForFilename() + ".log") log_path = os.path.join(log_dir, self.get_datetime_for_filename() + ".log")
return log_path return log_path
# #
def MakeFullBackupDestinationPath(self): def make_full_backup_destination_path(self):
# #
if self._destination_dir_base == None: if self.__destination_dir_base is None:
raise Exception("No remote directory was specified") raise Exception("No remote directory was specified")
# #
return os.path.join(self._destination_dir_base, "Full") return os.path.join(self.__destination_dir_base, "Full")
# #
def MakeRemoteDifferentialBackupPath(self): def make_remote_differential_backup_path(self):
# #
if self._destination_dir_base == None: if self.__destination_dir_base is None:
raise Exception("No remote directory was specified") raise Exception("No remote directory was specified")
# #
return os.path.join(self._destination_dir_base, "Differential", self.GetDatetimeForFilename()) return os.path.join(self.__destination_dir_base, "Differential", self.get_datetime_for_filename())
# #
def MakeRsyncRemoteDestinationPart(self, destination_dir): def make_rsync_remote_destination_part(self, destination_dir):
# #
part = "" part = ""
# #
if self._remote_host != None: if self.__remote_host is not None:
if self._remote_user != None: if self.__remote_user is not None:
part += self._remote_user + "@" part += self.__remote_user + "@"
part += self._remote_host + ":" part += self.__remote_host + ":"
# #
part += destination_dir part += destination_dir
return part return part
# @staticmethod
def EnsureLocalDirectory(self, d): def ensure_local_directory(d):
# #
if not os.path.exists(d): if not os.path.exists(d):
os.makedirs(d) os.makedirs(d)
# #
def EnsureDestinationDirectory(self, d): def ensure_destination_directory(self, d):
# #
if not self.DoesDestinationDirectoryExist(d): if not self.does_destination_directory_exist(d):
# #
print("Destination directory doesn't exist; Will create:", d) print("Destination directory doesn't exist; Will create:", d)
# #
if self.IsUsingSSH(): if self.is_using_ssh():
command = [ command = [
"mkdir", "mkdir",
"--parents", "--parents",
d d
] ]
self.ExecuteRemoteSSHCommand(command) self.execute_remote_ssh_command(command)
else: else:
os.makedirs(d) os.makedirs(d)
# #
def StartRsyncArgs(self): def start_rsync_args(self):
# #
self.EnsureLocalDirectory(self.MakeLogDirectoryPath()) self.ensure_local_directory(self.make_log_directory_path())
args = [ args = [
"rsync", "rsync",
"--log-file", self.MakeLogPath(), "--log-file", self.make_log_path(),
"--archive", "--archive",
"--compress", "--compress",
"--progress", "--progress",
@ -374,54 +375,54 @@ class MikesBackup:
] ]
# #
for e in self._source_dir_excludes: for e in self.__source_dir_excludes:
args.append("--exclude") args.append("--exclude")
args.append(e) args.append(e)
# #
#args.append("--dry-run") # DEBUG !!! # args.append("--dry-run") # DEBUG !!!
return args return args
# #
def StartRsyncEnvironmentalVariables(self): def start_rsync_environment_variables(self):
# #
env = {} env = {}
# #
if self._ssh_key != None or self._quiet_ssh == True: if self.__ssh_key is not None or self.__quiet_ssh is True:
env["RSYNC_RSH"] = "ssh" env["RSYNC_RSH"] = "ssh"
if self._ssh_key != None: if self.__ssh_key is not None:
env["RSYNC_RSH"] += " -i " + shlex.quote(self._ssh_key) env["RSYNC_RSH"] += " -i " + shlex.quote(self.__ssh_key)
if self._quiet_ssh == True: if self.__quiet_ssh is True:
env["RSYNC_RSH"] += " -q" env["RSYNC_RSH"] += " -q"
return env return env
# #
def ExecuteRemoteSSHCommand(self, command): def execute_remote_ssh_command(self, command):
# #
self.DemandSSHConfig() self.demand_ssh_config()
# #
args = [] args = list()
# ssh command # ssh command
args.append("ssh") args.append("ssh")
# Quiet? # Quiet?
if self._quiet_ssh == True: if self.__quiet_ssh is True:
args.append("-q") args.append("-q")
# ssh key # ssh key
if self._ssh_key != None: if self.__ssh_key is not None:
args.append("-i") args.append("-i")
args.append(self._ssh_key) args.append(self.__ssh_key)
# ssh user@host # ssh user@host
args.append(self._remote_user + "@" + self._remote_host) args.append(self.__remote_user + "@" + self.__remote_host)
# Append the command # Append the command
args.append("--") args.append("--")
@ -433,58 +434,59 @@ class MikesBackup:
raise Exception("Unsupported command datatype") raise Exception("Unsupported command datatype")
# Spawn SSH in shell # Spawn SSH in shell
#process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process = subprocess.Popen(args) process = subprocess.Popen(args)
#stdout, stderr = process.communicate() # stdout, stderr = process.communicate()
process.communicate() process.communicate()
stdout = "" stdout = ""
stderr = "" stderr = ""
#print (stderr.decode()) # print(stderr.decode())
return (process.returncode, stdout, stderr) return process.returncode, stdout, stderr
# #
def ExecuteRsync(self, _args): def execute_rsync(self, _args):
# Demand stuff # Demand stuff
self.DemandDestinationDirectoryConfig() self.demand_destination_directory_config()
if self.IsUsingSSH(): if self.is_using_ssh():
self.DemandSSHConfig() self.demand_ssh_config()
# #
args = self.StartRsyncArgs() args = self.start_rsync_args()
args.extend(_args) args.extend(_args)
#print(str(args)) # print(str(args))
# #
env = self.StartRsyncEnvironmentalVariables() env = self.start_rsync_environment_variables()
# #
#print("Debug -> Want to execute Rsync") # print("Debug -> Want to execute Rsync")
#print("Args:", str(args)) # print("Args:", str(args))
#print("Env:", str(env)) # print("Env:", str(env))
#return (0, "", "") # return (0, "", "")
# Spawn Rsync in shell # Spawn Rsync in shell
#process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) # process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
process = subprocess.Popen(args, env=env) process = subprocess.Popen(args, env=env)
#stdout, stderr = process.communicate() # stdout, stderr = process.communicate()
process.communicate() process.communicate()
#self.eprint(stderr.decode()) # self.eprint(stderr.decode())
stdout = "" stdout = ""
stderr = "" stderr = ""
return (process.returncode, stdout, stderr) return process.returncode, stdout, stderr
def main():
b = MikesBackup()
b.do_backup()
# #
if __name__ == "__main__": if __name__ == "__main__":
# #
b = MikesBackup() main()
b.DoBackup()