gitea-repo-migrator/domain/Migrator.py
2024-06-17 23:45:40 -07:00

513 lines
14 KiB
Python

from domain.API import API
import gitea
import logging
import sys
import certifi
class Migrator:
__REPO_ORIGINAL_NAME_TOKEN = "%N%"
def __init__(
self,
source_host, source_port, source_token,
destination_host, destination_port, destination_token,
):
# noinspection PyTypeChecker
self.__logger: logging.Logger = None
self._init_logger()
self.__verify_ssl = True
self.__source_host = source_host
self.__source_port = source_port
self.__source_token = source_token
self.__destination_host = destination_host
self.__destination_port = destination_port
self.__destination_token = destination_token
self.__source_api = API.factory(
hostname=self.__source_host,
port=self.__source_port,
token=self.__source_token
)
self.__destination_api = API.factory(
hostname=self.__destination_host,
port=self.__destination_port,
token=self.__destination_token
)
def _init_logger(self):
logger = logging.Logger(name=f"{type(self).__name__}", level=logging.INFO)
stdout_handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(stdout_handler)
self.__logger = logger
"""
def _get_user_api(self, hostname, port, token) -> giteapy.UserApi:
conf = giteapy.Configuration()
conf.api_key['access_token'] = token
conf.host = self._make_api_base(hostname=hostname, port=port)
conf.verify_ssl = self.__verify_ssl
api = giteapy.UserApi(giteapy.ApiClient(conf))
return api
"""
"""
def _get_repo_api(self, hostname, port, token) -> giteapy.RepositoryApi:
conf = giteapy.Configuration()
conf.api_key['access_token'] = token
conf.host = self._make_api_base(hostname=hostname, port=port)
conf.verify_ssl = self.__verify_ssl
api = giteapy.RepositoryApi(giteapy.ApiClient(conf))
return api
"""
"""
def _get_org_apis(self) -> (giteapy.OrganizationApi, giteapy.OrganizationApi):
api_source = self._get_org_api(
hostname=self.__source_host, port=self.__source_port,
token=self.__source_token
)
api_destination = self._get_org_api(
hostname=self.__destination_host, port=self.__destination_port,
token=self.__destination_token
)
return api_source, api_destination
"""
"""
def _get_org_api(self, hostname, port, token) -> giteapy.OrganizationApi:
conf = giteapy.Configuration()
conf.api_key['access_token'] = token
conf.host = self._make_api_base(hostname=hostname, port=port)
conf.verify_ssl = self.__verify_ssl
api = giteapy.OrganizationApi(giteapy.ApiClient(conf))
return api
"""
def _make_destination_repo_name(self, pattern: str, repo: gitea.Repository):
repo_name = pattern.replace(self.__REPO_ORIGINAL_NAME_TOKEN, repo.name)
return repo_name
def set_verify_ssl(self, b: bool):
self.__verify_ssl = b
def set_ca_bundle(self, bundle_path: str):
self.__logger.info("Setting certificate bundle path")
# Hacky but oh well
self.__logger.info(f"Old path: {certifi.where()}")
certifi.core._CACERT_PATH = bundle_path
self.__logger.info(f"New path: {certifi.where()}")
def migrate_entire_org(
self,
interactive: bool = True,
source_org: str = None, source_topics: list[str] = None,
destination_org: str = None, destination_repo_name: str = None, destination_topics: list = None,
do_destination_copy_topics: bool = True
):
assert source_org is not None, "Source org must be specified"
assert destination_org is not None, "Destination org must be specified"
assert destination_repo_name is not None, "Destination repo name must be specified"
assert destination_topics is not None, "Destination topics must be specified"
assert do_destination_copy_topics is not None, "Destination directive to copy source topics should be specified"
# api_source, api_destination = self._get_org_apis()
# api_source: giteapy.OrganizationApi
# api_destination: giteapy.OrganizationApi
# Tattle on certify
self.__logger.info(f"Certifi is currently using CA bundle: {certifi.where()}")
# Grab all org repos
source_repos = self._fetch_all_org_repos(
org_name=source_org
)
self.__logger.info(f"Found {len(source_repos)} repos on source:")
for repo in source_repos:
repo: gitea.Repository
self.__logger.info(f"- {repo.get_full_name()}")
print()
# Filter
source_repos = self._filter_repos_for_required_topics(
repos=source_repos,
topics_required=source_topics
)
print()
self.__logger.info(f"Have {len(source_repos)} remaining repos after topic filtering:")
for repo in source_repos:
repo: gitea.Repository
self.__logger.info(f"- {repo.get_full_name()}")
assert False, "Poop"
repos_migrate = []
repos_ignore = []
go_right_now = False
for repo in source_repos:
repo: giteapy.Repository
while True:
if interactive:
response = input(f"Migrate repo #{repo.id} \"{repo.full_name}\" ? (Y)es, (N)o, (G)o right now, (Q)uit ==> ")
response = response.lower()
else:
response = "y"
valid_input = True
if response == "y":
repos_migrate.append(repo)
elif response == "n":
repos_ignore.append(repo)
elif response == "g":
self.__logger.info("Okay, done asking questions, migrating existing selections.")
go_right_now = True
elif response == "q":
go_right_now = True
repos_migrate.clear()
repos_ignore.clear()
self.__logger.info("Okay, quitting instead.")
else:
valid_input = False
self.__logger.warning(f"Invalid input: {response}")
if valid_input:
break
if go_right_now:
break
#
self.__logger.info("")
if len(repos_migrate):
self.__logger.info("Repos to migrate:")
for repo in repos_migrate:
repo: giteapy.Repository
destination_name = self._make_destination_repo_name(pattern=destination_repo_name, repo=repo)
self.__logger.info(
f"#{repo.id} \"{repo.name}\"\n> \"{destination_name}\""
)
else:
self.__logger.info("No repos marked to migrate")
self.__logger.info("")
if len(repos_ignore):
self.__logger.info("Repos to ignore:")
for repo in repos_ignore:
repo: giteapy.Repository
self.__logger.info(f"#{repo.id} \"{repo.name}\"")
else:
self.__logger.info("No repos marked to ignore")
if len(repos_migrate):
confirmation = input("Do you confirm the above selections? Enter MIGRATE ==> ")
if confirmation == "MIGRATE":
self.__logger.info("Confirmation received; Processing ... ")
source_repos_successful, source_repos_failed = self._migrate_repos(
destination_org_name=destination_org,
destination_repo_name=destination_repo_name,
destination_topics=destination_topics,
do_destination_copy_topics=do_destination_copy_topics,
repos=repos_migrate
)
self.__logger.info(
f"{len(source_repos_successful)} of {len(repos_migrate)} repos successfully migrated."
)
if len(source_repos_failed) > 0:
self.__logger.error(f"Failed to migrate {len(source_repos_failed)} repos:")
for repo, exception in source_repos_failed:
self.__logger.error(
f"> {repo.name}"
)
self.__logger.error(f"Captured exception data:")
for repo, exception in source_repos_failed:
self.__logger.error(
f"Failed to migrate repo: {repo.name}\n> {exception}"
)
self._delete_migrated_repos(source_org_name=source_org, repos=source_repos_successful)
else:
self.__logger.info("Confirmation not received; Won't do anything.")
def _fetch_all_org_repos(self, org_name: str):
org = gitea.Organization.request(
gitea=self.__source_api,
name=org_name
)
# Grabs all pages automatically
repos = org.get_repositories()
return repos
def _filter_repos_for_required_topics(
self,
repos: list[gitea.Repository],
topics_required: list[str]
) -> list[gitea.Repository]:
self.__logger.info(
f"Filtering source repos for required topics: {topics_required}"
)
repos_keep = []
repos_reject = []
repo_topics = {}
for repo in repos:
repo: gitea.Repository
repo_key = repo.get_full_name()
topics_present = repo.get_topics()
print(topics_present)
repo_topics[repo_key] = topics_present["topics"]
print(repo_topics[repo_key])
if self._check_required_topics(
topics_present=repo_topics[repo_key],
topics_required=topics_required
):
repos_keep.append(repo)
else:
repos_reject.append(repo)
self.__logger.info("")
self.__logger.info(
f"\nKeeping {len(repos_keep)} repos"
f" because they contain all required topics ({topics_required}):"
)
if len(repos_keep) > 0:
for repo in repos_keep:
self.__logger.info(f"> {repo.full_name}")
else:
self.__logger.info("> None")
self.__logger.info("")
self.__logger.info(
f"Rejecting {len(repos_reject)} repos because they don't contain all required topics:"
)
if len(repos_reject) > 0:
for repo in repos_reject:
self.__logger.info(f"> {repo.full_name}")
else:
self.__logger.info("> None")
return repos_keep
@staticmethod
def _check_required_topics(topics_present: list[str], topics_required: list[str]) -> bool:
print("Required topics:", topics_required)
print("Present topics:", topics_present)
for topic in topics_required:
if topic not in topics_present:
return False
return True
def _migrate_repos(
self,
destination_org_name: str,
destination_repo_name: str,
destination_topics: list,
do_destination_copy_topics: bool,
repos: list
):
api_source, api_destination = self._get_org_apis()
destination_org = api_destination.org_get(org=destination_org_name)
destination_org: giteapy.Organization
self.__logger.info(f"Destination organization: {destination_org.full_name}")
api_source_repos = self._get_repo_api(
hostname=self.__source_host, port=self.__source_port,
token=self.__source_token
)
source_repos_successful = []
source_repos_failed = []
for source_repo in repos:
source_repo: giteapy.Repository
this_destination_repo_name = destination_repo_name.replace("%N%", source_repo.name)
self.__logger.info(f"Migrating: {source_repo.name} ==> {this_destination_repo_name}")
source_repo_topics = api_source_repos.repo_list_topics(owner=source_repo.owner.login, repo=source_repo.name)
source_repo_topics = source_repo_topics.topics
migrate_body = giteapy.MigrateRepoForm(
mirror=False,
clone_addr=source_repo.clone_url,
uid=destination_org.id,
private=source_repo.private,
repo_name=this_destination_repo_name,
description=source_repo.description,
labels=True, issues=True, pull_requests=True, releases=True, milestones=True, wiki=True
)
# TODO: These three lines represent feature request to giteapy authors
#migrate_body.auth_token = self.__source_token
#migrate_body.swagger_types["auth_token"] = "str"
#migrate_body.attribute_map["auth_token"] = "auth_token"
self.__logger.debug("Migrate body:")
self.__logger.debug(migrate_body)
try:
destination_api = self._get_repo_api(
hostname=self.__destination_host,
port=self.__destination_port,
token=self.__destination_token,
)
except giteapy.rest.ApiException as e:
self.__logger.error(f"Failed to generate destination API: {e}")
source_repos_failed.append(
(source_repo, e)
)
continue
try:
repo_new = destination_api.repo_migrate(body=migrate_body)
except giteapy.rest.ApiException as e:
self.__logger.error(f"Failed to execute repo migration request via API: {e}")
source_repos_failed.append(
(source_repo, e)
)
continue
self.__logger.debug(f"Migration result: {repo_new}")
repo_new: giteapy.Repository
assert repo_new.name == this_destination_repo_name,\
"New repository didn't end up with the correct name. Failure?"
# Copy source topics?
if do_destination_copy_topics:
for topic in source_repo_topics:
self.__logger.debug(f"Appending source topic to new repo: {topic}")
destination_api.repo_add_topc(
owner=destination_org.username,
repo=repo_new.name,
topic=topic,
)
# Add specified topics
for topic in destination_topics:
self.__logger.debug(f"Appending topic to new repo: {topic}")
destination_api.repo_add_topc(
owner=destination_org.username,
repo=repo_new.name,
topic=topic,
)
source_repos_successful.append(source_repo)
return source_repos_successful, source_repos_failed
def _delete_migrated_repos(self, source_org_name: str, repos: list[gitea.Repository]):
if len(repos) == 0:
self.__logger.warning(f"Cannot delete any migrated repos because none were successful!")
return
repo_api = self._get_repo_api(
hostname=self.__source_host,
port=self.__source_port,
token=self.__source_token
)
repo_api: giteapy.RepositoryApi
self.__logger.info("")
self.__logger.info(f"Can now delete {len(repos)} successfully migrated repos:")
for r in repos:
self.__logger.info(f"> #{r.id} \"{r.full_name}\" ==> {r.clone_url}")
response = input("Would you like to delete the successfully migrated repos? Type DELETE ==> ")
if response != "DELETE":
self.__logger.info("Okay, won't delete migrated repos.")
return
do_quit = False
do_delete_all = False
for repo in repos:
self.__logger.info(f"Next repo to delete: #{repo.id} \"{repo.full_name}\"")
do_delete = True if do_delete_all else False
if do_delete is False:
response = input("Delete this repo? (Y)es, (N), (A)ll, (Q)uit ==> ")
response = response.lower()
valid_response = False
while valid_response is False:
valid_response = True
if response == "y":
self.__logger.info(f"Okay, deleting {repo.name} from source")
do_delete = True
elif response == "n":
self.__logger.info(f"Okay, won't delete {repo.name}")
elif response == "a":
self.__logger.info(f"Okay, deleting ALL remaining repos")
do_delete_all = True
elif response == "q":
do_quit = True
else:
valid_response = False
self.__logger.warning(f"Invalid response: {response}")
if do_quit:
break
if do_delete or do_delete_all:
self.__logger.info(f"Deleting repo: {repo.full_name}")
repo_api.repo_delete(
owner=source_org_name,
repo=repo.name
)