Compare commits
27 Commits
Author | SHA1 | Date | |
---|---|---|---|
3b11b46a39 | |||
1dcb08352a | |||
e96c505367 | |||
ec03d095d9 | |||
c8086b0901 | |||
83d2b7bc9b | |||
b3d4729432 | |||
c660fbeec5 | |||
e86f96d16d | |||
c7ab0bcb5d | |||
19f3be3c91 | |||
966f5d48ab | |||
e590ea5aaa | |||
6f4cff383f | |||
47a725ed4d | |||
720c96f494 | |||
e00775cd67 | |||
f7bbc52cfb | |||
d26f7169ec | |||
ec7d5c5fcc | |||
0ff18ea0de | |||
ce5dbc6886 | |||
2487c4141a | |||
71125cd072 | |||
43dab05942 | |||
0bcbf12212 | |||
c1fcebb5e5 |
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
|
||||
.idea/
|
||||
|
1
.python-version
Normal file
1
.python-version
Normal file
@ -0,0 +1 @@
|
||||
3.12.4
|
12
Pipfile
Normal file
12
Pipfile
Normal file
@ -0,0 +1,12 @@
|
||||
[[source]]
|
||||
url = "https://pypi.org/simple"
|
||||
verify_ssl = true
|
||||
name = "pypi"
|
||||
|
||||
[packages]
|
||||
pyaml = "*"
|
||||
|
||||
[dev-packages]
|
||||
|
||||
[requires]
|
||||
python_version = "3.12"
|
87
Pipfile.lock
generated
Normal file
87
Pipfile.lock
generated
Normal file
@ -0,0 +1,87 @@
|
||||
{
|
||||
"_meta": {
|
||||
"hash": {
|
||||
"sha256": "ca9ff7c7d2ec2dd861e573b423daae217bde726c51532a624cd495d82bdc4662"
|
||||
},
|
||||
"pipfile-spec": 6,
|
||||
"requires": {
|
||||
"python_version": "3.12"
|
||||
},
|
||||
"sources": [
|
||||
{
|
||||
"name": "pypi",
|
||||
"url": "https://pypi.org/simple",
|
||||
"verify_ssl": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"default": {
|
||||
"pyaml": {
|
||||
"hashes": [
|
||||
"sha256:0e483d9289010e747a325dc43171bcc39d6562dd1dd4719e8cc7e7c96c99fce6",
|
||||
"sha256:acc2b39c55cb0cbe4f694a6d3886f89ad3d2a5b3efcece526202f8de9a6b27de"
|
||||
],
|
||||
"index": "pypi",
|
||||
"markers": "python_version >= '3.8'",
|
||||
"version": "==24.4.0"
|
||||
},
|
||||
"pyyaml": {
|
||||
"hashes": [
|
||||
"sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5",
|
||||
"sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc",
|
||||
"sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df",
|
||||
"sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741",
|
||||
"sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206",
|
||||
"sha256:18aeb1bf9a78867dc38b259769503436b7c72f7a1f1f4c93ff9a17de54319b27",
|
||||
"sha256:1d4c7e777c441b20e32f52bd377e0c409713e8bb1386e1099c2415f26e479595",
|
||||
"sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62",
|
||||
"sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98",
|
||||
"sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696",
|
||||
"sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290",
|
||||
"sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9",
|
||||
"sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d",
|
||||
"sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6",
|
||||
"sha256:4fb147e7a67ef577a588a0e2c17b6db51dda102c71de36f8549b6816a96e1867",
|
||||
"sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47",
|
||||
"sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486",
|
||||
"sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6",
|
||||
"sha256:596106435fa6ad000c2991a98fa58eeb8656ef2325d7e158344fb33864ed87e3",
|
||||
"sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007",
|
||||
"sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938",
|
||||
"sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0",
|
||||
"sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c",
|
||||
"sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735",
|
||||
"sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d",
|
||||
"sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28",
|
||||
"sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4",
|
||||
"sha256:9046c58c4395dff28dd494285c82ba00b546adfc7ef001486fbf0324bc174fba",
|
||||
"sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8",
|
||||
"sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef",
|
||||
"sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5",
|
||||
"sha256:afd7e57eddb1a54f0f1a974bc4391af8bcce0b444685d936840f125cf046d5bd",
|
||||
"sha256:b1275ad35a5d18c62a7220633c913e1b42d44b46ee12554e5fd39c70a243d6a3",
|
||||
"sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0",
|
||||
"sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515",
|
||||
"sha256:baa90d3f661d43131ca170712d903e6295d1f7a0f595074f151c0aed377c9b9c",
|
||||
"sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c",
|
||||
"sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924",
|
||||
"sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34",
|
||||
"sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43",
|
||||
"sha256:c8098ddcc2a85b61647b2590f825f3db38891662cfc2fc776415143f599bb859",
|
||||
"sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673",
|
||||
"sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54",
|
||||
"sha256:d858aa552c999bc8a8d57426ed01e40bef403cd8ccdd0fc5f6f04a00414cac2a",
|
||||
"sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b",
|
||||
"sha256:f003ed9ad21d6a4713f0a9b5a7a0a79e08dd0f221aff4525a2be4c346ee60aab",
|
||||
"sha256:f22ac1c3cac4dbc50079e965eba2c1058622631e526bd9afd45fedd49ba781fa",
|
||||
"sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c",
|
||||
"sha256:fca0e3a251908a499833aa292323f32437106001d436eca0e6e7833256674585",
|
||||
"sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d",
|
||||
"sha256:fd66fc5d0da6d9815ba2cebeb4205f95818ff4b79c3ebe268e75d961704af52f"
|
||||
],
|
||||
"markers": "python_version >= '3.6'",
|
||||
"version": "==6.0.1"
|
||||
}
|
||||
},
|
||||
"develop": {}
|
||||
}
|
@ -40,11 +40,13 @@ You can also try using this script with a different daemon (or your own custom s
|
||||
6. Create a short bash script inside the dispatcher directory that will launch connection-specific-ssh-config. For instance, you might name this file ```/etc/NetworkManager/dispatcher.d/99-launch-connection-specific-ssh-config```
|
||||
|
||||
7. Inside your launcher script, put the following contents:
|
||||
|
||||
```
|
||||
#!/bin/bash
|
||||
|
||||
/path/to/connection-specific-ssh-config "$1" "$2" "/path/to/connection-specific-ssh-config.ini"
|
||||
```
|
||||
|
||||
This will take the two important variables which NetworkManager has passed along to your launcher script, and pass them along to connection-specific-ssh-config
|
||||
|
||||
8. Repeat steps 2-7 for each additional user who would like connection based ssh configuration files.
|
||||
@ -101,4 +103,6 @@ ssh_config_name = config-home
|
||||
|
||||
The above configuration file will instruct the script to use the ssh configuration file */home/mike/.ssh/config-work* while you're connected to one of your work connections *Work Connection - Main Office* or *Work Connection - Garys Office*, on either of your wireless adapters *wlan0* or *wlan1*. When you get home and connect to your *My Home Connection (Oh Joy)* connection on *wlan0*, the config file */home/mike/.ssh/config-home* will be used instead. Finally, the configuration file */home/mike/.ssh/config-default* will be used when *wlan0* connects to some undefined network.
|
||||
|
||||
The script works by simply creating a symlink where your original ssh configuration file was (typically *~/.ssh/config*), pointing to the ssh configuration determined to be the one you want active. Note that this of course means the script will fail (for safety reasons), if your original ssh configuration file still exists as a normal file.
|
||||
|
||||
|
||||
|
@ -1,229 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
|
||||
#
|
||||
import configparser
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import syslog
|
||||
|
||||
|
||||
#
|
||||
class SSHConfiger:
|
||||
|
||||
#
|
||||
_action_interface = None
|
||||
_action_command = None
|
||||
#
|
||||
_config_file_path = None
|
||||
_targets = None
|
||||
_config = None
|
||||
|
||||
#
|
||||
def __init__(self, action_interface, action_command, config_file_path):
|
||||
|
||||
# Grab args
|
||||
self._action_interface = action_interface
|
||||
self._action_command = action_command
|
||||
self._config_file_path = config_file_path
|
||||
|
||||
#
|
||||
def log(self, s):
|
||||
|
||||
#
|
||||
print ("[SSHConfiger]", s)
|
||||
syslog.syslog("[SSHConfiger] " + s)
|
||||
|
||||
#
|
||||
def complain(self, s):
|
||||
|
||||
#
|
||||
syslog.syslog(syslog.LOG_ERR, "[SSHConfiger] " + s)
|
||||
print("[SSHConfiger]", s, file=sys.stderr)
|
||||
|
||||
#
|
||||
def die(self, s):
|
||||
|
||||
#
|
||||
self.complain(s)
|
||||
raise Exception(s)
|
||||
|
||||
#
|
||||
def run(self):
|
||||
|
||||
#
|
||||
self.log("Running for interface \""+ self._action_interface +"\": " + self._action_command)
|
||||
|
||||
# Parse the config
|
||||
self.parse_config()
|
||||
|
||||
# Grab the specified ssh directory
|
||||
if ( "ssh_dir" not in self._config.keys() ):
|
||||
self.die("Config file needs key \"ssh_dir\" inside \"config\" section")
|
||||
ssh_dir = self._config["ssh_dir"]
|
||||
|
||||
# Determine which ssh config file we should use
|
||||
ssh_config_name = self.determine_ssh_config_name()
|
||||
|
||||
# Make paths
|
||||
ssh_config_path_link = os.path.join(ssh_dir, "config")
|
||||
ssh_config_path_target = os.path.join(ssh_dir, ssh_config_name)
|
||||
self.log("Selecting source config file \"" + ssh_config_path_target + "\" for link \"" + ssh_config_path_link + "\"")
|
||||
|
||||
# Don't run unless current ssh config is a symlink or not a file (for safety)
|
||||
self.require_symlink_or_none(ssh_config_path_link)
|
||||
|
||||
# Set the symlink
|
||||
try:
|
||||
os.unlink(ssh_config_path_link)
|
||||
except:
|
||||
pass
|
||||
os.symlink(ssh_config_path_target, ssh_config_path_link)
|
||||
|
||||
#
|
||||
self.log("Finished")
|
||||
|
||||
#
|
||||
def require_symlink_or_none(self, file_path):
|
||||
|
||||
#
|
||||
if ( not os.path.isfile(file_path) or os.path.islink(file_path) ):
|
||||
return True
|
||||
|
||||
#
|
||||
self.die("For safety, we cannot continue if the target link exists and is a file (" + file_path + ")")
|
||||
|
||||
|
||||
#
|
||||
def determine_ssh_config_name(self):
|
||||
|
||||
# Only run if an interface is coming up
|
||||
if ( self._action_command != "up" ):
|
||||
return None
|
||||
|
||||
# Check each section
|
||||
found_ssh_config_name = None
|
||||
for section_name in self._targets:
|
||||
|
||||
#
|
||||
section = self._targets[section_name]
|
||||
|
||||
# Must match at least one interface
|
||||
if ( self._action_interface not in section["adapters"] ):
|
||||
continue
|
||||
|
||||
#
|
||||
interface_ssid = self.get_interface_ssid(self._action_interface)
|
||||
|
||||
# Must also match at least one SSID
|
||||
if ( interface_ssid not in section["ssids"] ):
|
||||
continue
|
||||
|
||||
# Found a match!
|
||||
found_ssh_config_name = section["ssh_config_name"]
|
||||
|
||||
# Didn't find anything? Go default ...
|
||||
if (found_ssh_config_name == None):
|
||||
if ( "default" in self._targets.keys() ):
|
||||
if ( "ssh_config_name" in self._targets["default"].keys() ):
|
||||
found_ssh_config_name = self._targets["default"]["ssh_config_name"]
|
||||
self.log("No matches found; Defaulting to:" + found_ssh_config_name)
|
||||
|
||||
#
|
||||
return found_ssh_config_name
|
||||
|
||||
#
|
||||
def get_interface_ssid(self, interface):
|
||||
|
||||
#
|
||||
p = subprocess.Popen(["nmcli", "dev", "show", interface], stdout=subprocess.PIPE)
|
||||
(stdout, stderr) = p.communicate()
|
||||
stdout = stdout.decode()
|
||||
|
||||
#
|
||||
r = re.compile("""^GENERAL.CONNECTION:\s+(?P<ssid>[^\s].+)$""", re.MULTILINE)
|
||||
match = r.search(stdout)
|
||||
|
||||
return match.group("ssid")
|
||||
|
||||
#
|
||||
def parse_config(self):
|
||||
|
||||
#
|
||||
self.log("Parsing config: " + self._config_file_path)
|
||||
parser = configparser.ConfigParser()
|
||||
parser.read( self._config_file_path )
|
||||
|
||||
# Attempt to grab global config
|
||||
if ( "config" not in parser.sections() ):
|
||||
self.die("config section not found")
|
||||
config = parser["config"]
|
||||
|
||||
# Targets
|
||||
targets = {}
|
||||
for section in parser.sections():
|
||||
|
||||
# Skip the config section
|
||||
if ( section == "config" ):
|
||||
continue
|
||||
|
||||
#
|
||||
target = {
|
||||
"adapters" : [],
|
||||
"ssids" : []
|
||||
}
|
||||
|
||||
# Adapters
|
||||
if ( parser.has_option(section, "adapters") ):
|
||||
target["adapters"] = json.loads( parser[section]["adapters"] )
|
||||
if ( parser.has_option(section, "adapter") ):
|
||||
target["adapters"].append( parser[section]["adapter"] )
|
||||
|
||||
# SSIDs
|
||||
if ( parser.has_option(section, "ssids") ):
|
||||
target["ssids"] = json.loads( parser[section]["ssids"] )
|
||||
if ( parser.has_option(section, "ssid") ):
|
||||
target["ssids"].append( parser[section]["ssid"] )
|
||||
|
||||
# ssh_config_name
|
||||
if ( parser.has_option(section, "ssh_config_name") ):
|
||||
target["ssh_config_name"] = parser[section]["ssh_config_name"]
|
||||
else:
|
||||
raise Exception("ssh_config_name key missing from section: " + section)
|
||||
|
||||
#
|
||||
targets[section] = target
|
||||
|
||||
#
|
||||
self._config = config
|
||||
self._targets = targets
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# Main Entry
|
||||
if (__name__ == "__main__"):
|
||||
|
||||
# Script name
|
||||
script_name = sys.argv[0]
|
||||
|
||||
# Network Manager action info
|
||||
action_interface = sys.argv[1]
|
||||
action_command = sys.argv[2]
|
||||
|
||||
# Config file
|
||||
config_file_path = sys.argv[3]
|
||||
|
||||
#
|
||||
configger = SSHConfiger(action_interface, action_command, config_file_path)
|
||||
configger.run()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
20
domain/Logger.py
Normal file
20
domain/Logger.py
Normal file
@ -0,0 +1,20 @@
|
||||
|
||||
|
||||
import sys
|
||||
import syslog
|
||||
|
||||
|
||||
class Logger:
|
||||
|
||||
@staticmethod
|
||||
def log(s):
|
||||
|
||||
print("[SSHConfiger]", s)
|
||||
syslog.syslog("[SSHConfiger] " + s)
|
||||
|
||||
#
|
||||
@staticmethod
|
||||
def complain(s):
|
||||
|
||||
syslog.syslog(syslog.LOG_ERR, "[SSHConfiger] " + s)
|
||||
print("[SSHConfiger]", s, file=sys.stderr)
|
196
domain/SSHConfigChanger.py
Executable file
196
domain/SSHConfigChanger.py
Executable file
@ -0,0 +1,196 @@
|
||||
|
||||
|
||||
from domain.config.Config import Config
|
||||
from domain.config.Config import Target
|
||||
from domain.Logger import Logger
|
||||
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
class SSHConfigChanger:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
action_interface, action_command,
|
||||
config_file_path,
|
||||
dry_run: bool = False,
|
||||
):
|
||||
self.__logger = Logger(
|
||||
|
||||
)
|
||||
|
||||
self.__config = Config(
|
||||
logger=self.__logger,
|
||||
file_path=config_file_path
|
||||
)
|
||||
if dry_run:
|
||||
self.__logger.complain(f"Dry run enabled at runtime")
|
||||
self.__config.dry_run = True
|
||||
|
||||
# Grab args
|
||||
self.__action_interface = action_interface
|
||||
self.__action_command = action_command
|
||||
|
||||
def die(self, s):
|
||||
|
||||
#
|
||||
self.__logger.complain(s)
|
||||
raise Exception(s)
|
||||
|
||||
def quit(self, s):
|
||||
|
||||
#
|
||||
self.__logger.log("Quitting because: " + s)
|
||||
sys.exit(0)
|
||||
|
||||
#
|
||||
def run(self):
|
||||
|
||||
self.__logger.log(
|
||||
f"Running for interface {self.__action_interface}: {self.__action_command}"
|
||||
)
|
||||
|
||||
# Only run if an interface is coming up
|
||||
if self.__action_command != "up":
|
||||
self.quit(f"We don't need to run for action command: {self.__action_command}")
|
||||
|
||||
# Determine which ssh config file we should use
|
||||
ssh_config_target = self.determine_ssh_config_target()
|
||||
if ssh_config_target is None:
|
||||
self.die("Unable to determine appropriate ssh config name; Quitting")
|
||||
self.__logger.log(
|
||||
f"Determined ssh config name: {ssh_config_target.name}"
|
||||
)
|
||||
|
||||
# Make paths
|
||||
ssh_config_path_link = self.__config.ssh_dir / self.__config.default_normal_ssh_config_file_name
|
||||
ssh_config_path_target = self.__config.ssh_dir / ssh_config_target.ssh_config_file_name
|
||||
self.__logger.log(
|
||||
f"Selecting source config file \"{ssh_config_path_target}\""
|
||||
f" for link \"{ssh_config_path_link}\""
|
||||
)
|
||||
|
||||
# Don't run unless current ssh config is a symlink or not a file (for safety)
|
||||
self.require_symlink_or_none(ssh_config_path_link)
|
||||
|
||||
if self.__config.dry_run:
|
||||
|
||||
self.__logger.complain(
|
||||
f"Dry run enabled; Won't unlink existing symlink: {ssh_config_path_link}"
|
||||
)
|
||||
self.__logger.complain(
|
||||
f"Dry run enabled; Won't create symlink: {ssh_config_path_link} ==> {ssh_config_path_target}"
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
# Set the symlink
|
||||
if ssh_config_path_link.exists():
|
||||
try:
|
||||
ssh_config_path_link.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
ssh_config_path_link.symlink_to(ssh_config_path_target, target_is_directory=False)
|
||||
|
||||
#
|
||||
self.__logger.log("Finished")
|
||||
|
||||
#
|
||||
def require_symlink_or_none(self, file_path: Path):
|
||||
|
||||
if file_path.is_file() and file_path.exists() and not file_path.is_symlink():
|
||||
self.die(
|
||||
f"For safety, refusing to continue because the target ssh config file exists and is not a symlink:"
|
||||
f" {file_path}"
|
||||
)
|
||||
|
||||
def determine_ssh_config_target(self) -> Target:
|
||||
|
||||
#
|
||||
self.__logger.log("Attempting to determine SSH config name")
|
||||
|
||||
# Start off by assuming the default target
|
||||
# noinspection PyTypeChecker
|
||||
selected_target = None
|
||||
selected_target: Target
|
||||
|
||||
# Check each section
|
||||
for target_name in self.__config.targets:
|
||||
|
||||
self.__logger.log(f"Examining target: {target_name}")
|
||||
|
||||
if selected_target is not None and target_name == selected_target.name:
|
||||
self.__logger.log(f"Ignoring target because it is already selected: {target_name}")
|
||||
continue
|
||||
|
||||
target = self.__config.targets[target_name]
|
||||
|
||||
# Matches, if current interface found in adapters
|
||||
if self.__action_interface not in target.adapters:
|
||||
self.__logger.log(
|
||||
f"Target \"{target_name}\" didn't match any interfaces; Skipping"
|
||||
)
|
||||
continue
|
||||
|
||||
# Grab the SSID this adapter is currently connected to
|
||||
interface_ssid = self.get_interface_ssid(self.__action_interface)
|
||||
if not interface_ssid:
|
||||
self.__logger.log(
|
||||
f"Interface \"{interface_ssid}\" isn't connected to anything; Done looking"
|
||||
)
|
||||
break
|
||||
self.__logger.log(
|
||||
f"Interface \"{self.__action_interface}\" is currently connected to: \"{interface_ssid}\""
|
||||
)
|
||||
|
||||
# Must also match at least one SSID
|
||||
if interface_ssid in target.ssids:
|
||||
|
||||
self.__logger.log(
|
||||
f"Found SSID \"{interface_ssid}\" in target {target_name}"
|
||||
)
|
||||
|
||||
# Only override selected target if this one has less SSIDs,
|
||||
# or there is no currently selected target
|
||||
if selected_target is None:
|
||||
self.__logger.log(
|
||||
f"Found first suitable target: {target_name}"
|
||||
)
|
||||
selected_target = target
|
||||
if len(target.ssids) < len(selected_target.ssids):
|
||||
self.__logger.log(
|
||||
f"Target \"{target_name}\""
|
||||
f" seems to be a better match than \"{selected_target.name}\""
|
||||
f" because it has fewer specified SSIDs"
|
||||
f" ({len(target.ssids)} vs. {len(selected_target.ssids)})"
|
||||
)
|
||||
selected_target = target
|
||||
|
||||
if selected_target is None:
|
||||
selected_target = self.__config.targets[self.__config.default_target_name]
|
||||
self.__logger.log(f"No suitable target found; Defaulting to: {selected_target.name}")
|
||||
|
||||
return selected_target
|
||||
|
||||
@staticmethod
|
||||
def get_interface_ssid(interface_name):
|
||||
|
||||
#
|
||||
p = subprocess.Popen(["nmcli", "dev", "show", interface_name], stdout=subprocess.PIPE)
|
||||
(stdout, stderr) = p.communicate()
|
||||
stdout = stdout.decode()
|
||||
|
||||
#
|
||||
r = re.compile(
|
||||
pattern=r"^GENERAL.CONNECTION:\s+(?P<ssid>[^\s].+)$",
|
||||
flags=re.MULTILINE
|
||||
)
|
||||
match = r.search(stdout)
|
||||
|
||||
return match.group("ssid")
|
278
domain/config/Config.py
Normal file
278
domain/config/Config.py
Normal file
@ -0,0 +1,278 @@
|
||||
|
||||
|
||||
from domain.Logger import Logger
|
||||
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
import yaml
|
||||
|
||||
|
||||
class Target:
|
||||
|
||||
def __init__(self, logger: Logger, name):
|
||||
|
||||
self.__logger = logger
|
||||
|
||||
self.__name = name
|
||||
self.__data = {}
|
||||
|
||||
self.__adapters_names = []
|
||||
self.__ssids = []
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
self.__ssh_config_file_name: str = None
|
||||
|
||||
def __str__(self):
|
||||
|
||||
s = ""
|
||||
|
||||
s += f"Target: {self.__name}"
|
||||
s += f"\n> SSH config file name: {self.__ssh_config_file_name}"
|
||||
s += f"\n> Adapters: "
|
||||
|
||||
if len(self.__adapters_names) > 0:
|
||||
s += ", ".join(self.__adapters_names)
|
||||
else:
|
||||
s += "[none]"
|
||||
s += f"\n> SSIDs: "
|
||||
|
||||
if len(self.__ssids) > 0:
|
||||
s += ", ".join(self.__ssids)
|
||||
else:
|
||||
s += "[none]"
|
||||
|
||||
return s
|
||||
|
||||
def log(self, s):
|
||||
|
||||
self.__logger.log(
|
||||
f"[Target::{self.__name}] {s}"
|
||||
)
|
||||
|
||||
def complain(self, s):
|
||||
|
||||
self.__logger.complain(
|
||||
f"[Target::{self.__name}] {s}"
|
||||
)
|
||||
|
||||
def consume_data(self, data: dict):
|
||||
|
||||
assert isinstance(data, dict), (
|
||||
f"Data should be a dict (found: {type(data).__name__}) "
|
||||
)
|
||||
|
||||
self.__data = data
|
||||
|
||||
assert "config-file-name" in self.__data.keys(), (
|
||||
f"Name of ssh config file must be present at config-file-name"
|
||||
)
|
||||
config_file_name = self.__data["config-file-name"]
|
||||
assert isinstance(config_file_name, str), (
|
||||
f"config-file-name must be a string, but got: {type(config_file_name).__name__}"
|
||||
)
|
||||
self.__ssh_config_file_name = config_file_name
|
||||
|
||||
if "adapters" in self.__data.keys():
|
||||
adapters = self.__data["adapters"]
|
||||
if isinstance(adapters, list):
|
||||
pass
|
||||
elif isinstance(adapters, str):
|
||||
adapters = [adapters]
|
||||
else:
|
||||
raise AssertionError(f"Unsupported adapters data type: {type(adapters).__name__}")
|
||||
self.__adapters_names.extend(adapters)
|
||||
|
||||
if "adapter" in self.__data.keys():
|
||||
adapters = self.__data["adapter"]
|
||||
if isinstance(adapters, list):
|
||||
pass
|
||||
elif isinstance(adapters, str):
|
||||
adapters = [adapters]
|
||||
else:
|
||||
raise AssertionError(f"Unsupported adapter data type: {type(adapters).__name__}")
|
||||
self.__adapters_names.extend(adapters)
|
||||
|
||||
if "ssids" in self.__data.keys():
|
||||
ssids = self.__data["ssids"]
|
||||
if isinstance(ssids, list):
|
||||
pass
|
||||
elif isinstance(ssids, str):
|
||||
ssids = [ssids]
|
||||
else:
|
||||
raise AssertionError(f"Unsupported ssids data type: {type(ssids).__name__}")
|
||||
self.__ssids.extend(ssids)
|
||||
|
||||
if "ssid" in self.__data.keys():
|
||||
ssids = self.__data["ssid"]
|
||||
if isinstance(ssids, list):
|
||||
pass
|
||||
elif isinstance(ssids, str):
|
||||
ssids = [ssids]
|
||||
else:
|
||||
raise AssertionError(f"Unsupported ssid data type: {type(ssids).__name__}")
|
||||
self.__ssids.extend(ssids)
|
||||
|
||||
assert len(self.__adapters_names) > 0, (
|
||||
f"At least one adapter must be configured at target-name::adapters"
|
||||
)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.__name
|
||||
|
||||
@property
|
||||
def ssh_config_file_name(self) -> str:
|
||||
return self.__ssh_config_file_name
|
||||
|
||||
@property
|
||||
def adapters(self) -> list[str]:
|
||||
return self.__adapters_names
|
||||
|
||||
@property
|
||||
def ssids(self) -> list[str]:
|
||||
return self.__ssids
|
||||
|
||||
|
||||
class Config:
|
||||
|
||||
__DEFAULT_NORMAL_SSH_CONFIG_FILE_NAME = "config"
|
||||
__DEFAULT_SSH_DIRECTORY_NAME = ".ssh"
|
||||
|
||||
def __init__(self, logger: Logger, file_path: str):
|
||||
|
||||
self.__logger = logger
|
||||
|
||||
if isinstance(file_path, str):
|
||||
file_path = Path(file_path)
|
||||
elif isinstance(file_path, Path):
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("File path should be a string or Path object")
|
||||
|
||||
self.__file_path = file_path
|
||||
|
||||
self.__data = {}
|
||||
|
||||
self.__dry_run = False
|
||||
self.__ssh_dir = Path(os.path.expanduser("~")) / self.__DEFAULT_SSH_DIRECTORY_NAME
|
||||
# noinspection PyTypeChecker
|
||||
self.__default_target_name: str = None
|
||||
self.__targets = {}
|
||||
|
||||
self._load_config()
|
||||
self._consume_config()
|
||||
|
||||
print(self)
|
||||
|
||||
def __str__(self):
|
||||
|
||||
s = ""
|
||||
|
||||
s += "*** Config ***"
|
||||
s += "\n Dry run: " + "True" if self.__dry_run else "False"
|
||||
for target in self.__targets.values():
|
||||
s += "\n" + str(target)
|
||||
|
||||
return s
|
||||
|
||||
def _load_config(self):
|
||||
|
||||
assert self.__file_path.exists(), "Config file must exist"
|
||||
|
||||
with open(self.__file_path) as f:
|
||||
|
||||
self.__data = yaml.safe_load(f)
|
||||
|
||||
def _consume_config(self):
|
||||
|
||||
assert isinstance(self.__data, dict), (
|
||||
f"Config data must be a dict"
|
||||
)
|
||||
|
||||
assert "options" in self.__data.keys(), (
|
||||
f"Options key missing from config"
|
||||
)
|
||||
options = self.__data["options"]
|
||||
assert isinstance(options, dict), "Config options must be a dict!"
|
||||
|
||||
if "dry-run" in options.keys():
|
||||
d = options["dry-run"]
|
||||
assert isinstance(d, bool), "options::dry-run must be a bool"
|
||||
if d:
|
||||
self.__logger.complain(f"Dry run enabled in config")
|
||||
self.__dry_run = d
|
||||
|
||||
if "ssh-dir" in options.keys():
|
||||
ssh_dir = Path(options["ssh-dir"])
|
||||
assert ssh_dir.exists(), f"options::ssh-dir must be a valid directory"
|
||||
self.__ssh_dir = ssh_dir
|
||||
self.__logger.log(f"Found ssh dir: {self.__ssh_dir}")
|
||||
else:
|
||||
self.__logger.log(f"options::ssh-dir not found")
|
||||
|
||||
assert "default-target" in options.keys(), (
|
||||
f"Must specify the name of the default target at options::default-target"
|
||||
)
|
||||
default_target_name = options["default-target"]
|
||||
assert isinstance(default_target_name, str), (
|
||||
f"Default target name must be a string but got: {type(default_target_name).__name__}"
|
||||
)
|
||||
self.__default_target_name = default_target_name
|
||||
|
||||
self.__targets = {}
|
||||
|
||||
assert "targets" in self.__data.keys(), "Config should specify targets"
|
||||
targets = self.__data["targets"]
|
||||
assert isinstance(targets, dict), "Targets should be a dict, where each key is one target"
|
||||
for target_name in targets.keys():
|
||||
|
||||
self.__logger.log(f"Parsing target: {target_name}")
|
||||
|
||||
try:
|
||||
t = Target(
|
||||
logger=self.__logger,
|
||||
name=target_name,
|
||||
)
|
||||
t.consume_data(data=targets[target_name])
|
||||
except AssertionError as e:
|
||||
self.__logger.complain(
|
||||
f"Failed to parse target \"{target_name}\""
|
||||
f"\n{e}"
|
||||
)
|
||||
raise e
|
||||
|
||||
self.__targets[target_name] = t
|
||||
|
||||
if self.__default_target_name not in self.__targets.keys():
|
||||
raise AssertionError(
|
||||
f"Default target specified as {self.__default_target_name} but was not found in dict of targets"
|
||||
)
|
||||
|
||||
@property
|
||||
def default_normal_ssh_config_file_name(self) -> str:
|
||||
return self.__DEFAULT_NORMAL_SSH_CONFIG_FILE_NAME
|
||||
|
||||
@property
|
||||
def file_path(self) -> Path:
|
||||
return self.__file_path
|
||||
|
||||
@property
|
||||
def dry_run(self) -> bool:
|
||||
return self.__dry_run
|
||||
|
||||
@dry_run.setter
|
||||
def dry_run(self, b: bool):
|
||||
self.__dry_run = b
|
||||
|
||||
@property
|
||||
def default_target_name(self) -> str:
|
||||
return self.__default_target_name
|
||||
|
||||
@property
|
||||
def ssh_dir(self) -> Path | None:
|
||||
return self.__ssh_dir
|
||||
|
||||
@property
|
||||
def targets(self) -> [Target]:
|
||||
return self.__targets
|
41
init-environment
Executable file
41
init-environment
Executable file
@ -0,0 +1,41 @@
|
||||
#!/bin/bash
|
||||
|
||||
log()
|
||||
{
|
||||
echo "[Connection Specific SSH Config - Init Env] $1"
|
||||
}
|
||||
complain()
|
||||
{
|
||||
echo "[Connection Specific SSH Config - Init Env] $1" 1>&2
|
||||
}
|
||||
die()
|
||||
{
|
||||
complain "Fatal: $1"
|
||||
exit 1
|
||||
}
|
||||
|
||||
SCRIPT_PATH=$(readlink -f "$0")
|
||||
SCRIPT_DIR=$(dirname "$SCRIPT_PATH")
|
||||
SCRIPT_NAME=$(basename "$SCRIPT_PATH")
|
||||
|
||||
log "Begin ${SCRIPT_NAME}"
|
||||
log "Script path: ${SCRIPT_PATH}"
|
||||
log "Script dir: ${SCRIPT_DIR}"
|
||||
log "PATH: ${PATH}"
|
||||
|
||||
log "PWD before switching: $(pwd)"
|
||||
cd "${SCRIPT_DIR}" || die "Failed to switch to project directory: ${SCRIPT_DIR}"
|
||||
log "PWD after switching: $(pwd)"
|
||||
|
||||
log "Ensuring python installation with pyenv"
|
||||
pyenv versions
|
||||
pyenv install --skip-existing || die "Failed to ensure python installation with pyenv"
|
||||
|
||||
log "Installing/upgrading pip and pipenv"
|
||||
pip install --upgrade pip pipenv || die "Failed to install/upgrade pip and pipenv"
|
||||
|
||||
log "Syncing pip environment with pipenv"
|
||||
pipenv --rm # Don't die because this will return an error if the env didn't already exist
|
||||
pipenv install || die "Failed to install pip environment with pipenv"
|
||||
pipenv sync || die "Failed to sync pip environment with pipenv"
|
||||
|
58
main.py
Normal file
58
main.py
Normal file
@ -0,0 +1,58 @@
|
||||
|
||||
|
||||
from domain.SSHConfigChanger import SSHConfigChanger
|
||||
|
||||
|
||||
import argparse
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="Mike's SSH Config Changer"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--dry-run", "-d",
|
||||
dest="dry_run",
|
||||
default=False,
|
||||
action="store_true",
|
||||
help="Pass this flag to print what would be changed, without actually changing anything"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--config", "-c",
|
||||
dest="config_file",
|
||||
required=True,
|
||||
help="Specify path to configuration file",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--interface", "--action-interface",
|
||||
dest="action_interface",
|
||||
required=True,
|
||||
help="Specify the interface specified by NetworkManager's action"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--command", "--action-command",
|
||||
dest="action_command",
|
||||
required=True,
|
||||
help="Specify the command specified by NetworkManager's action"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
configger = SSHConfigChanger(
|
||||
dry_run=args.dry_run,
|
||||
action_interface=args.action_interface,
|
||||
action_command=args.action_command,
|
||||
config_file_path=args.config_file,
|
||||
)
|
||||
configger.run()
|
||||
|
||||
|
||||
# Main Entry
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
Reference in New Issue
Block a user