refactor: move branch functionality into seperate files

This commit is contained in:
Sam Chau
2024-09-03 20:07:27 +09:30
parent 8fd657a3c8
commit 623f510ff9
6 changed files with 159 additions and 125 deletions

View File

@@ -0,0 +1,26 @@
# git/branches/branches.py
import git
from .create import create_branch
from .checkout import checkout_branch
from .delete import delete_branch
from .get import get_branches, get_current_branch
class Branch_Manager:
def __init__(self, repo_path):
self.repo_path = repo_path
def create(self, branch_name, base_branch='main'):
return create_branch(self.repo_path, branch_name, base_branch)
def checkout(self, branch_name):
return checkout_branch(self.repo_path, branch_name)
def delete(self, branch_name):
return delete_branch(self.repo_path, branch_name)
def get_all(self):
return get_branches(self.repo_path)
def get_current(self):
return get_current_branch(self.repo_path)

View File

@@ -0,0 +1,24 @@
# git/branches/checkout.py
import git
import logging
logger = logging.getLogger(__name__)
def checkout_branch(repo_path, branch_name):
try:
logger.debug(f"Attempting to checkout branch: {branch_name}")
repo = git.Repo(repo_path)
# Check if the branch exists
if branch_name not in repo.heads:
return False, f"Branch '{branch_name}' does not exist."
# Checkout the branch
repo.git.checkout(branch_name)
logger.debug(f"Successfully checked out branch: {branch_name}")
return True, {"message": f"Checked out branch: {branch_name}", "current_branch": branch_name}
except Exception as e:
logger.error(f"Error checking out branch: {str(e)}", exc_info=True)
return False, {"error": f"Error checking out branch: {str(e)}"}

View File

@@ -0,0 +1,29 @@
# git/branches/create.py
import git
import logging
logger = logging.getLogger(__name__)
def create_branch(repo_path, branch_name, base_branch='main'):
try:
logger.debug(f"Attempting to create branch {branch_name} from {base_branch}")
repo = git.Repo(repo_path)
# Check if the branch already exists
if branch_name in repo.heads:
return False, f"Branch '{branch_name}' already exists."
# Create and checkout the new branch
new_branch = repo.create_head(branch_name, commit=base_branch)
new_branch.checkout()
# Push the new branch to remote and set the upstream branch
origin = repo.remote(name='origin')
origin.push(refspec=f"{branch_name}:{branch_name}", set_upstream=True)
logger.debug(f"Successfully created and pushed branch: {branch_name}")
return True, {"message": f"Created and set upstream for branch: {branch_name}", "current_branch": branch_name}
except Exception as e:
logger.error(f"Error creating branch: {str(e)}", exc_info=True)
return False, {"error": f"Error creating branch: {str(e)}"}

View File

@@ -0,0 +1,35 @@
# git/branches/delete.py
import git
from git.exc import GitCommandError
import logging
logger = logging.getLogger(__name__)
def delete_branch(repo_path, branch_name):
try:
logger.debug(f"Attempting to delete branch: {branch_name}")
repo = git.Repo(repo_path)
# Check if the branch exists
if branch_name not in repo.heads:
return False, f"Branch '{branch_name}' does not exist."
# Check if it's the current branch
if repo.active_branch.name == branch_name:
return False, f"Cannot delete the current branch: {branch_name}"
# Delete the branch locally
repo.delete_head(branch_name, force=True)
# Delete the branch remotely
try:
repo.git.push('origin', '--delete', branch_name)
except GitCommandError:
logger.warning(f"Failed to delete remote branch: {branch_name}. It may not exist on remote.")
logger.debug(f"Successfully deleted branch: {branch_name}")
return True, {"message": f"Deleted branch: {branch_name}", "current_branch": repo.active_branch.name}
except Exception as e:
logger.error(f"Error deleting branch: {str(e)}", exc_info=True)
return False, {"error": f"Error deleting branch: {str(e)}"}

View File

@@ -0,0 +1,38 @@
# git/branches/get.py
import git
import logging
logger = logging.getLogger(__name__)
def get_branches(repo_path):
try:
logger.debug("Attempting to get branches")
repo = git.Repo(repo_path)
# Get local branches
local_branches = [{'name': branch.name} for branch in repo.heads]
# Get remote branches
remote_branches = [{'name': ref.remote_head} for ref in repo.remote().refs]
# Combine and remove duplicates, and exclude 'HEAD'
all_branches = {branch['name']: branch for branch in local_branches + remote_branches if branch['name'] != 'HEAD'}.values()
logger.debug(f"Successfully retrieved branches: {[branch['name'] for branch in all_branches]}")
# Log the branches before sending
logger.info(f"Branches being sent: {[branch['name'] for branch in all_branches]}")
return True, {"branches": list(all_branches)}
except Exception as e:
logger.error(f"Error getting branches: {str(e)}", exc_info=True)
return False, {"error": f"Error getting branches: {str(e)}"}
def get_current_branch(repo_path):
try:
repo = git.Repo(repo_path)
return repo.active_branch.name
except Exception as e:
logger.error(f"Error getting current branch: {str(e)}", exc_info=True)
return None

View File

@@ -14,6 +14,7 @@ from .git.clone_repo import clone_repository
from .git.authenticate import validate_git_token
from .git.status.status import get_git_status
from .git.status.diff import get_diff
from .git.branches.branches import Branch_Manager
from .settings_utils import load_settings, save_settings
logging.basicConfig(level=logging.DEBUG)
@@ -33,35 +34,7 @@ class SettingsManager:
self.settings = load_settings()
self.repo_url = self.settings.get('gitRepo') if self.settings else None
self.repo_path = DB_DIR
def get_default_branch(self):
try:
logger.info(f"Fetching default branch for repo: {self.repo_url}")
parts = self.repo_url.strip('/').split('/')
if len(parts) < 2:
logger.error("Invalid repository URL")
return None
repo_owner, repo_name = parts[-2], parts[-1].replace('.git', '')
api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
headers = {
'Authorization': f'Bearer {self.settings["gitToken"]}',
'Accept': 'application/vnd.github+json'
}
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
repo_info = response.json()
default_branch = repo_info.get('default_branch', 'main')
logger.info(f"Default branch: {default_branch}")
return default_branch
else:
logger.error(f"Failed to fetch default branch, status code: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error fetching default branch: {str(e)}", exc_info=True)
return None
self.branch_manager = Branch_Manager(self.repo_path)
def clone_repository(self):
return clone_repository(self.repo_url, self.repo_path, self.settings["gitToken"])
@@ -70,110 +43,19 @@ class SettingsManager:
return get_git_status(self.repo_path)
def get_branches(self):
try:
logger.debug("Attempting to get branches")
repo = git.Repo(self.repo_path)
# Get local branches
local_branches = [{'name': branch.name} for branch in repo.heads]
# Get remote branches
remote_branches = [{'name': ref.remote_head} for ref in repo.remote().refs]
# Combine and remove duplicates, and exclude 'HEAD'
all_branches = {branch['name']: branch for branch in local_branches + remote_branches if branch['name'] != 'HEAD'}.values()
logger.debug(f"Successfully retrieved branches: {[branch['name'] for branch in all_branches]}")
# Log the branches before sending
logger.info(f"Branches being sent: {[branch['name'] for branch in all_branches]}")
return True, {"branches": list(all_branches)}
except Exception as e:
logger.error(f"Error getting branches: {str(e)}", exc_info=True)
return False, {"error": f"Error getting branches: {str(e)}"}
return self.branch_manager.get_all()
def create_branch(self, branch_name, base_branch='main'):
try:
logger.debug(f"Attempting to create branch {branch_name} from {base_branch}")
repo = git.Repo(self.repo_path)
# Check if the branch already exists
if branch_name in repo.heads:
return False, f"Branch '{branch_name}' already exists."
# Create and checkout the new branch
new_branch = repo.create_head(branch_name, commit=base_branch)
new_branch.checkout()
# Push the new branch to remote and set the upstream branch
origin = repo.remote(name='origin')
origin.push(refspec=f"{branch_name}:{branch_name}", set_upstream=True)
logger.debug(f"Successfully created and pushed branch: {branch_name}")
return True, {"message": f"Created and set upstream for branch: {branch_name}", "current_branch": branch_name}
except Exception as e:
logger.error(f"Error creating branch: {str(e)}", exc_info=True)
return False, {"error": f"Error creating branch: {str(e)}"}
return self.branch_manager.create(branch_name, base_branch)
def checkout_branch(self, branch_name):
try:
logger.debug(f"Attempting to checkout branch: {branch_name}")
repo = git.Repo(self.repo_path)
# Check if the branch exists
if branch_name not in repo.heads:
return False, f"Branch '{branch_name}' does not exist."
# Checkout the branch
repo.git.checkout(branch_name)
logger.debug(f"Successfully checked out branch: {branch_name}")
return True, {"message": f"Checked out branch: {branch_name}", "current_branch": branch_name}
except Exception as e:
logger.error(f"Error checking out branch: {str(e)}", exc_info=True)
return False, {"error": f"Error checking out branch: {str(e)}"}
return self.branch_manager.checkout(branch_name)
def delete_branch(self, branch_name):
try:
logger.debug(f"Attempting to delete branch: {branch_name}")
repo = git.Repo(self.repo_path)
# Check if the branch exists
if branch_name not in repo.heads:
return False, f"Branch '{branch_name}' does not exist."
# Check if it's the current branch
if repo.active_branch.name == branch_name:
return False, f"Cannot delete the current branch: {branch_name}"
# Delete the branch locally
repo.delete_head(branch_name, force=True)
# Delete the branch remotely
try:
repo.git.push('origin', '--delete', branch_name)
except GitCommandError:
logger.warning(f"Failed to delete remote branch: {branch_name}. It may not exist on remote.")
logger.debug(f"Successfully deleted branch: {branch_name}")
return True, {"message": f"Deleted branch: {branch_name}", "current_branch": repo.active_branch.name}
except Exception as e:
logger.error(f"Error deleting branch: {str(e)}", exc_info=True)
return False, {"error": f"Error deleting branch: {str(e)}"}
return self.branch_manager.delete(branch_name)
def get_current_branch(self):
try:
repo = git.Repo(self.repo_path)
return repo.active_branch.name
except Exception as e:
logger.error(f"Error getting current branch: {str(e)}", exc_info=True)
return None
return self.branch_manager.get_current()
def stage_files(self, files):
try: