279 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			279 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
 | 
						|
 | 
						|
from domain.Logger import Logger
 | 
						|
 | 
						|
 | 
						|
import os
 | 
						|
from pathlib import Path
 | 
						|
import yaml
 | 
						|
 | 
						|
 | 
						|
class Target:
	"""One named target taken from the ``targets`` section of the config.

	A target holds the name of an ssh config file plus the adapter names and
	SSIDs listed for it. A new instance is empty; ``consume_data`` fills it
	from an already-parsed mapping.
	"""

	def __init__(self, logger: "Logger", name):
		"""Create an empty target.

		:param logger: object exposing ``log(str)`` and ``complain(str)``.
		:param name: target name; used as a prefix in every log message.
		"""
		self.__logger = logger

		self.__name = name
		self.__data = {}

		self.__adapters_names = []
		self.__ssids = []

		# Stays None until consume_data() has accepted a config-file-name.
		# noinspection PyTypeChecker
		self.__ssh_config_file_name: str = None

	def __str__(self):
		"""Return a multi-line, human-readable summary of this target."""
		return (
			f"Target: {self.__name}"
			f"\n> SSH config file name: {self.__ssh_config_file_name}"
			f"\n> Adapters: {self.__format_names(self.__adapters_names)}"
			f"\n> SSIDs: {self.__format_names(self.__ssids)}"
		)

	@staticmethod
	def __format_names(names) -> str:
		"""Join *names* with ", " or return "[none]" for an empty list."""
		if len(names) > 0:
			# str() each entry: list items are not type-checked on input, so a
			# non-string value (e.g. a numeric SSID) must not make join() fail.
			return ", ".join(str(item) for item in names)
		return "[none]"

	@staticmethod
	def __as_name_list(value, key: str) -> list:
		"""Normalise a config value that may be a list or a single string.

		:param value: raw value found under *key*.
		:param key: config key name, used only in the error message.
		:raises AssertionError: if *value* is neither a list nor a string.
		"""
		if isinstance(value, list):
			return value
		if isinstance(value, str):
			return [value]
		raise AssertionError(f"Unsupported {key} data type: {type(value).__name__}")

	def log(self, s):
		"""Forward *s* to the logger's ``log``, prefixed with the target name."""
		self.__logger.log(
			f"[Target::{self.__name}] {s}"
		)

	def complain(self, s):
		"""Forward *s* to the logger's ``complain``, prefixed with the target name."""
		self.__logger.complain(
			f"[Target::{self.__name}] {s}"
		)

	def consume_data(self, data: dict):
		"""Validate *data* and load it into this target.

		Recognised keys: ``config-file-name`` (required string), and
		``adapters`` / ``adapter`` / ``ssids`` / ``ssid`` (each a list or a
		single string). Values are appended to whatever the target already
		holds, in that key order.

		:param data: mapping describing this target.
		:raises AssertionError: if *data* is not a dict, ``config-file-name``
			is missing or not a string, a list-like key has an unsupported
			type, or no adapter ends up configured.
		"""
		# Validation raises AssertionError explicitly instead of using the
		# ``assert`` statement, so the checks still run under ``python -O``.
		if not isinstance(data, dict):
			raise AssertionError(
				f"Data should be a dict (found: {type(data).__name__}) "
			)

		self.__data = data

		if "config-file-name" not in data:
			raise AssertionError(
				"Name of ssh config file must be present at config-file-name"
			)
		config_file_name = data["config-file-name"]
		if not isinstance(config_file_name, str):
			raise AssertionError(
				f"config-file-name must be a string, but got: {type(config_file_name).__name__}"
			)
		self.__ssh_config_file_name = config_file_name

		# Plural and singular spellings feed the same list.
		for key, destination in (
			("adapters", self.__adapters_names),
			("adapter", self.__adapters_names),
			("ssids", self.__ssids),
			("ssid", self.__ssids),
		):
			if key in data:
				destination.extend(self.__as_name_list(data[key], key))

		if len(self.__adapters_names) == 0:
			raise AssertionError(
				"At least one adapter must be configured at target-name::adapters"
			)

	@property
	def name(self) -> str:
		"""Name given to this target at construction."""
		return self.__name

	@property
	def ssh_config_file_name(self) -> str:
		"""Value of ``config-file-name``; None until ``consume_data`` has run."""
		return self.__ssh_config_file_name

	@property
	def adapters(self) -> list[str]:
		"""Adapter names collected so far (the internal list, not a copy)."""
		return self.__adapters_names

	@property
	def ssids(self) -> list[str]:
		"""SSIDs collected so far (the internal list, not a copy)."""
		return self.__ssids
 | 
						|
 | 
						|
 | 
						|
class Config:
 | 
						|
	
 | 
						|
	__DEFAULT_NORMAL_SSH_CONFIG_FILE_NAME = "config"
 | 
						|
	__DEFAULT_SSH_DIRECTORY_NAME = ".ssh"
 | 
						|
	
 | 
						|
	def __init__(self, logger: Logger, file_path: str):
 | 
						|
		
 | 
						|
		self.__logger = logger
 | 
						|
		
 | 
						|
		if isinstance(file_path, str):
 | 
						|
			file_path = Path(file_path)
 | 
						|
		elif isinstance(file_path, Path):
 | 
						|
			pass
 | 
						|
		else:
 | 
						|
			raise AssertionError("File path should be a string or Path object")
 | 
						|
		
 | 
						|
		self.__file_path = file_path
 | 
						|
		
 | 
						|
		self.__data = {}
 | 
						|
		
 | 
						|
		self.__dry_run = False
 | 
						|
		self.__ssh_dir = Path(os.path.expanduser("~")) / self.__DEFAULT_SSH_DIRECTORY_NAME
 | 
						|
		# noinspection PyTypeChecker
 | 
						|
		self.__default_target_name: str = None
 | 
						|
		self.__targets = {}
 | 
						|
		
 | 
						|
		self._load_config()
 | 
						|
		self._consume_config()
 | 
						|
		
 | 
						|
		print(self)
 | 
						|
	
 | 
						|
	def __str__(self):
 | 
						|
		
 | 
						|
		s = ""
 | 
						|
		
 | 
						|
		s += "*** Config ***"
 | 
						|
		s += "\n Dry run: " + "True" if self.__dry_run else "False"
 | 
						|
		for target in self.__targets.values():
 | 
						|
			s += "\n" + str(target)
 | 
						|
		
 | 
						|
		return s
 | 
						|
	
 | 
						|
	def _load_config(self):
 | 
						|
		
 | 
						|
		assert self.__file_path.exists(), "Config file must exist"
 | 
						|
		
 | 
						|
		with open(self.__file_path) as f:
 | 
						|
			
 | 
						|
			self.__data = yaml.safe_load(f)
 | 
						|
	
 | 
						|
	def _consume_config(self):
 | 
						|
		
 | 
						|
		assert isinstance(self.__data, dict), (
 | 
						|
			f"Config data must be a dict"
 | 
						|
		)
 | 
						|
		
 | 
						|
		assert "options" in self.__data.keys(), (
 | 
						|
			f"Options key missing from config"
 | 
						|
		)
 | 
						|
		options = self.__data["options"]
 | 
						|
		assert isinstance(options, dict), "Config options must be a dict!"
 | 
						|
		
 | 
						|
		if "dry-run" in options.keys():
 | 
						|
			d = options["dry-run"]
 | 
						|
			assert isinstance(d, bool), "options::dry-run must be a bool"
 | 
						|
			if d:
 | 
						|
				self.__logger.complain(f"Dry run enabled in config")
 | 
						|
			self.__dry_run = d
 | 
						|
		
 | 
						|
		if "ssh-dir" in options.keys():
 | 
						|
			ssh_dir = Path(options["ssh-dir"])
 | 
						|
			assert ssh_dir.exists(), f"options::ssh-dir must be a valid directory"
 | 
						|
			self.__ssh_dir = ssh_dir
 | 
						|
			self.__logger.log(f"Found ssh dir: {self.__ssh_dir}")
 | 
						|
		else:
 | 
						|
			self.__logger.log(f"options::ssh-dir not found")
 | 
						|
		
 | 
						|
		assert "default-target" in options.keys(), (
 | 
						|
			f"Must specify the name of the default target at options::default-target"
 | 
						|
		)
 | 
						|
		default_target_name = options["default-target"]
 | 
						|
		assert isinstance(default_target_name, str), (
 | 
						|
			f"Default target name must be a string but got: {type(default_target_name).__name__}"
 | 
						|
		)
 | 
						|
		self.__default_target_name = default_target_name
 | 
						|
		
 | 
						|
		self.__targets = {}
 | 
						|
		
 | 
						|
		assert "targets" in self.__data.keys(), "Config should specify targets"
 | 
						|
		targets = self.__data["targets"]
 | 
						|
		assert isinstance(targets, dict), "Targets should be a dict, where each key is one target"
 | 
						|
		for target_name in targets.keys():
 | 
						|
			
 | 
						|
			self.__logger.log(f"Parsing target: {target_name}")
 | 
						|
			
 | 
						|
			try:
 | 
						|
				t = Target(
 | 
						|
					logger=self.__logger,
 | 
						|
					name=target_name,
 | 
						|
				)
 | 
						|
				t.consume_data(data=targets[target_name])
 | 
						|
			except AssertionError as e:
 | 
						|
				self.__logger.complain(
 | 
						|
					f"Failed to parse target \"{target_name}\""
 | 
						|
					f"\n{e}"
 | 
						|
				)
 | 
						|
				raise e
 | 
						|
			
 | 
						|
			self.__targets[target_name] = t
 | 
						|
		
 | 
						|
		if self.__default_target_name not in self.__targets.keys():
 | 
						|
			raise AssertionError(
 | 
						|
				f"Default target specified as {self.__default_target_name} but was not found in dict of targets"
 | 
						|
			)
 | 
						|
	
 | 
						|
	@property
 | 
						|
	def default_normal_ssh_config_file_name(self) -> str:
 | 
						|
		return self.__DEFAULT_NORMAL_SSH_CONFIG_FILE_NAME
 | 
						|
	
 | 
						|
	@property
 | 
						|
	def file_path(self) -> Path:
 | 
						|
		return self.__file_path
 | 
						|
	
 | 
						|
	@property
 | 
						|
	def dry_run(self) -> bool:
 | 
						|
		return self.__dry_run
 | 
						|
	
 | 
						|
	@dry_run.setter
 | 
						|
	def dry_run(self, b: bool):
 | 
						|
		self.__dry_run = b
 | 
						|
	
 | 
						|
	@property
 | 
						|
	def default_target_name(self) -> str:
 | 
						|
		return self.__default_target_name
 | 
						|
	
 | 
						|
	@property
 | 
						|
	def ssh_dir(self) -> Path | None:
 | 
						|
		return self.__ssh_dir
 | 
						|
	
 | 
						|
	@property
 | 
						|
	def targets(self) -> [Target]:
 | 
						|
		return self.__targets
 |