refactor/upgrade work

This commit is contained in:
Mike
2024-07-06 20:01:45 -07:00
parent e96c505367
commit 1dcb08352a
7 changed files with 595 additions and 268 deletions

20
domain/Logger.py Normal file
View File

@ -0,0 +1,20 @@
import sys
import syslog
class Logger:
@staticmethod
def log(s):
print("[SSHConfiger]", s)
syslog.syslog("[SSHConfiger] " + s)
#
@staticmethod
def complain(s):
syslog.syslog(syslog.LOG_ERR, "[SSHConfiger] " + s)
print("[SSHConfiger]", s, file=sys.stderr)

200
domain/SSHConfigChanger.py Executable file
View File

@ -0,0 +1,200 @@
from domain.config.Config import Config
from domain.Logger import Logger
import os
import re
import subprocess
import sys
class SSHConfigChanger:
__DEFAULT_NORMAL_SSH_CONFIG_FILE_NAME = "config"
__DEFAULT_TARGET_NAME = "default"
#
def __init__(
self,
action_interface, action_command,
config_file_path,
dry_run: bool = False,
):
self.__logger = Logger(
)
self.__config = Config(
logger=self.__logger,
file_path=config_file_path
)
if dry_run:
self.__logger.complain(f"Dry run enabled at runtime")
self.__config.dry_run = True
# Grab args
self.__action_interface = action_interface
self.__action_command = action_command
def die(self, s):
#
self.__logger.complain(s)
raise Exception(s)
def quit(self, s):
#
self.__logger.log("Quitting because: " + s)
sys.exit(0)
#
def run(self):
self.__logger.log(
f"Running for interface {self.__action_interface}: {self.__action_command}"
)
# Only run if an interface is coming up
if self.__action_command != "up":
self.quit(f"We don't need to run for action command: {self.__action_command}")
# Determine which ssh config file we should use
ssh_config_name = self.determine_ssh_config_name()
if not ssh_config_name:
self.die("Unable to determine appropriate ssh config name; Quitting")
self.__logger.log(
f"Determined ssh config name: {ssh_config_name}"
)
# Make paths
ssh_config_path_link = self.__config.ssh_dir / self.__DEFAULT_NORMAL_SSH_CONFIG_FILE_NAME
ssh_config_path_target = self.__config.ssh_dir / ssh_config_name
self.__logger.log(
f"Selecting source config file \"{ssh_config_path_target}\""
f" for link \"{ssh_config_path_link}\""
)
# Don't run unless current ssh config is a symlink or not a file (for safety)
self.require_symlink_or_none(ssh_config_path_link)
if self.__config.dry_run:
self.__logger.complain(
f"Dry run enabled; Won't unlink existing symlink: {ssh_config_path_link}"
)
self.__logger.complain(
f"Dry run enabled; Won't create symlink: {ssh_config_path_link} ==> {ssh_config_path_target}"
)
else:
# Set the symlink
if ssh_config_path_link.exists():
try:
ssh_config_path_link.unlink()
except FileNotFoundError:
pass
ssh_config_path_link.symlink_to(ssh_config_path_target, target_is_directory=False)
#
self.__logger.log("Finished")
#
def require_symlink_or_none(self, file_path):
#
if ( not os.path.isfile(file_path) or os.path.islink(file_path) ):
return True
#
self.die("For safety, we cannot continue if the target link exists and is a file (" + file_path + ")")
def determine_ssh_config_name(self):
#
self.__logger.log("Attempting to determine SSH config name")
# Check each section
found_ssh_config_name = None
for target_name in self.__config.targets:
target = self.__config.targets[target_name]
self.__logger.log(f"Examining target: {target_name}")
# Don't examine default if anything is picked already
if target_name == self.__DEFAULT_TARGET_NAME and found_ssh_config_name:
self.__logger.log(
f"Skipping default section ({self.__DEFAULT_TARGET_NAME}) because we've already found at least one match: {found_ssh_config_name}"
)
continue
# Check the interface
if (
# Matches, if current interface found in adapters
self.__action_interface in target.adapters
# Can also match if we're in the default section
or target_name == self.__DEFAULT_TARGET_NAME
):
pass
else:
self.__logger.log(
f"Target \"{target_name}\" didn't match any interfaces; Skipping"
)
continue
# Grab the SSID this adapter is currently connected to
interface_ssid = self.get_interface_ssid(self.__action_interface)
if not interface_ssid:
self.__logger.log(
f"Interface \"{interface_ssid}\" isn't connected to anything ... "
)
self.__logger.log(
f"Interface \"{self.__action_interface}\" is currently connected to: \"{interface_ssid}\""
)
# Must also match at least one SSID,
# OR we're in the default section
if interface_ssid not in target.ssids and target_name != self.__DEFAULT_TARGET_NAME:
self.__logger.log(
f"Did not find SSID \"{interface_ssid}\" in target ssids: " + str(target.ssids)
)
continue
# Found a match!
found_ssh_config_name = target.ssh_config_name
self.__logger.log(
f"Found matching ssh config name: {found_ssh_config_name}"
)
# Didn't find anything? Go default ...
if not found_ssh_config_name:
if self.__DEFAULT_TARGET_NAME in self.__config.targets.keys():
target = self.__config.targets[self.__DEFAULT_TARGET_NAME]
found_ssh_config_name = target.ssh_config_name
self.__logger.log(
f"No matches found; Defaulting to: {found_ssh_config_name}"
)
return found_ssh_config_name
@staticmethod
def get_interface_ssid(interface_name):
#
p = subprocess.Popen(["nmcli", "dev", "show", interface_name], stdout=subprocess.PIPE)
(stdout, stderr) = p.communicate()
stdout = stdout.decode()
#
r = re.compile(
pattern=r"^GENERAL.CONNECTION:\s+(?P<ssid>[^\s].+)$",
flags=re.MULTILINE
)
match = r.search(stdout)
return match.group("ssid")

247
domain/config/Config.py Normal file
View File

@ -0,0 +1,247 @@
from domain.Logger import Logger
import os
from pathlib import Path
import yaml
class Target:
def __init__(self, logger: Logger, name):
self.__logger = logger
self.__name = name
self.__data = {}
self.__adapters_names = []
self.__ssids = []
# noinspection PyTypeChecker
self.__ssh_config_name: str = None
def __str__(self):
s = ""
s += f"Target: {self.__name}"
s += f"\n> SSH config file name: {self.__ssh_config_name}"
s += f"\n> Adapters: "
if len(self.__adapters_names) > 0:
s += ", ".join(self.__adapters_names)
else:
s += "[none]"
s += f"\n> SSIDs: "
if len(self.__ssids) > 0:
s += ", ".join(self.__ssids)
else:
s += "[none]"
return s
def log(self, s):
self.__logger.log(
f"[Target::{self.__name}] {s}"
)
def complain(self, s):
self.__logger.complain(
f"[Target::{self.__name}] {s}"
)
def consume_data(self, data: dict):
assert isinstance(data, dict), (
f"Data should be a dict (found: {type(data).__name__}) "
)
self.__data = data
if "adapters" in self.__data.keys():
adapters = self.__data["adapters"]
if isinstance(adapters, list):
pass
elif isinstance(adapters, str):
adapters = [adapters]
else:
raise AssertionError(f"Unsupported adapters data type: {type(adapters).__name__}")
self.__adapters_names.extend(adapters)
if "adapter" in self.__data.keys():
adapters = self.__data["adapter"]
if isinstance(adapters, list):
pass
elif isinstance(adapters, str):
adapters = [adapters]
else:
raise AssertionError(f"Unsupported adapter data type: {type(adapters).__name__}")
self.__adapters_names.extend(adapters)
if "ssids" in self.__data.keys():
ssids = self.__data["ssids"]
if isinstance(ssids, list):
pass
elif isinstance(ssids, str):
ssids = [ssids]
else:
raise AssertionError(f"Unsupported ssids data type: {type(ssids).__name__}")
self.__ssids.extend(ssids)
if "ssid" in self.__data.keys():
ssids = self.__data["ssid"]
if isinstance(ssids, list):
pass
elif isinstance(ssids, str):
ssids = [ssids]
else:
raise AssertionError(f"Unsupported ssid data type: {type(ssids).__name__}")
self.__ssids.extend(ssids)
assert len(self.__adapters_names) > 0, (
f"At least one adapter must be configured at target-name::adapters"
)
assert len(self.__ssids) > 0, (
f"At least one ssid must be configured at target-name::ssids"
)
@property
def ssh_config_name(self) -> str:
return self.__ssh_config_name
@property
def adapters(self) -> list[str]:
return self.__adapters_names
@property
def ssids(self) -> list[str]:
return self.__ssids
class Config:
def __init__(self, logger: Logger, file_path: str):
self.__logger = logger
if isinstance(file_path, str):
file_path = Path(file_path)
elif isinstance(file_path, Path):
pass
else:
raise AssertionError("File path should be a string or Path object")
self.__file_path = file_path
self.__data = {}
self.__dry_run = False
self.__ssh_dir = Path(os.path.expanduser("~"))
self.__targets = {}
self._load_config()
self._consume_config()
print(self)
def __str__(self):
s = ""
s += "*** Config ***"
for target in self.__targets.values():
s += "\n" + str(target)
return s
def _load_config(self):
assert self.__file_path.exists(), "Config file must exist"
with open(self.__file_path) as f:
self.__data = yaml.safe_load(f)
def _consume_config(self):
assert isinstance(self.__data, dict), (
f"Config data must be a dict"
)
assert "options" in self.__data.keys(), (
f"Options key missing from config"
)
options = self.__data["options"]
assert isinstance(options, dict), "Config options must be a dict!"
if "dry-run" in options.keys():
d = options["dry-run"]
assert isinstance(d, bool), "options::dry-run must be a bool"
if d:
self.__logger.log(f"Found configured dry run")
self.__dry_run = d
if "ssh-dir" in options.keys():
ssh_dir = Path(options["ssh-dir"])
assert ssh_dir.exists(), f"options::ssh-dir must be a valid directory"
self.__ssh_dir = ssh_dir
self.__logger.log(f"Found ssh dir: {self.__ssh_dir}")
else:
self.__logger.log(f"options::ssh-dir not found")
self.__targets = {}
# Setup a default target
t = Target(
logger=self.__logger,
name="default",
)
self.__targets["default"] = t
assert "targets" in self.__data.keys(), "Config should specify targets"
targets = self.__data["targets"]
assert isinstance(targets, dict), "Targets should be a dict, where each key is one target"
for target_name in targets.keys():
self.__logger.log(f"Parsing target: {target_name}")
try:
t = Target(
logger=self.__logger,
name=target_name,
)
t.consume_data(data=targets[target_name])
except AssertionError as e:
self.__logger.complain(
f"Failed to parse target \"{target_name}\""
f"\n{e}"
)
raise e
self.__targets[target_name] = t
@property
def file_path(self) -> Path:
return self.__file_path
@property
def dry_run(self) -> bool:
return self.__dry_run
@dry_run.setter
def dry_run(self, b: bool):
self.__dry_run = b
@property
def ssh_dir(self) -> Path | None:
return self.__ssh_dir
@property
def targets(self) -> [Target]:
return self.__targets

View File

@ -1,266 +0,0 @@
#!/usr/bin/env python3
#
import configparser
import json
import logging
import os
import re
import subprocess
import sys
import syslog
#
class SSHConfiger:
#
_action_interface = None
_action_command = None
#
_config_file_path = None
_targets = None
_config = None
#
def __init__(self, action_interface, action_command, config_file_path):
# Grab args
self._action_interface = action_interface
self._action_command = action_command
self._config_file_path = config_file_path
#
def log(self, s):
#
print ("[SSHConfiger]", s)
syslog.syslog("[SSHConfiger] " + s)
#
def complain(self, s):
#
syslog.syslog(syslog.LOG_ERR, "[SSHConfiger] " + s)
print("[SSHConfiger]", s, file=sys.stderr)
#
def die(self, s):
#
self.complain(s)
raise Exception(s)
def quit(self, s):
#
self.log("Quitting because: " + s)
sys.exit(0)
#
def run(self):
#
self.log("Running for interface \""+ self._action_interface +"\": " + self._action_command)
# Only run if an interface is coming up
if ( self._action_command != "up" ):
self.quit("We don't need to run for action command: " + str(self._action_command))
# Parse the config
self.parse_config()
# Grab the specified ssh directory
if ( "ssh_dir" not in self._config.keys() ):
self.die("Config file needs key \"ssh_dir\" inside \"config\" section")
ssh_dir = self._config["ssh_dir"]
self.log("SSH Dir: " + str(ssh_dir))
# Determine which ssh config file we should use
ssh_config_name = self.determine_ssh_config_name()
self.log("Determined ssh config name: " + str(ssh_config_name))
if not ssh_config_name:
self.die("Unable to determine appropriate ssh config name; Quitting")
# Make paths
ssh_config_path_link = os.path.join(ssh_dir, "config")
ssh_config_path_target = os.path.join(ssh_dir, ssh_config_name)
self.log("Selecting source config file \"" + ssh_config_path_target + "\" for link \"" + ssh_config_path_link + "\"")
# Don't run unless current ssh config is a symlink or not a file (for safety)
self.require_symlink_or_none(ssh_config_path_link)
# Set the symlink
try:
os.unlink(ssh_config_path_link)
except:
pass
os.symlink(ssh_config_path_target, ssh_config_path_link)
#
self.log("Finished")
#
def require_symlink_or_none(self, file_path):
#
if ( not os.path.isfile(file_path) or os.path.islink(file_path) ):
return True
#
self.die("For safety, we cannot continue if the target link exists and is a file (" + file_path + ")")
#
def determine_ssh_config_name(self):
#
self.log("Attempting to determine SSH config name")
# Check each section
found_ssh_config_name = None
for section_name in self._targets:
#
section = self._targets[section_name]
self.log("Examining section: " + str(section_name))
# Don't examine default if anything is picked already
if section_name == "default" and found_ssh_config_name:
self.log("Skipping default section because we've already found at least one match: " + str(found_ssh_config_name))
continue
# Check the interface
interface_matched = False
if (
# Matches, if current interface found in adapters
self._action_interface in section["adapters"]
# Can also match if we're in section "default"
or section_name == "default"
):
interface_matched = True
if not interface_matched:
self.log("Section \"" + str(section_name) + "\" didn't match any interfaces; Skipping")
continue
# Grab the SSID this adapter is currently connected to
interface_ssid = self.get_interface_ssid(self._action_interface)
if not interface_ssid:
self.log("Interface \"" + str(interface_ssid) + "\" isn't connected to anything ... ")
self.log("Interface \"" + str(self._action_interface) + "\" is currently connected to: \"" + str(interface_ssid) + "\"")
# Must also match at least one SSID,
# OR we're in the default section
if interface_ssid not in section["ssids"] and section_name != "default":
self.log("Did not find SSID \"" + interface_ssid + "\" in section ssids: " + str(section["ssids"]))
continue
# Found a match!
found_ssh_config_name = section["ssh_config_name"]
self.log("Found matching ssh config name: " + str(found_ssh_config_name))
# Didn't find anything? Go default ...
if (not found_ssh_config_name):
if ( "default" in self._targets.keys() ):
if ( "ssh_config_name" in self._targets["default"].keys() ):
found_ssh_config_name = self._targets["default"]["ssh_config_name"]
self.log("No matches found; Defaulting to:" + found_ssh_config_name)
#
return found_ssh_config_name
#
def get_interface_ssid(self, interface):
#
p = subprocess.Popen(["nmcli", "dev", "show", interface], stdout=subprocess.PIPE)
(stdout, stderr) = p.communicate()
stdout = stdout.decode()
#
r = re.compile("""^GENERAL.CONNECTION:\s+(?P<ssid>[^\s].+)$""", re.MULTILINE)
match = r.search(stdout)
return match.group("ssid")
#
def parse_config(self):
#
self.log("Parsing config: " + self._config_file_path)
parser = configparser.ConfigParser()
parser.read( self._config_file_path )
# Attempt to grab global config
if ( "config" not in parser.sections() ):
self.die("config section not found")
config = parser["config"]
# Targets
targets = {}
for section in parser.sections():
# Skip the config section
if ( section == "config" ):
continue
#
self.log("Parsing config target: \"" + section + "\"")
target = {
"adapters" : [],
"ssids" : []
}
# Adapters
if ( parser.has_option(section, "adapters") ):
target["adapters"] = json.loads( parser[section]["adapters"] )
if ( parser.has_option(section, "adapter") ):
target["adapters"].append( parser[section]["adapter"] )
# SSIDs
if ( parser.has_option(section, "ssids") ):
target["ssids"] = json.loads( parser[section]["ssids"] )
if ( parser.has_option(section, "ssid") ):
target["ssids"].append( parser[section]["ssid"] )
# ssh_config_name
if ( parser.has_option(section, "ssh_config_name") ):
target["ssh_config_name"] = parser[section]["ssh_config_name"]
else:
raise Exception("ssh_config_name key missing from section: " + section)
#
targets[section] = target
self.log("Parsed config for config target \"" + section + "\": " + str(target))
#
self._config = config
self._targets = targets
return True
# Main Entry
if (__name__ == "__main__"):
# Script name
script_name = sys.argv[0]
# Network Manager action info
action_interface = sys.argv[1]
action_command = sys.argv[2]
# Config file
config_file_path = sys.argv[3]
#
configger = SSHConfiger(action_interface, action_command, config_file_path)
configger.run()