27 Commits
v1.0.0 ... dev

Author SHA1 Message Date
3b11b46a39 Might be done upgrading 2024-07-06 20:43:11 -07:00
1dcb08352a refactor/upgrade work 2024-07-06 20:01:45 -07:00
e96c505367 Environment initializer 2024-07-06 17:36:51 -07:00
ec03d095d9 move main script 2024-07-06 17:36:45 -07:00
c8086b0901 Start using pipenv 2024-07-06 17:36:33 -07:00
83d2b7bc9b start git ignore 2024-07-06 17:36:26 -07:00
b3d4729432 Noop to test hook 2019-08-06 16:55:07 -07:00
c660fbeec5 Merge branch 'dev' 2018-10-02 02:27:14 -07:00
e86f96d16d Exit more gracefully when the action command isn't "up" 2018-10-02 02:26:42 -07:00
c7ab0bcb5d Track down bug where default section didn't work without SSIDs
(also increase verbosity)
2018-10-02 02:21:40 -07:00
19f3be3c91 Merge branch 'dev' 2018-05-27 04:35:36 -07:00
966f5d48ab Uhhh 2018-05-27 04:32:53 -07:00
e590ea5aaa Removed test file 2018-05-27 04:25:45 -07:00
6f4cff383f Just testing 2018-05-27 04:25:05 -07:00
47a725ed4d Merge branch 'dev' 2018-05-27 04:04:18 -07:00
720c96f494 Updates 2018-05-27 04:03:47 -07:00
e00775cd67 Updates 2018-05-27 04:03:47 -07:00
f7bbc52cfb Merge branch 'master' into dev 2018-05-27 03:56:03 -07:00
d26f7169ec Merge branch 'master' of https://github.com/mikeperalta1/connection-specific-ssh-config 2018-05-27 03:55:51 -07:00
ec7d5c5fcc Update README.md 2018-05-27 03:54:54 -07:00
0ff18ea0de Update README.md 2018-05-27 03:50:45 -07:00
ce5dbc6886 Update README.md 2018-05-27 03:49:48 -07:00
2487c4141a Update README.md 2018-05-27 03:45:02 -07:00
71125cd072 Update README.md 2018-05-27 03:28:41 -07:00
43dab05942 Add exec perms 2018-05-27 02:41:12 -07:00
0bcbf12212 Add files via upload 2018-05-27 02:37:12 -07:00
c1fcebb5e5 Initial commit 2018-05-27 02:36:31 -07:00
11 changed files with 700 additions and 229 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.idea/

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.12.4

12
Pipfile Normal file
View File

@ -0,0 +1,12 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
pyaml = "*"
[dev-packages]
[requires]
python_version = "3.12"

87
Pipfile.lock generated Normal file
View File

@ -0,0 +1,87 @@
{
"_meta": {
"hash": {
"sha256": "ca9ff7c7d2ec2dd861e573b423daae217bde726c51532a624cd495d82bdc4662"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.12"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"pyaml": {
"hashes": [
"sha256:0e483d9289010e747a325dc43171bcc39d6562dd1dd4719e8cc7e7c96c99fce6",
"sha256:acc2b39c55cb0cbe4f694a6d3886f89ad3d2a5b3efcece526202f8de9a6b27de"
],
"index": "pypi",
"markers": "python_version >= '3.8'",
"version": "==24.4.0"
},
"pyyaml": {
"hashes": [
"sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5",
"sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc",
"sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df",
"sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741",
"sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206",
"sha256:18aeb1bf9a78867dc38b259769503436b7c72f7a1f1f4c93ff9a17de54319b27",
"sha256:1d4c7e777c441b20e32f52bd377e0c409713e8bb1386e1099c2415f26e479595",
"sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62",
"sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98",
"sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696",
"sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290",
"sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9",
"sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d",
"sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6",
"sha256:4fb147e7a67ef577a588a0e2c17b6db51dda102c71de36f8549b6816a96e1867",
"sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47",
"sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486",
"sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6",
"sha256:596106435fa6ad000c2991a98fa58eeb8656ef2325d7e158344fb33864ed87e3",
"sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007",
"sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938",
"sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0",
"sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c",
"sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735",
"sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d",
"sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28",
"sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4",
"sha256:9046c58c4395dff28dd494285c82ba00b546adfc7ef001486fbf0324bc174fba",
"sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8",
"sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef",
"sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5",
"sha256:afd7e57eddb1a54f0f1a974bc4391af8bcce0b444685d936840f125cf046d5bd",
"sha256:b1275ad35a5d18c62a7220633c913e1b42d44b46ee12554e5fd39c70a243d6a3",
"sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0",
"sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515",
"sha256:baa90d3f661d43131ca170712d903e6295d1f7a0f595074f151c0aed377c9b9c",
"sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c",
"sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924",
"sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34",
"sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43",
"sha256:c8098ddcc2a85b61647b2590f825f3db38891662cfc2fc776415143f599bb859",
"sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673",
"sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54",
"sha256:d858aa552c999bc8a8d57426ed01e40bef403cd8ccdd0fc5f6f04a00414cac2a",
"sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b",
"sha256:f003ed9ad21d6a4713f0a9b5a7a0a79e08dd0f221aff4525a2be4c346ee60aab",
"sha256:f22ac1c3cac4dbc50079e965eba2c1058622631e526bd9afd45fedd49ba781fa",
"sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c",
"sha256:fca0e3a251908a499833aa292323f32437106001d436eca0e6e7833256674585",
"sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d",
"sha256:fd66fc5d0da6d9815ba2cebeb4205f95818ff4b79c3ebe268e75d961704af52f"
],
"markers": "python_version >= '3.6'",
"version": "==6.0.1"
}
},
"develop": {}
}

View File

@ -40,11 +40,13 @@ You can also try using this script with a different daemon (or your own custom s
6. Create a short bash script inside the dispatcher directory that will launch connection-specific-ssh-config. For instance, you might name this file ```/etc/NetworkManager/dispatcher.d/99-launch-connection-specific-ssh-config```
7. Inside your launcher script, put the following contents:
```
#!/bin/bash
/path/to/connection-specific-ssh-config "$1" "$2" "/path/to/connection-specific-ssh-config.ini"
```
This will take the two important variables which NetworkManager has passed along to your launcher script, and pass them along to connection-specific-ssh-config
8. Repeat steps 2-7 for each additional user who would like connection based ssh configuration files.
@ -101,4 +103,6 @@ ssh_config_name = config-home
The above configuration file will instruct the script to use the ssh configuration file */home/mike/.ssh/config-work* while you're connected to one of your work connections *Work Connection - Main Office* or *Work Connection - Garys Office*, on either of your wireless adapters *wlan0* or *wlan1*. When you get home and connect to your *My Home Connection (Oh Joy)* connection on *wlan0*, the config file */home/mike/.ssh/config-home* will be used instead. Finally, the configuration file */home/mike/.ssh/config-default* will be used when *wlan0* connects to some undefined network.
The script works by simply creating a symlink where your original ssh configuration file was (typically *~/.ssh/config*), pointing to the ssh configuration determined to be the one you want active. Note that this of course means the script will fail (for safety reasons), if your original ssh configuration file still exists as a normal file.

View File

@ -1,229 +0,0 @@
#!/usr/bin/env python3
#
import configparser
import json
import logging
import os
import re
import subprocess
import sys
import syslog
#
class SSHConfiger:
#
_action_interface = None
_action_command = None
#
_config_file_path = None
_targets = None
_config = None
#
def __init__(self, action_interface, action_command, config_file_path):
# Grab args
self._action_interface = action_interface
self._action_command = action_command
self._config_file_path = config_file_path
#
def log(self, s):
#
print ("[SSHConfiger]", s)
syslog.syslog("[SSHConfiger] " + s)
#
def complain(self, s):
#
syslog.syslog(syslog.LOG_ERR, "[SSHConfiger] " + s)
print("[SSHConfiger]", s, file=sys.stderr)
#
def die(self, s):
#
self.complain(s)
raise Exception(s)
#
def run(self):
#
self.log("Running for interface \""+ self._action_interface +"\": " + self._action_command)
# Parse the config
self.parse_config()
# Grab the specified ssh directory
if ( "ssh_dir" not in self._config.keys() ):
self.die("Config file needs key \"ssh_dir\" inside \"config\" section")
ssh_dir = self._config["ssh_dir"]
# Determine which ssh config file we should use
ssh_config_name = self.determine_ssh_config_name()
# Make paths
ssh_config_path_link = os.path.join(ssh_dir, "config")
ssh_config_path_target = os.path.join(ssh_dir, ssh_config_name)
self.log("Selecting source config file \"" + ssh_config_path_target + "\" for link \"" + ssh_config_path_link + "\"")
# Don't run unless current ssh config is a symlink or not a file (for safety)
self.require_symlink_or_none(ssh_config_path_link)
# Set the symlink
try:
os.unlink(ssh_config_path_link)
except:
pass
os.symlink(ssh_config_path_target, ssh_config_path_link)
#
self.log("Finished")
#
def require_symlink_or_none(self, file_path):
#
if ( not os.path.isfile(file_path) or os.path.islink(file_path) ):
return True
#
self.die("For safety, we cannot continue if the target link exists and is a file (" + file_path + ")")
#
def determine_ssh_config_name(self):
# Only run if an interface is coming up
if ( self._action_command != "up" ):
return None
# Check each section
found_ssh_config_name = None
for section_name in self._targets:
#
section = self._targets[section_name]
# Must match at least one interface
if ( self._action_interface not in section["adapters"] ):
continue
#
interface_ssid = self.get_interface_ssid(self._action_interface)
# Must also match at least one SSID
if ( interface_ssid not in section["ssids"] ):
continue
# Found a match!
found_ssh_config_name = section["ssh_config_name"]
# Didn't find anything? Go default ...
if (found_ssh_config_name == None):
if ( "default" in self._targets.keys() ):
if ( "ssh_config_name" in self._targets["default"].keys() ):
found_ssh_config_name = self._targets["default"]["ssh_config_name"]
self.log("No matches found; Defaulting to:" + found_ssh_config_name)
#
return found_ssh_config_name
#
def get_interface_ssid(self, interface):
#
p = subprocess.Popen(["nmcli", "dev", "show", interface], stdout=subprocess.PIPE)
(stdout, stderr) = p.communicate()
stdout = stdout.decode()
#
r = re.compile("""^GENERAL.CONNECTION:\s+(?P<ssid>[^\s].+)$""", re.MULTILINE)
match = r.search(stdout)
return match.group("ssid")
#
def parse_config(self):
#
self.log("Parsing config: " + self._config_file_path)
parser = configparser.ConfigParser()
parser.read( self._config_file_path )
# Attempt to grab global config
if ( "config" not in parser.sections() ):
self.die("config section not found")
config = parser["config"]
# Targets
targets = {}
for section in parser.sections():
# Skip the config section
if ( section == "config" ):
continue
#
target = {
"adapters" : [],
"ssids" : []
}
# Adapters
if ( parser.has_option(section, "adapters") ):
target["adapters"] = json.loads( parser[section]["adapters"] )
if ( parser.has_option(section, "adapter") ):
target["adapters"].append( parser[section]["adapter"] )
# SSIDs
if ( parser.has_option(section, "ssids") ):
target["ssids"] = json.loads( parser[section]["ssids"] )
if ( parser.has_option(section, "ssid") ):
target["ssids"].append( parser[section]["ssid"] )
# ssh_config_name
if ( parser.has_option(section, "ssh_config_name") ):
target["ssh_config_name"] = parser[section]["ssh_config_name"]
else:
raise Exception("ssh_config_name key missing from section: " + section)
#
targets[section] = target
#
self._config = config
self._targets = targets
return True
# Main Entry
if (__name__ == "__main__"):
# Script name
script_name = sys.argv[0]
# Network Manager action info
action_interface = sys.argv[1]
action_command = sys.argv[2]
# Config file
config_file_path = sys.argv[3]
#
configger = SSHConfiger(action_interface, action_command, config_file_path)
configger.run()

20
domain/Logger.py Normal file
View File

@ -0,0 +1,20 @@
import sys
import syslog
class Logger:
@staticmethod
def log(s):
print("[SSHConfiger]", s)
syslog.syslog("[SSHConfiger] " + s)
#
@staticmethod
def complain(s):
syslog.syslog(syslog.LOG_ERR, "[SSHConfiger] " + s)
print("[SSHConfiger]", s, file=sys.stderr)

196
domain/SSHConfigChanger.py Executable file
View File

@ -0,0 +1,196 @@
from domain.config.Config import Config
from domain.config.Config import Target
from domain.Logger import Logger
import os
from pathlib import Path
import re
import subprocess
import sys
class SSHConfigChanger:
def __init__(
self,
action_interface, action_command,
config_file_path,
dry_run: bool = False,
):
self.__logger = Logger(
)
self.__config = Config(
logger=self.__logger,
file_path=config_file_path
)
if dry_run:
self.__logger.complain(f"Dry run enabled at runtime")
self.__config.dry_run = True
# Grab args
self.__action_interface = action_interface
self.__action_command = action_command
def die(self, s):
#
self.__logger.complain(s)
raise Exception(s)
def quit(self, s):
#
self.__logger.log("Quitting because: " + s)
sys.exit(0)
#
def run(self):
self.__logger.log(
f"Running for interface {self.__action_interface}: {self.__action_command}"
)
# Only run if an interface is coming up
if self.__action_command != "up":
self.quit(f"We don't need to run for action command: {self.__action_command}")
# Determine which ssh config file we should use
ssh_config_target = self.determine_ssh_config_target()
if ssh_config_target is None:
self.die("Unable to determine appropriate ssh config name; Quitting")
self.__logger.log(
f"Determined ssh config name: {ssh_config_target.name}"
)
# Make paths
ssh_config_path_link = self.__config.ssh_dir / self.__config.default_normal_ssh_config_file_name
ssh_config_path_target = self.__config.ssh_dir / ssh_config_target.ssh_config_file_name
self.__logger.log(
f"Selecting source config file \"{ssh_config_path_target}\""
f" for link \"{ssh_config_path_link}\""
)
# Don't run unless current ssh config is a symlink or not a file (for safety)
self.require_symlink_or_none(ssh_config_path_link)
if self.__config.dry_run:
self.__logger.complain(
f"Dry run enabled; Won't unlink existing symlink: {ssh_config_path_link}"
)
self.__logger.complain(
f"Dry run enabled; Won't create symlink: {ssh_config_path_link} ==> {ssh_config_path_target}"
)
else:
# Set the symlink
if ssh_config_path_link.exists():
try:
ssh_config_path_link.unlink()
except FileNotFoundError:
pass
ssh_config_path_link.symlink_to(ssh_config_path_target, target_is_directory=False)
#
self.__logger.log("Finished")
#
def require_symlink_or_none(self, file_path: Path):
if file_path.is_file() and file_path.exists() and not file_path.is_symlink():
self.die(
f"For safety, refusing to continue because the target ssh config file exists and is not a symlink:"
f" {file_path}"
)
def determine_ssh_config_target(self) -> Target:
#
self.__logger.log("Attempting to determine SSH config name")
# Start off by assuming the default target
# noinspection PyTypeChecker
selected_target = None
selected_target: Target
# Check each section
for target_name in self.__config.targets:
self.__logger.log(f"Examining target: {target_name}")
if selected_target is not None and target_name == selected_target.name:
self.__logger.log(f"Ignoring target because it is already selected: {target_name}")
continue
target = self.__config.targets[target_name]
# Matches, if current interface found in adapters
if self.__action_interface not in target.adapters:
self.__logger.log(
f"Target \"{target_name}\" didn't match any interfaces; Skipping"
)
continue
# Grab the SSID this adapter is currently connected to
interface_ssid = self.get_interface_ssid(self.__action_interface)
if not interface_ssid:
self.__logger.log(
f"Interface \"{interface_ssid}\" isn't connected to anything; Done looking"
)
break
self.__logger.log(
f"Interface \"{self.__action_interface}\" is currently connected to: \"{interface_ssid}\""
)
# Must also match at least one SSID
if interface_ssid in target.ssids:
self.__logger.log(
f"Found SSID \"{interface_ssid}\" in target {target_name}"
)
# Only override selected target if this one has less SSIDs,
# or there is no currently selected target
if selected_target is None:
self.__logger.log(
f"Found first suitable target: {target_name}"
)
selected_target = target
if len(target.ssids) < len(selected_target.ssids):
self.__logger.log(
f"Target \"{target_name}\""
f" seems to be a better match than \"{selected_target.name}\""
f" because it has fewer specified SSIDs"
f" ({len(target.ssids)} vs. {len(selected_target.ssids)})"
)
selected_target = target
if selected_target is None:
selected_target = self.__config.targets[self.__config.default_target_name]
self.__logger.log(f"No suitable target found; Defaulting to: {selected_target.name}")
return selected_target
@staticmethod
def get_interface_ssid(interface_name):
#
p = subprocess.Popen(["nmcli", "dev", "show", interface_name], stdout=subprocess.PIPE)
(stdout, stderr) = p.communicate()
stdout = stdout.decode()
#
r = re.compile(
pattern=r"^GENERAL.CONNECTION:\s+(?P<ssid>[^\s].+)$",
flags=re.MULTILINE
)
match = r.search(stdout)
return match.group("ssid")

278
domain/config/Config.py Normal file
View File

@ -0,0 +1,278 @@
from domain.Logger import Logger
import os
from pathlib import Path
import yaml
class Target:
def __init__(self, logger: Logger, name):
self.__logger = logger
self.__name = name
self.__data = {}
self.__adapters_names = []
self.__ssids = []
# noinspection PyTypeChecker
self.__ssh_config_file_name: str = None
def __str__(self):
s = ""
s += f"Target: {self.__name}"
s += f"\n> SSH config file name: {self.__ssh_config_file_name}"
s += f"\n> Adapters: "
if len(self.__adapters_names) > 0:
s += ", ".join(self.__adapters_names)
else:
s += "[none]"
s += f"\n> SSIDs: "
if len(self.__ssids) > 0:
s += ", ".join(self.__ssids)
else:
s += "[none]"
return s
def log(self, s):
self.__logger.log(
f"[Target::{self.__name}] {s}"
)
def complain(self, s):
self.__logger.complain(
f"[Target::{self.__name}] {s}"
)
def consume_data(self, data: dict):
assert isinstance(data, dict), (
f"Data should be a dict (found: {type(data).__name__}) "
)
self.__data = data
assert "config-file-name" in self.__data.keys(), (
f"Name of ssh config file must be present at config-file-name"
)
config_file_name = self.__data["config-file-name"]
assert isinstance(config_file_name, str), (
f"config-file-name must be a string, but got: {type(config_file_name).__name__}"
)
self.__ssh_config_file_name = config_file_name
if "adapters" in self.__data.keys():
adapters = self.__data["adapters"]
if isinstance(adapters, list):
pass
elif isinstance(adapters, str):
adapters = [adapters]
else:
raise AssertionError(f"Unsupported adapters data type: {type(adapters).__name__}")
self.__adapters_names.extend(adapters)
if "adapter" in self.__data.keys():
adapters = self.__data["adapter"]
if isinstance(adapters, list):
pass
elif isinstance(adapters, str):
adapters = [adapters]
else:
raise AssertionError(f"Unsupported adapter data type: {type(adapters).__name__}")
self.__adapters_names.extend(adapters)
if "ssids" in self.__data.keys():
ssids = self.__data["ssids"]
if isinstance(ssids, list):
pass
elif isinstance(ssids, str):
ssids = [ssids]
else:
raise AssertionError(f"Unsupported ssids data type: {type(ssids).__name__}")
self.__ssids.extend(ssids)
if "ssid" in self.__data.keys():
ssids = self.__data["ssid"]
if isinstance(ssids, list):
pass
elif isinstance(ssids, str):
ssids = [ssids]
else:
raise AssertionError(f"Unsupported ssid data type: {type(ssids).__name__}")
self.__ssids.extend(ssids)
assert len(self.__adapters_names) > 0, (
f"At least one adapter must be configured at target-name::adapters"
)
@property
def name(self) -> str:
return self.__name
@property
def ssh_config_file_name(self) -> str:
return self.__ssh_config_file_name
@property
def adapters(self) -> list[str]:
return self.__adapters_names
@property
def ssids(self) -> list[str]:
return self.__ssids
class Config:
__DEFAULT_NORMAL_SSH_CONFIG_FILE_NAME = "config"
__DEFAULT_SSH_DIRECTORY_NAME = ".ssh"
def __init__(self, logger: Logger, file_path: str):
self.__logger = logger
if isinstance(file_path, str):
file_path = Path(file_path)
elif isinstance(file_path, Path):
pass
else:
raise AssertionError("File path should be a string or Path object")
self.__file_path = file_path
self.__data = {}
self.__dry_run = False
self.__ssh_dir = Path(os.path.expanduser("~")) / self.__DEFAULT_SSH_DIRECTORY_NAME
# noinspection PyTypeChecker
self.__default_target_name: str = None
self.__targets = {}
self._load_config()
self._consume_config()
print(self)
def __str__(self):
s = ""
s += "*** Config ***"
s += "\n Dry run: " + "True" if self.__dry_run else "False"
for target in self.__targets.values():
s += "\n" + str(target)
return s
def _load_config(self):
assert self.__file_path.exists(), "Config file must exist"
with open(self.__file_path) as f:
self.__data = yaml.safe_load(f)
def _consume_config(self):
assert isinstance(self.__data, dict), (
f"Config data must be a dict"
)
assert "options" in self.__data.keys(), (
f"Options key missing from config"
)
options = self.__data["options"]
assert isinstance(options, dict), "Config options must be a dict!"
if "dry-run" in options.keys():
d = options["dry-run"]
assert isinstance(d, bool), "options::dry-run must be a bool"
if d:
self.__logger.complain(f"Dry run enabled in config")
self.__dry_run = d
if "ssh-dir" in options.keys():
ssh_dir = Path(options["ssh-dir"])
assert ssh_dir.exists(), f"options::ssh-dir must be a valid directory"
self.__ssh_dir = ssh_dir
self.__logger.log(f"Found ssh dir: {self.__ssh_dir}")
else:
self.__logger.log(f"options::ssh-dir not found")
assert "default-target" in options.keys(), (
f"Must specify the name of the default target at options::default-target"
)
default_target_name = options["default-target"]
assert isinstance(default_target_name, str), (
f"Default target name must be a string but got: {type(default_target_name).__name__}"
)
self.__default_target_name = default_target_name
self.__targets = {}
assert "targets" in self.__data.keys(), "Config should specify targets"
targets = self.__data["targets"]
assert isinstance(targets, dict), "Targets should be a dict, where each key is one target"
for target_name in targets.keys():
self.__logger.log(f"Parsing target: {target_name}")
try:
t = Target(
logger=self.__logger,
name=target_name,
)
t.consume_data(data=targets[target_name])
except AssertionError as e:
self.__logger.complain(
f"Failed to parse target \"{target_name}\""
f"\n{e}"
)
raise e
self.__targets[target_name] = t
if self.__default_target_name not in self.__targets.keys():
raise AssertionError(
f"Default target specified as {self.__default_target_name} but was not found in dict of targets"
)
@property
def default_normal_ssh_config_file_name(self) -> str:
return self.__DEFAULT_NORMAL_SSH_CONFIG_FILE_NAME
@property
def file_path(self) -> Path:
return self.__file_path
@property
def dry_run(self) -> bool:
return self.__dry_run
@dry_run.setter
def dry_run(self, b: bool):
self.__dry_run = b
@property
def default_target_name(self) -> str:
return self.__default_target_name
@property
def ssh_dir(self) -> Path | None:
return self.__ssh_dir
@property
def targets(self) -> [Target]:
return self.__targets

41
init-environment Executable file
View File

@ -0,0 +1,41 @@
#!/bin/bash
log()
{
echo "[Connection Specific SSH Config - Init Env] $1"
}
complain()
{
echo "[Connection Specific SSH Config - Init Env] $1" 1>&2
}
die()
{
complain "Fatal: $1"
exit 1
}
SCRIPT_PATH=$(readlink -f "$0")
SCRIPT_DIR=$(dirname "$SCRIPT_PATH")
SCRIPT_NAME=$(basename "$SCRIPT_PATH")
log "Begin ${SCRIPT_NAME}"
log "Script path: ${SCRIPT_PATH}"
log "Script dir: ${SCRIPT_DIR}"
log "PATH: ${PATH}"
log "PWD before switching: $(pwd)"
cd "${SCRIPT_DIR}" || die "Failed to switch to project directory: ${SCRIPT_DIR}"
log "PWD after switching: $(pwd)"
log "Ensuring python installation with pyenv"
pyenv versions
pyenv install --skip-existing || die "Failed to ensure python installation with pyenv"
log "Installing/upgrading pip and pipenv"
pip install --upgrade pip pipenv || die "Failed to install/upgrade pip and pipenv"
log "Syncing pip environment with pipenv"
pipenv --rm # Don't die because this will return an error if the env didn't already exist
pipenv install || die "Failed to install pip environment with pipenv"
pipenv sync || die "Failed to sync pip environment with pipenv"

58
main.py Normal file
View File

@ -0,0 +1,58 @@
from domain.SSHConfigChanger import SSHConfigChanger
import argparse
def main():
parser = argparse.ArgumentParser(
prog="Mike's SSH Config Changer"
)
parser.add_argument(
"--dry-run", "-d",
dest="dry_run",
default=False,
action="store_true",
help="Pass this flag to print what would be changed, without actually changing anything"
)
parser.add_argument(
"--config", "-c",
dest="config_file",
required=True,
help="Specify path to configuration file",
)
parser.add_argument(
"--interface", "--action-interface",
dest="action_interface",
required=True,
help="Specify the interface specified by NetworkManager's action"
)
parser.add_argument(
"--command", "--action-command",
dest="action_command",
required=True,
help="Specify the command specified by NetworkManager's action"
)
args = parser.parse_args()
configger = SSHConfigChanger(
dry_run=args.dry_run,
action_interface=args.action_interface,
action_command=args.action_command,
config_file_path=args.config_file,
)
configger.run()
# Main Entry
if __name__ == "__main__":
main()