522 lines
14 KiB
Python
522 lines
14 KiB
Python
|
|
|
|
from domain.API import API
|
|
|
|
|
|
import gitea
|
|
import logging
|
|
import sys
|
|
|
|
import certifi
|
|
|
|
|
|
class Migrator:
    """Migrate repositories from one Gitea organization to another.

    Two API handles are created at construction time (one for the source
    server, one for the destination server) through ``domain.API``.  The
    public entry point is :meth:`migrate_entire_org`, which lists the source
    organization's repos, filters them by topic, optionally asks the operator
    which ones to move, migrates them, and finally offers to delete the
    successfully migrated repos from the source.
    """

    # Placeholder inside a destination-name pattern that is substituted with
    # the source repository's name (e.g. "archive-%N%").
    __REPO_ORIGINAL_NAME_TOKEN = "%N%"

    def __init__(
            self,
            source_host, source_port, source_token,
            destination_host, destination_port, destination_token,
            verify_ssl: bool = True, ca_bundle: str = None
    ):
        """Store connection settings and open both API handles.

        Args:
            source_host: Hostname of the source server.
            source_port: Port of the source server.
            source_token: Access token for the source server; also passed as
                the auth token when a repo is migrated.
            destination_host: Hostname of the destination server.
            destination_port: Port of the destination server.
            destination_token: Access token for the destination server.
            verify_ssl: Forwarded unchanged to ``API``.
            ca_bundle: Forwarded unchanged to ``API``.
        """

        # noinspection PyTypeChecker
        self.__logger: logging.Logger = None
        self._init_logger()

        self.__source_host = source_host
        self.__source_port = source_port
        self.__source_token = source_token

        self.__destination_host = destination_host
        self.__destination_port = destination_port
        self.__destination_token = destination_token

        self.__verify_ssl = verify_ssl
        self.__ca_bundle = ca_bundle

        # NOTE(review): API.get() presumably returns a connected gitea client
        # object -- confirm against domain.API.
        api = API(
            verify_ssl=self.__verify_ssl,
            ca_bundle=self.__ca_bundle,
        )
        self.__source_api = api.get(
            hostname=self.__source_host,
            port=self.__source_port,
            token=self.__source_token,
        )
        self.__destination_api = api.get(
            hostname=self.__destination_host,
            port=self.__destination_port,
            token=self.__destination_token,
        )

    def _init_logger(self):
        """Create this instance's logger: INFO level, writing to stdout.

        The logger is constructed directly (not via ``logging.getLogger``), so
        each instance gets its own object with exactly one handler.
        """

        logger = logging.Logger(name=f"{type(self).__name__}", level=logging.INFO)
        logger.addHandler(logging.StreamHandler(stream=sys.stdout))

        self.__logger = logger

    def _make_destination_repo_name(self, pattern: str, repo: gitea.Repository):
        """Return ``pattern`` with every name token replaced by ``repo.name``."""

        return pattern.replace(self.__REPO_ORIGINAL_NAME_TOKEN, repo.name)

    def migrate_entire_org(
            self,
            interactive: bool = True,
            source_org: str = None, source_topics: list[str] = None,
            destination_org: str = None, destination_repo_name: str = None, destination_topics: list = None,
            do_destination_copy_topics: bool = True
    ):
        """Migrate the repos of ``source_org`` into ``destination_org``.

        Steps: list all source repos, keep those carrying every topic in
        ``source_topics``, select repos (asking per repo when ``interactive``),
        announce the plan, ask for a typed ``MIGRATE`` confirmation, migrate,
        report failures, then offer deletion of the migrated source repos.

        Args:
            interactive: When True, ask for each repo whether to migrate it;
                when False, every filtered repo is selected.  The final
                confirmation prompts are shown in both modes.
            source_org: Name of the source organization (required).
            source_topics: Topics a source repo must have to be considered;
                ``None`` means no topic is required.
            destination_org: Name of the destination organization (required).
            destination_repo_name: Name pattern for new repos; the token
                ``%N%`` is replaced by the source repo name (required).
            destination_topics: Topics to add to every new repo (required,
                may be empty).
            do_destination_copy_topics: Also copy the source repo's topics.

        Raises:
            AssertionError: If a required argument is ``None``.
        """

        assert source_org is not None, "Source org must be specified"
        assert destination_org is not None, "Destination org must be specified"
        assert destination_repo_name is not None, "Destination repo name must be specified"
        assert destination_topics is not None, "Destination topics must be specified"
        assert do_destination_copy_topics is not None, "Destination directive to copy source topics should be specified"

        # source_topics is optional; iterating None would raise a TypeError
        # during filtering, so treat it as "no topics required".
        if source_topics is None:
            source_topics = []

        # Tattle on certifi
        self.__logger.info(f"Certifi is currently using CA bundle: {certifi.where()}")

        # Grab all org repos
        source_repos = self._fetch_all_org_repos(org_name=source_org)
        self.__logger.info(f"Found {len(source_repos)} repos on source:")
        for repo in source_repos:
            self.__logger.info(f"- {repo.get_full_name()}")

        print()

        # Filter
        source_repos = self._filter_repos_for_required_topics(
            repos=source_repos,
            topics_required=source_topics
        )
        print()
        self.__logger.info(f"Have {len(source_repos)} remaining repos after topic filtering:")
        for repo in source_repos:
            self.__logger.info(f"- {repo.get_full_name()}")

        repos_migrate, repos_ignore = self._select_repos(
            repos=source_repos, interactive=interactive
        )

        # Announce repo destination names
        self.__logger.info("")
        if repos_migrate:
            self.__logger.info("Repos to migrate:")
            for repo in repos_migrate:
                destination_name = self._make_destination_repo_name(
                    pattern=destination_repo_name, repo=repo
                )
                self.__logger.info(
                    f"#{repo.id} \"{repo.name}\"\n> \"{destination_name}\""
                )
        else:
            self.__logger.info("No repos marked to migrate")

        # Announce manually ignored repos
        self.__logger.info("")
        if repos_ignore:
            self.__logger.info("Repos to ignore:")
            for repo in repos_ignore:
                self.__logger.info(f"#{repo.id} \"{repo.get_full_name()}\"")
        else:
            self.__logger.info("No repos marked to ignore")

        # Migrate
        if not repos_migrate:
            return

        confirmation = input("Do you confirm the above selections? Enter MIGRATE ==> ")
        if confirmation != "MIGRATE":
            self.__logger.info("Confirmation not received; Won't do anything.")
            return

        self.__logger.info("Confirmation received; Processing ... ")
        source_repos_successful, source_repos_failed = self._migrate_repos(
            destination_org_name=destination_org,
            destination_repo_name=destination_repo_name,
            destination_topics=destination_topics,
            do_destination_copy_topics=do_destination_copy_topics,
            repos=repos_migrate
        )
        self.__logger.info(
            f"{len(source_repos_successful)} of {len(repos_migrate)}"
            " repos successfully migrated."
        )

        if source_repos_failed:
            # First a compact list of names, then the captured details.
            self.__logger.error(f"Failed to migrate {len(source_repos_failed)} repos:")
            for repo, _ in source_repos_failed:
                self.__logger.error(f"> {repo.name}")
            self.__logger.error("Captured exception data:")
            for repo, exception in source_repos_failed:
                self.__logger.error(
                    f"Failed to migrate repo: {repo.name}\n> {exception}"
                )

        self._delete_migrated_repos(
            source_org_name=source_org,
            repos=source_repos_successful
        )

    def _select_repos(self, repos: list, interactive: bool) -> tuple[list, list]:
        """Decide which repos to migrate and which to ignore.

        In interactive mode the operator answers per repo: (Y)es, (N)o,
        (G)o right now (stop asking, keep selections so far) or (Q)uit
        (stop asking and drop all selections).  Otherwise every repo is
        selected for migration.

        Returns:
            ``(repos_migrate, repos_ignore)``.
        """

        repos_migrate = []
        repos_ignore = []

        for repo in repos:

            stop_asking = False

            # Re-prompt until the answer for this repo is a recognised one.
            while True:

                if interactive:
                    response = input(
                        f"Migrate repo #{repo.id} \"{repo.full_name}\" ?"
                        " (Y)es, (N)o, (G)o right now, (Q)uit ==> "
                    ).lower()
                else:
                    response = "y"

                if response == "y":
                    repos_migrate.append(repo)
                elif response == "n":
                    repos_ignore.append(repo)
                elif response == "g":
                    self.__logger.info("Okay, done asking questions, migrating existing selections.")
                    stop_asking = True
                elif response == "q":
                    stop_asking = True
                    repos_migrate.clear()
                    repos_ignore.clear()
                    self.__logger.info("Okay, quitting instead.")
                else:
                    self.__logger.warning(f"Invalid input: {response}")
                    continue

                break

            if stop_asking:
                break

        return repos_migrate, repos_ignore

    def _fetch_all_org_repos(self, org_name: str):
        """Return the repositories of ``org_name`` on the source server."""

        org = gitea.Organization.request(
            gitea=self.__source_api,
            name=org_name
        )

        # Grabs all pages automatically
        return org.get_repositories()

    def _filter_repos_for_required_topics(
            self,
            repos: list[gitea.Repository],
            topics_required: list[str]
    ) -> list[gitea.Repository]:
        """Return the repos that carry every topic in ``topics_required``.

        Topics are fetched once per repo via ``repo.get_topics()``.  Kept and
        rejected repos are both logged.  ``topics_required=None`` is treated
        as an empty requirement, i.e. every repo is kept.
        """

        if topics_required is None:
            topics_required = []

        self.__logger.info(
            f"Filtering source repos for required topics: {topics_required}"
        )

        repos_keep = []
        repos_reject = []

        for repo in repos:
            has_all_topics = self._check_required_topics(
                topics_present=repo.get_topics(),
                topics_required=topics_required
            )
            if has_all_topics:
                repos_keep.append(repo)
            else:
                repos_reject.append(repo)

        self.__logger.info("")
        self.__logger.info(
            f"\nKeeping {len(repos_keep)} repos"
            f" because they contain all required topics ({topics_required}):"
        )
        if repos_keep:
            for repo in repos_keep:
                self.__logger.info(f"> {repo.full_name}")
        else:
            self.__logger.info("> None")

        self.__logger.info("")
        self.__logger.info(
            f"Rejecting {len(repos_reject)} repos because they don't contain all required topics:"
        )
        if repos_reject:
            for repo in repos_reject:
                self.__logger.info(f"> {repo.full_name}")
        else:
            self.__logger.info("> None")

        return repos_keep

    @staticmethod
    def _check_required_topics(topics_present: list[str], topics_required: list[str]) -> bool:
        """Return True when every required topic is among the present ones.

        An empty or ``None`` requirement is trivially satisfied.
        """

        return all(topic in topics_present for topic in topics_required or ())

    def _migrate_repos(
            self,
            destination_org_name: str,
            destination_repo_name: str,
            destination_topics: list,
            do_destination_copy_topics: bool,
            repos: list
    ) -> tuple[list, list]:
        """Migrate each repo in ``repos`` into the destination organization.

        A failing migration request is logged and recorded, and processing
        continues with the next repo.  Errors raised while reading source
        topics or adding topics to the new repo are not caught here.

        Args:
            destination_org_name: Owner of the new repos.
            destination_repo_name: Name pattern (see
                ``_make_destination_repo_name``).
            destination_topics: Topics added to every new repo.
            do_destination_copy_topics: Also add the source repo's topics.
            repos: Source repositories to migrate.

        Returns:
            ``(successful, failed)`` where ``successful`` is a list of source
            repos and ``failed`` is a list of ``(source_repo, exception)``.
        """

        api_dest_org = gitea.Organization.request(
            gitea=self.__destination_api,
            name=destination_org_name
        )

        self.__logger.info(f"Destination organization: {api_dest_org.full_name}")

        source_repos_successful = []
        source_repos_failed = []

        for source_repo in repos:

            # Same helper that produced the names announced to the operator,
            # so the announced and the actual name cannot diverge.
            this_destination_repo_name = self._make_destination_repo_name(
                pattern=destination_repo_name, repo=source_repo
            )

            self.__logger.info(
                f"Migrating: {source_repo.name} ==> {this_destination_repo_name}"
            )

            # Read the source topics before anything is created remotely.
            source_repo_topics = source_repo.get_topics()

            try:
                repo_new = gitea.Repository.migrate_repo(
                    gitea=self.__destination_api,
                    service="gitea",  # type of remote service
                    clone_addr=source_repo.clone_url,
                    repo_name=this_destination_repo_name,
                    description=source_repo.description,
                    private=source_repo.private,
                    auth_token=self.__source_token,
                    auth_username=None,
                    auth_password=None,
                    mirror=False,
                    mirror_interval=None,
                    # lfs=False,
                    # lfs_endpoint="",
                    wiki=True,
                    labels=True,
                    issues=True,
                    pull_requests=True,
                    releases=True,
                    milestones=True,
                    repo_owner=destination_org_name,
                )
            except Exception as e:
                # Deliberately broad: any failure of this repo is recorded so
                # the remaining repos are still attempted.
                self.__logger.error(
                    f"Failed to execute repo migration request:"
                    f"\n{e}"
                )
                source_repos_failed.append((source_repo, e))
                continue

            self.__logger.debug(f"Migration result: {repo_new}")

            assert repo_new.name == this_destination_repo_name, \
                "New repository didn't end up with the correct name. Failure?"

            # Copy source topics?
            if do_destination_copy_topics:
                for topic in source_repo_topics:
                    self.__logger.debug(f"Appending source topic to new repo: {topic}")
                    repo_new.add_topic(topic=topic)

            # Add specified topics
            for topic in destination_topics:
                self.__logger.debug(f"Appending topic to new repo: {topic}")
                repo_new.add_topic(topic=topic)

            source_repos_successful.append(source_repo)

        return source_repos_successful, source_repos_failed

    def _delete_migrated_repos(self, source_org_name: str, repos: list[gitea.Repository]):
        """Offer to delete successfully migrated repos from the source.

        The operator must first type ``DELETE``; afterwards each repo is
        confirmed individually with (Y)es, (N)o, (A)ll remaining or (Q)uit.
        An unrecognised answer re-prompts for the same repo.

        Args:
            source_org_name: Source organization name (used for logging only).
            repos: Source repositories that were migrated successfully.
        """

        if not repos:
            self.__logger.warning("Cannot delete any migrated repos because none were successful!")
            return

        self.__logger.info("")
        self.__logger.info(f"Can now delete repos from source org: {source_org_name}")
        self.__logger.info(f"Will delete {len(repos)} successfully migrated repos:")
        for r in repos:
            self.__logger.info(f"> #{r.id} \"{r.full_name}\" ==> {r.clone_url}")

        # Ask the user to confirm deletion
        response = input(
            "Would you like to delete the successfully migrated repos? Type DELETE ==> "
        )
        if response != "DELETE":
            self.__logger.info("Okay, won't delete migrated repos.")
            return

        do_delete_all = False
        for repo in repos:

            self.__logger.info(f"Next repo to delete: \"{repo.full_name}\"")
            do_delete = do_delete_all

            if not do_delete:

                # The prompt lives inside the loop so that an invalid answer
                # asks again instead of re-evaluating the same text forever.
                while True:

                    response = input("Delete this repo? (Y)es, (N), (A)ll, (Q)uit ==> ").lower()

                    if response == "y":
                        self.__logger.info(f"Okay, deleting {repo.name} from source")
                        do_delete = True
                    elif response == "n":
                        self.__logger.info(f"Okay, won't delete {repo.name}")
                    elif response == "a":
                        self.__logger.info("Okay, deleting ALL remaining repos")
                        do_delete_all = True
                        do_delete = True
                    elif response == "q":
                        # Stop without touching this or any remaining repo.
                        return
                    else:
                        self.__logger.warning(f"Invalid response: {response}")
                        continue

                    break

            if do_delete:
                self.__logger.info(f"Deleting repo: {repo.full_name}")
                repo.delete()
|