import giteapy import logging import sys class Migrator: __DEFAULT_API_PATH = "/api/v1" __REPO_ORIGINAL_NAME_TOKEN = "%N%" def __init__( self, source_host, source_port, source_token, destination_host, destination_port, destination_token, ): # noinspection PyTypeChecker self.__logger: logging.Logger = None self._init_logger() self.__verify_ssl = True self.__source_host = source_host self.__source_port = source_port self.__source_token = source_token self.__destination_host = destination_host self.__destination_port = destination_port self.__destination_token = destination_token def _init_logger(self): logger = logging.Logger(name=f"{type(self).__name__}", level=logging.INFO) stdout_handler = logging.StreamHandler(stream=sys.stdout) logger.addHandler(stdout_handler) self.__logger = logger def _get_user_api(self, hostname, port, token) -> giteapy.UserApi: conf = giteapy.Configuration() conf.api_key['access_token'] = token conf.host = self._make_api_base(hostname=hostname, port=port) conf.verify_ssl = self.__verify_ssl api = giteapy.UserApi(giteapy.ApiClient(conf)) return api def _get_repo_api(self, hostname, port, token) -> giteapy.RepositoryApi: conf = giteapy.Configuration() conf.api_key['access_token'] = token conf.host = self._make_api_base(hostname=hostname, port=port) conf.verify_ssl = self.__verify_ssl api = giteapy.RepositoryApi(giteapy.ApiClient(conf)) return api def _get_org_apis(self) -> (giteapy.OrganizationApi, giteapy.OrganizationApi): api_source = self._get_org_api( hostname=self.__source_host, port=self.__source_port, token=self.__source_token ) api_destination = self._get_org_api( hostname=self.__destination_host, port=self.__destination_port, token=self.__destination_token ) return api_source, api_destination def _get_org_api(self, hostname, port, token) -> giteapy.OrganizationApi: conf = giteapy.Configuration() conf.api_key['access_token'] = token conf.host = self._make_api_base(hostname=hostname, port=port) conf.verify_ssl = self.__verify_ssl api = giteapy.OrganizationApi(giteapy.ApiClient(conf)) return api def _make_api_base(self, hostname, port): base = f"https://{hostname}" if port is not None: base += f":{port}" base += self.__DEFAULT_API_PATH return base def _make_destination_repo_name(self, pattern: str, repo: giteapy.Repository): repo_name = pattern.replace(self.__REPO_ORIGINAL_NAME_TOKEN, repo.name) return repo_name def set_verify_ssl(self, b: bool): self.__verify_ssl = b def migrate_entire_org( self, interactive: bool = True, source_org: str = None, source_topics: list[str] = None, destination_org: str = None, destination_repo_name: str = None, destination_topics: list = None ): assert source_org is not None, "Source org must be specified" assert destination_org is not None, "Destination org must be specified" assert destination_repo_name is not None, "Destination repo name must be specified" assert destination_topics is not None, "Destination topics must be specified" api_source, api_destination = self._get_org_apis() api_source: giteapy.OrganizationApi api_destination: giteapy.OrganizationApi # Grab all org repos source_repos = self._fetch_all_org_repos(org=source_org) self.__logger.info(f"Found {len(source_repos)} repos on source:") for repo in source_repos: repo: giteapy.Repository self.__logger.info(f"- #{repo.id} {repo.full_name}") print() # Filter source_repos = self._filter_repos_for_required_topics(repos=source_repos, topics=source_topics) repos_migrate = [] repos_ignore = [] go_right_now = False for repo in source_repos: repo: giteapy.Repository while True: if interactive: response = input(f"Migrate repo #{repo.id} \"{repo.full_name}\" ? (Y)es, (N)o, (G)o right now, (Q)uit ==> ") response = response.lower() else: response = "y" valid_input = True if response == "y": repos_migrate.append(repo) elif response == "n": repos_ignore.append(repo) elif response == "g": self.__logger.info("Okay, done asking questions, migrating existing selections.") go_right_now = True elif response == "q": go_right_now = True repos_migrate.clear() repos_ignore.clear() self.__logger.info("Okay, quitting instead.") else: valid_input = False self.__logger.warning(f"Invalid input: {response}") if valid_input: break if go_right_now: break # self.__logger.info("") if len(repos_migrate): self.__logger.info("Repos to migrate:") for repo in repos_migrate: repo: giteapy.Repository destination_name = self._make_destination_repo_name(pattern=destination_repo_name, repo=repo) self.__logger.info(f"#{repo.id} \"{repo.name}\" ==> \"{destination_name}\"") else: self.__logger.info("No repos marked to migrate") self.__logger.info("") if len(repos_ignore): self.__logger.info("Repos to ignore:") for repo in repos_ignore: repo: giteapy.Repository self.__logger.info(f"#{repo.id} \"{repo.name}\"") else: self.__logger.info("No repos marked to ignore") if len(repos_migrate): confirmation = input("Do you confirm the above selections? Enter MIGRATE ==> ") if confirmation == "MIGRATE": self.__logger.info("Confirmation received; Processing ... ") source_repos_successful = self._migrate_repos( destination_org_name=destination_org, destination_repo_name=destination_repo_name, destination_topics=destination_topics, repos=repos_migrate ) self.__logger.info(f"{len(source_repos_successful)} of {len(repos_migrate)} repos successfully migrated.") self._delete_migrated_repos(source_org_name=source_org, repos=source_repos_successful) else: self.__logger.info("Confirmation not received; Won't do anything.") def _fetch_all_org_repos(self, org: str): api_source, api_destination = self._get_org_apis() api_source: giteapy.OrganizationApi source_repos = [] page = 0 while True: page += 1 # Starts at 1 for some reason source_repos_page = api_source.org_list_repos(org, page=page, limit=25) if len(source_repos_page) == 0: break source_repos.extend(source_repos_page) return source_repos def _filter_repos_for_required_topics(self, repos: list[giteapy.Repository], topics: list[str]) -> list[giteapy.Repository]: self.__logger.info(f"Filtering source repos for required topics: {topics}") repos_keep = [] repos_reject = [] repo_topics = {} api_source_repos = self._get_repo_api( hostname=self.__source_host, port=self.__source_port, token=self.__source_token ) for repo in repos: repo_topics[repo.id] = api_source_repos.repo_list_topics(owner=repo.owner.login, repo=repo.name) repo_topics[repo.id] = repo_topics[repo.id].topics self.__logger.error(f"Repo topics: {repo_topics[repo.id]}") if self._check_required_topics(topics_present=repo_topics[repo.id], topics_required=topics): repos_keep.append(repo) else: repos_reject.append(repo) self.__logger.info(f"Keeping the following repos because they contain all required topics ({topics}):") if len(repos_keep) > 0: for repo in repos_keep: self.__logger.info(f"> {repo.full_name}") else: self.__logger.info("> None") self.__logger.info("Rejecting the following repos because they don't contain all required topics:") if len(repos_reject) > 0: for repo in repos_reject: self.__logger.info(f"> {repo.full_name} ({repo_topics[repo.id]})") else: self.__logger.info("> None") return repos_keep def _check_required_topics(self, topics_present: list[str], topics_required: list[str]) -> bool: for topic in topics_required: if topic not in topics_present: return False return True def _migrate_repos( self, destination_org_name: str, destination_repo_name: str, destination_topics: list, repos: list ): api_source, api_destination = self._get_org_apis() destination_org = api_destination.org_get(org=destination_org_name) destination_org: giteapy.Organization self.__logger.info(f"Destination organization: {destination_org.full_name}") source_repos_successful = [] for source_repo in repos: source_repo: giteapy.Repository this_destination_repo_name = destination_repo_name.replace("%N%", source_repo.name) migrate_body = giteapy.MigrateRepoForm( mirror=False, clone_addr=source_repo.clone_url, uid=destination_org.id, private=source_repo.private, repo_name=this_destination_repo_name, description=source_repo.description, labels=True, issues=True, pull_requests=True, releases=True, milestones=True, wiki=True ) # TODO: These three lines represent feature request to giteapy authors migrate_body.auth_token = self.__source_token migrate_body.swagger_types["auth_token"] = "str" migrate_body.attribute_map["auth_token"] = "auth_token" self.__logger.debug("Migrate body:") self.__logger.debug(migrate_body) destination_api = self._get_repo_api( hostname=self.__destination_host, port=self.__destination_port, token=self.__destination_token, ) repo_new = destination_api.repo_migrate(body=migrate_body) self.__logger.debug(f"Migration result: {repo_new}") repo_new: giteapy.Repository assert repo_new.name == this_destination_repo_name,\ "New repository didn't end up with the correct name. Failure?" for topic in destination_topics: self.__logger.debug(f"Appending topic to new repo: {topic}") destination_api.repo_add_topc( owner=destination_org.username, repo=repo_new.name, topic=topic, ) source_repos_successful.append(source_repo) return source_repos_successful def _delete_migrated_repos(self, source_org_name: str, repos: list[giteapy.Repository]): repo_api = self._get_repo_api( hostname=self.__source_host, port=self.__source_port, token=self.__source_token ) repo_api: giteapy.RepositoryApi self.__logger.info("") self.__logger.info("Can now delete the following successfully migrated repos:") for r in repos: self.__logger.info(f"> #{r.id} \"{r.full_name}\" ==> {r.clone_url}") response = input("Would you like to delete the successfully migrated repos? Type DELETE ==> ") if response != "DELETE": self.__logger.info("Okay, won't delete migrated repos.") return do_quit = False do_delete_all = False for repo in repos: self.__logger.info(f"Next repo to delete: #{repo.id} \"{repo.full_name}\"") do_delete = True if do_delete_all else False if do_delete is False: response = input("Delete this repo? (Y)es, (N), (A)ll, (Q)uit ==> ") response = response.lower() valid_response = False while valid_response is False: valid_response = True if response == "y": self.__logger.info(f"Okay, deleting {repo.name} from source") do_delete = True elif response == "n": self.__logger.info(f"Okay, won't delete {repo.name}") elif response == "a": self.__logger.info(f"Okay, deleting ALL remaining repos") do_delete_all = True elif response == "q": do_quit = True else: valid_response = False self.__logger.warning(f"Invalid response: {response}") if do_quit: break if do_delete or do_delete_all: self.__logger.info(f"Deleting repo: {repo.full_name}") repo_api.repo_delete( owner=source_org_name, repo=repo.name )