2023-03-27 19:40:52 -07:00
|
|
|
|
|
|
|
|
|
|
|
from domain.Logger import Logger
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
|
|
|
|
class Config:
|
|
|
|
|
|
|
|
__DEFAULT_VALID_EXTENSIONS = [
|
|
|
|
"yaml",
|
|
|
|
"yml"
|
|
|
|
]
|
|
|
|
|
2023-03-27 19:59:45 -07:00
|
|
|
def __init__(self, logger):
|
2023-03-27 19:40:52 -07:00
|
|
|
|
2023-03-27 19:59:45 -07:00
|
|
|
self.__logger = logger
|
2023-03-27 19:40:52 -07:00
|
|
|
self.__valid_extensions = self.__DEFAULT_VALID_EXTENSIONS
|
|
|
|
|
2023-03-27 19:59:45 -07:00
|
|
|
def debug(self, s):
|
|
|
|
self.__logger.debug(f"[{type(self).__name__}] {s}")
|
2023-03-27 19:40:52 -07:00
|
|
|
def info(self, s):
|
2023-03-27 19:59:45 -07:00
|
|
|
self.__logger.info(f"[{type(self).__name__}] {s}")
|
2023-03-27 19:40:52 -07:00
|
|
|
def warn(self, s):
|
2023-03-27 19:59:45 -07:00
|
|
|
self.__logger.warn(f"[{type(self).__name__}] {s}")
|
2023-03-27 19:40:52 -07:00
|
|
|
def error(self, s):
|
2023-03-27 19:59:45 -07:00
|
|
|
self.__logger.error(f"[{type(self).__name__}] {s}")
|
2023-03-27 19:40:52 -07:00
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_dir_files_recursive(path: str):
|
|
|
|
|
|
|
|
files_paths = []
|
|
|
|
|
|
|
|
for dir_path, dirnames, filenames in os.walk(path):
|
|
|
|
|
|
|
|
for file_name in filenames:
|
|
|
|
|
|
|
|
file_path = os.path.join(dir_path, file_name)
|
|
|
|
files_paths.append(file_path)
|
|
|
|
# print("Uhm yeah", dir_path, "--", dirnames, "--", file_name)
|
|
|
|
# print("==>", file_path)
|
|
|
|
|
|
|
|
return files_paths
|
|
|
|
|
|
|
|
def gather_valid_configs(self, paths: list=None):
|
|
|
|
|
|
|
|
assert paths is not None, "Config paths cannot be None"
|
|
|
|
assert len(paths) > 0, "Must provide at least one config file path"
|
|
|
|
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info("Gathering valid configs")
|
2023-03-27 19:40:52 -07:00
|
|
|
|
|
|
|
file_paths = []
|
|
|
|
configs = []
|
|
|
|
not_configs = []
|
|
|
|
|
|
|
|
# First gather all files that are potential configs
|
|
|
|
for path in paths:
|
|
|
|
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info(f"Inspecting path: {path}")
|
2023-03-27 19:40:52 -07:00
|
|
|
|
|
|
|
if os.path.isfile(path):
|
2023-03-27 19:59:45 -07:00
|
|
|
self.debug(f"Path is a file; Adding directly to potential config candidates: {path}")
|
2023-03-27 19:40:52 -07:00
|
|
|
file_paths.append(path)
|
2023-03-27 19:59:45 -07:00
|
|
|
|
2023-03-27 19:40:52 -07:00
|
|
|
elif os.path.isdir(path):
|
2023-03-27 19:59:45 -07:00
|
|
|
self.debug(f"Path is a dir; Scanning recursively for potential config candidate files: {path}")
|
2023-03-27 19:40:52 -07:00
|
|
|
for file_path in Config.get_dir_files_recursive(path=path):
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info(f"> Candidate file: {file_path}")
|
2023-03-27 19:40:52 -07:00
|
|
|
file_paths.append(file_path)
|
|
|
|
|
|
|
|
else:
|
|
|
|
raise AssertionError(f"Don't know how to handle path that isn't a file or dir: {path}")
|
|
|
|
|
|
|
|
# Now, filter for files with valid YAML extensions
|
|
|
|
for file_path in file_paths:
|
|
|
|
if self.check_file_extension(file_path=file_path, extensions=None):
|
|
|
|
configs.append(file_path)
|
|
|
|
else:
|
|
|
|
not_configs.append(file_path)
|
|
|
|
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info("Filtered out non-config files:")
|
2023-03-27 19:40:52 -07:00
|
|
|
if len(not_configs) > 0:
|
|
|
|
for not_config in not_configs:
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info(f"> {not_config}")
|
2023-03-27 19:40:52 -07:00
|
|
|
else:
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info("> [none]")
|
2023-03-27 19:40:52 -07:00
|
|
|
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info("Kept config-looking files:")
|
2023-03-27 19:40:52 -07:00
|
|
|
if len(configs) > 0:
|
|
|
|
for config in configs:
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info(f"> {config}")
|
2023-03-27 19:40:52 -07:00
|
|
|
else:
|
2023-03-27 19:59:45 -07:00
|
|
|
self.info("> [none]")
|
2023-03-27 19:40:52 -07:00
|
|
|
|
|
|
|
return configs
|
|
|
|
|
|
|
|
def check_file_extension(self, file_path, extensions: list=None):
|
|
|
|
|
|
|
|
if extensions is None:
|
|
|
|
extensions = self.__valid_extensions
|
|
|
|
|
|
|
|
file_name, file_extension = os.path.splitext(file_path)
|
|
|
|
if len(file_extension) > 0 and file_extension[0] == ".":
|
|
|
|
file_extension = file_extension[1:]
|
|
|
|
file_extension = file_extension.lower()
|
|
|
|
|
|
|
|
for valid_extension in extensions:
|
|
|
|
#print(file_name, "---", file_extension, "---", valid_extension)
|
|
|
|
if file_extension == valid_extension:
|
|
|
|
return True
|
|
|
|
|
|
|
|
return False
|
|
|
|
|