connection-specific-ssh-config/domain/SSHConfigChanger.py

197 lines
5.2 KiB
Python
Raw Normal View History

2024-07-06 20:01:45 -07:00
from domain.config.Config import Config
2024-07-06 20:43:11 -07:00
from domain.config.Config import Target
2024-07-06 20:01:45 -07:00
from domain.Logger import Logger
import os
2024-07-06 20:43:11 -07:00
from pathlib import Path
2024-07-06 20:01:45 -07:00
import re
import subprocess
import sys
class SSHConfigChanger:
def __init__(
self,
action_interface, action_command,
config_file_path,
dry_run: bool = False,
):
self.__logger = Logger(
)
self.__config = Config(
logger=self.__logger,
file_path=config_file_path
)
if dry_run:
self.__logger.complain(f"Dry run enabled at runtime")
self.__config.dry_run = True
# Grab args
self.__action_interface = action_interface
self.__action_command = action_command
def die(self, s):
#
self.__logger.complain(s)
raise Exception(s)
def quit(self, s):
#
self.__logger.log("Quitting because: " + s)
sys.exit(0)
#
def run(self):
self.__logger.log(
f"Running for interface {self.__action_interface}: {self.__action_command}"
)
# Only run if an interface is coming up
if self.__action_command != "up":
self.quit(f"We don't need to run for action command: {self.__action_command}")
# Determine which ssh config file we should use
2024-07-06 20:43:11 -07:00
ssh_config_target = self.determine_ssh_config_target()
if ssh_config_target is None:
2024-07-06 20:01:45 -07:00
self.die("Unable to determine appropriate ssh config name; Quitting")
self.__logger.log(
2024-07-06 20:43:11 -07:00
f"Determined ssh config name: {ssh_config_target.name}"
2024-07-06 20:01:45 -07:00
)
# Make paths
2024-07-06 20:43:11 -07:00
ssh_config_path_link = self.__config.ssh_dir / self.__config.default_normal_ssh_config_file_name
ssh_config_path_target = self.__config.ssh_dir / ssh_config_target.ssh_config_file_name
2024-07-06 20:01:45 -07:00
self.__logger.log(
f"Selecting source config file \"{ssh_config_path_target}\""
f" for link \"{ssh_config_path_link}\""
)
# Don't run unless current ssh config is a symlink or not a file (for safety)
self.require_symlink_or_none(ssh_config_path_link)
if self.__config.dry_run:
self.__logger.complain(
f"Dry run enabled; Won't unlink existing symlink: {ssh_config_path_link}"
)
self.__logger.complain(
f"Dry run enabled; Won't create symlink: {ssh_config_path_link} ==> {ssh_config_path_target}"
)
else:
# Set the symlink
if ssh_config_path_link.exists():
try:
ssh_config_path_link.unlink()
except FileNotFoundError:
pass
ssh_config_path_link.symlink_to(ssh_config_path_target, target_is_directory=False)
#
self.__logger.log("Finished")
#
2024-07-06 20:43:11 -07:00
def require_symlink_or_none(self, file_path: Path):
2024-07-06 20:01:45 -07:00
2024-07-06 20:43:11 -07:00
if file_path.is_file() and file_path.exists() and not file_path.is_symlink():
self.die(
f"For safety, refusing to continue because the target ssh config file exists and is not a symlink:"
f" {file_path}"
)
2024-07-06 20:01:45 -07:00
2024-07-06 20:43:11 -07:00
def determine_ssh_config_target(self) -> Target:
2024-07-06 20:01:45 -07:00
#
self.__logger.log("Attempting to determine SSH config name")
2024-07-06 20:43:11 -07:00
# Start off by assuming the default target
# noinspection PyTypeChecker
selected_target = None
selected_target: Target
2024-07-06 20:01:45 -07:00
# Check each section
for target_name in self.__config.targets:
self.__logger.log(f"Examining target: {target_name}")
2024-07-06 20:43:11 -07:00
if selected_target is not None and target_name == selected_target.name:
self.__logger.log(f"Ignoring target because it is already selected: {target_name}")
2024-07-06 20:01:45 -07:00
continue
2024-07-06 20:43:11 -07:00
target = self.__config.targets[target_name]
# Matches, if current interface found in adapters
if self.__action_interface not in target.adapters:
2024-07-06 20:01:45 -07:00
self.__logger.log(
f"Target \"{target_name}\" didn't match any interfaces; Skipping"
)
continue
# Grab the SSID this adapter is currently connected to
interface_ssid = self.get_interface_ssid(self.__action_interface)
if not interface_ssid:
self.__logger.log(
2024-07-06 20:43:11 -07:00
f"Interface \"{interface_ssid}\" isn't connected to anything; Done looking"
2024-07-06 20:01:45 -07:00
)
2024-07-06 20:43:11 -07:00
break
2024-07-06 20:01:45 -07:00
self.__logger.log(
f"Interface \"{self.__action_interface}\" is currently connected to: \"{interface_ssid}\""
)
2024-07-06 20:43:11 -07:00
# Must also match at least one SSID
if interface_ssid in target.ssids:
2024-07-06 20:01:45 -07:00
self.__logger.log(
2024-07-06 20:43:11 -07:00
f"Found SSID \"{interface_ssid}\" in target {target_name}"
2024-07-06 20:01:45 -07:00
)
2024-07-06 20:43:11 -07:00
# Only override selected target if this one has less SSIDs,
# or there is no currently selected target
if selected_target is None:
self.__logger.log(
f"Found first suitable target: {target_name}"
)
selected_target = target
if len(target.ssids) < len(selected_target.ssids):
self.__logger.log(
f"Target \"{target_name}\""
f" seems to be a better match than \"{selected_target.name}\""
f" because it has fewer specified SSIDs"
f" ({len(target.ssids)} vs. {len(selected_target.ssids)})"
)
selected_target = target
if selected_target is None:
selected_target = self.__config.targets[self.__config.default_target_name]
self.__logger.log(f"No suitable target found; Defaulting to: {selected_target.name}")
return selected_target
2024-07-06 20:01:45 -07:00
@staticmethod
def get_interface_ssid(interface_name):
#
p = subprocess.Popen(["nmcli", "dev", "show", interface_name], stdout=subprocess.PIPE)
(stdout, stderr) = p.communicate()
stdout = stdout.decode()
#
r = re.compile(
pattern=r"^GENERAL.CONNECTION:\s+(?P<ssid>[^\s].+)$",
flags=re.MULTILINE
)
match = r.search(stdout)
return match.group("ssid")