refactor: move status functions into seperate files

This commit is contained in:
Sam Chau
2024-09-03 19:40:00 +09:30
parent a685dfa3a5
commit 8e9cd5e357
5 changed files with 259 additions and 230 deletions

View File

@@ -0,0 +1,34 @@
# git/status/incoming_changes.py
import os
import logging
from .utils import extract_data_from_yaml, determine_type
logger = logging.getLogger(__name__)
def get_incoming_changes(repo, branch):
incoming_changes = []
diff = repo.git.diff(f'HEAD...origin/{branch}', name_only=True)
changed_files = diff.split('\n') if diff else []
for file_path in changed_files:
if file_path:
full_path = os.path.join(repo.working_dir, file_path)
file_data = extract_data_from_yaml(full_path) if os.path.exists(full_path) else None
# Correcting the git show command
commit_message = repo.git.show(f'HEAD...origin/{branch}', '--format=%B', '-s', file_path).strip()
incoming_changes.append({
'name': file_data.get('name', os.path.basename(file_path)) if file_data else os.path.basename(file_path),
'id': file_data.get('id') if file_data else None,
'type': determine_type(file_path),
'status': 'Incoming',
'file_path': file_path,
'commit_message': commit_message, # Include commit message here
'staged': False,
'modified': True,
'deleted': False
})
return incoming_changes

View File

@@ -0,0 +1,106 @@
# git/status/outgoing_changes.py
import os
import json
import yaml
import logging
from .utils import extract_data_from_yaml, determine_type, interpret_git_status
logger = logging.getLogger(__name__)
def get_outgoing_changes(repo):
status = repo.git.status('--porcelain', '-z').split('\0')
logger.debug(f"Raw porcelain status: {status}")
changes = []
for item in status:
if not item:
continue
logger.debug(f"Processing status item: {item}")
if len(item) < 4:
logger.warning(f"Unexpected status item format: {item}")
continue
x, y, file_path = item[0], item[1], item[3:]
logger.debug(f"Parsed status: x={x}, y={y}, file_path={file_path}")
is_staged = x != ' ' and x != '?'
is_deleted = x == 'D' or y == 'D'
full_path = os.path.join(repo.working_dir, file_path)
if is_deleted:
try:
# Get the content of the file from the last commit
file_content = repo.git.show(f'HEAD:{file_path}')
yaml_content = yaml.safe_load(file_content)
original_name = yaml_content.get('name', 'Unknown')
original_id = yaml_content.get('id', '')
except Exception as e:
logger.warning(f"Could not retrieve original name for deleted file {file_path}: {str(e)}")
original_name = "Unknown"
original_id = ""
changes.append({
'name': original_name,
'id': original_id,
'type': determine_type(file_path),
'status': 'Deleted',
'file_path': file_path,
'staged': is_staged,
'modified': False,
'deleted': True
})
elif os.path.isdir(full_path):
logger.debug(f"Found directory: {file_path}, going through folder.")
for root, dirs, files in os.walk(full_path):
for file in files:
if file.endswith('.yml') or file.endswith('.yaml'):
file_full_path = os.path.join(root, file)
logger.debug(f"Found file: {file_full_path}, going through file.")
file_data = extract_data_from_yaml(file_full_path)
if file_data:
logger.debug(f"File contents: {file_data}")
logger.debug(f"Found ID: {file_data.get('id')}")
logger.debug(f"Found Name: {file_data.get('name')}")
changes.append({
'name': file_data.get('name', ''),
'id': file_data.get('id', ''),
'type': determine_type(file_path),
'status': interpret_git_status(x, y),
'file_path': os.path.relpath(file_full_path, repo.working_dir),
'staged': x != '?' and x != ' ',
'modified': y == 'M',
'deleted': False
})
else:
logger.debug(f"No data extracted from file: {file_full_path}")
else:
file_data = extract_data_from_yaml(full_path) if os.path.exists(full_path) else None
if file_data:
changes.append({
'name': file_data.get('name', ''),
'id': file_data.get('id', ''),
'type': determine_type(file_path),
'status': interpret_git_status(x, y),
'file_path': file_path,
'staged': is_staged,
'modified': y != ' ',
'deleted': False
})
else:
changes.append({
'name': os.path.basename(file_path).replace('.yml', ''),
'id': '',
'type': determine_type(file_path),
'status': interpret_git_status(x, y),
'file_path': file_path,
'staged': is_staged,
'modified': y != ' ',
'deleted': False
})
logger.debug(f"Final changes: {json.dumps(changes, indent=2)}")
return changes

View File

@@ -0,0 +1,61 @@
# git/status/status.py
import git
from git.exc import GitCommandError, InvalidGitRepositoryError
import logging
import json
from .incoming_changes import get_incoming_changes
from .outgoing_changes import get_outgoing_changes
logger = logging.getLogger(__name__)
def get_commits_ahead(repo, branch):
return list(repo.iter_commits(f'origin/{branch}..{branch}'))
def get_commits_behind(repo, branch):
return list(repo.iter_commits(f'{branch}..origin/{branch}'))
def get_git_status(repo_path):
try:
logger.debug(f"Attempting to get status for repo at {repo_path}")
repo = git.Repo(repo_path)
logger.debug(f"Successfully created Repo object")
outgoing_changes = get_outgoing_changes(repo)
logger.debug(f"Outgoing changes detected: {outgoing_changes}")
branch = repo.active_branch.name
remote_branch_exists = f"origin/{branch}" in [ref.name for ref in repo.remotes.origin.refs]
if remote_branch_exists:
repo.remotes.origin.fetch()
commits_behind = get_commits_behind(repo, branch)
commits_ahead = get_commits_ahead(repo, branch)
logger.debug(f"Commits behind: {len(commits_behind)}, Commits ahead: {len(commits_ahead)}")
incoming_changes = get_incoming_changes(repo, branch)
else:
commits_behind = []
commits_ahead = []
incoming_changes = []
logger.debug("Remote branch does not exist, skipping commits ahead/behind and incoming changes calculation.")
status = {
"branch": branch,
"remote_branch_exists": remote_branch_exists,
"outgoing_changes": outgoing_changes,
"commits_behind": len(commits_behind),
"commits_ahead": len(commits_ahead),
"incoming_changes": incoming_changes,
}
logger.debug(f"Final status object: {json.dumps(status, indent=2)}")
return True, status
except GitCommandError as e:
logger.error(f"GitCommandError: {str(e)}")
return False, f"Git error: {str(e)}"
except InvalidGitRepositoryError:
logger.error(f"InvalidGitRepositoryError for path: {repo_path}")
return False, "Invalid Git repository"
except Exception as e:
logger.error(f"Unexpected error in get_git_status: {str(e)}", exc_info=True)
return False, f"Unexpected error: {str(e)}"

View File

@@ -0,0 +1,56 @@
# git/status/utils.py
import os
import yaml
import logging
logger = logging.getLogger(__name__)
def extract_data_from_yaml(file_path):
logger.debug(f"Extracting data from file: {file_path}")
try:
with open(file_path, 'r') as f:
content = yaml.safe_load(f)
logger.debug(f"File content: {content}") # Log the full file content
if content is None:
logger.error(f"Failed to parse YAML file or file is empty: {file_path}")
return None
# Check if expected keys are in the content
if 'name' not in content or 'id' not in content:
logger.warning(f"'name' or 'id' not found in file: {file_path}")
return {
'name': content.get('name'),
'id': content.get('id')
}
except Exception as e:
logger.warning(f"Error reading file {file_path}: {str(e)}")
return None
def determine_type(file_path):
if 'regex_patterns' in file_path:
return 'Regex Pattern'
elif 'custom_formats' in file_path:
return 'Custom Format'
elif 'profiles' in file_path:
return 'Quality Profile'
return 'Unknown'
def interpret_git_status(x, y):
if x == 'D' or y == 'D':
return 'Deleted'
elif x == 'A':
return 'Added'
elif x == 'M' or y == 'M':
return 'Modified'
elif x == 'R':
return 'Renamed'
elif x == 'C':
return 'Copied'
elif x == 'U':
return 'Updated but unmerged'
elif x == '?' and y == '?':
return 'Untracked'
else:
return 'Unknown'

View File

@@ -12,6 +12,7 @@ import requests
from .git.unlink_repo import repo_bp, unlink_repository
from .git.clone_repo import clone_repository
from .git.authenticate import validate_git_token
from .git.status.status import get_git_status
from .settings_utils import load_settings, save_settings
logging.basicConfig(level=logging.DEBUG)
@@ -26,191 +27,6 @@ SETTINGS_FILE = os.path.join(DATA_DIR, 'config', 'settings.yml')
REGEX_DIR = os.path.join(DB_DIR, 'regex_patterns')
FORMAT_DIR = os.path.join(DB_DIR, 'custom_formats')
def get_outgoing_changes(repo):
status = repo.git.status('--porcelain', '-z').split('\0')
logger.debug(f"Raw porcelain status: {status}")
changes = []
for item in status:
if not item:
continue
logger.debug(f"Processing status item: {item}")
if len(item) < 4:
logger.warning(f"Unexpected status item format: {item}")
continue
x, y, file_path = item[0], item[1], item[3:]
logger.debug(f"Parsed status: x={x}, y={y}, file_path={file_path}")
is_staged = x != ' ' and x != '?'
is_deleted = x == 'D' or y == 'D'
full_path = os.path.join(repo.working_dir, file_path)
if is_deleted:
try:
# Get the content of the file from the last commit
file_content = repo.git.show(f'HEAD:{file_path}')
yaml_content = yaml.safe_load(file_content)
original_name = yaml_content.get('name', 'Unknown')
original_id = yaml_content.get('id', '')
except Exception as e:
logger.warning(f"Could not retrieve original name for deleted file {file_path}: {str(e)}")
original_name = "Unknown"
original_id = ""
changes.append({
'name': original_name,
'id': original_id,
'type': determine_type(file_path),
'status': 'Deleted',
'file_path': file_path,
'staged': is_staged,
'modified': False,
'deleted': True
})
elif os.path.isdir(full_path):
logger.debug(f"Found directory: {file_path}, going through folder.")
for root, dirs, files in os.walk(full_path):
for file in files:
if file.endswith('.yml') or file.endswith('.yaml'):
file_full_path = os.path.join(root, file)
logger.debug(f"Found file: {file_full_path}, going through file.")
file_data = extract_data_from_yaml(file_full_path)
if file_data:
logger.debug(f"File contents: {file_data}")
logger.debug(f"Found ID: {file_data.get('id')}")
logger.debug(f"Found Name: {file_data.get('name')}")
changes.append({
'name': file_data.get('name', ''),
'id': file_data.get('id', ''),
'type': determine_type(file_path),
'status': interpret_git_status(x, y),
'file_path': os.path.relpath(file_full_path, repo.working_dir),
'staged': x != '?' and x != ' ',
'modified': y == 'M',
'deleted': False
})
else:
logger.debug(f"No data extracted from file: {file_full_path}")
else:
file_data = extract_data_from_yaml(full_path) if os.path.exists(full_path) else None
if file_data:
changes.append({
'name': file_data.get('name', ''),
'id': file_data.get('id', ''),
'type': determine_type(file_path),
'status': interpret_git_status(x, y),
'file_path': file_path,
'staged': is_staged,
'modified': y != ' ',
'deleted': False
})
else:
changes.append({
'name': os.path.basename(file_path).replace('.yml', ''),
'id': '',
'type': determine_type(file_path),
'status': interpret_git_status(x, y),
'file_path': file_path,
'staged': is_staged,
'modified': y != ' ',
'deleted': False
})
logger.debug(f"Final changes: {json.dumps(changes, indent=2)}")
return changes
def get_incoming_changes(repo, branch):
incoming_changes = []
diff = repo.git.diff(f'HEAD...origin/{branch}', name_only=True)
changed_files = diff.split('\n') if diff else []
for file_path in changed_files:
if file_path:
full_path = os.path.join(repo.working_dir, file_path)
file_data = extract_data_from_yaml(full_path) if os.path.exists(full_path) else None
# Correcting the git show command
commit_message = repo.git.show(f'HEAD...origin/{branch}', '--format=%B', '-s', file_path).strip()
incoming_changes.append({
'name': file_data.get('name', os.path.basename(file_path)) if file_data else os.path.basename(file_path),
'id': file_data.get('id') if file_data else None,
'type': determine_type(file_path),
'status': 'Incoming',
'file_path': file_path,
'commit_message': commit_message, # Include commit message here
'staged': False,
'modified': True,
'deleted': False
})
return incoming_changes
def extract_data_from_yaml(file_path):
logger.debug(f"Extracting data from file: {file_path}")
try:
with open(file_path, 'r') as f:
content = yaml.safe_load(f)
logger.debug(f"File content: {content}") # Log the full file content
if content is None:
logger.error(f"Failed to parse YAML file or file is empty: {file_path}")
return None
# Check if expected keys are in the content
if 'name' not in content or 'id' not in content:
logger.warning(f"'name' or 'id' not found in file: {file_path}")
return {
'name': content.get('name'),
'id': content.get('id')
}
except Exception as e:
logger.warning(f"Error reading file {file_path}: {str(e)}")
return None
def determine_type(file_path):
if 'regex_patterns' in file_path:
return 'Regex Pattern'
elif 'custom_formats' in file_path:
return 'Custom Format'
elif 'profiles' in file_path:
return 'Quality Profile'
return 'Unknown'
def interpret_git_status(x, y):
if x == 'D' or y == 'D':
return 'Deleted'
elif x == 'A':
return 'Added'
elif x == 'M' or y == 'M':
return 'Modified'
elif x == 'R':
return 'Renamed'
elif x == 'C':
return 'Copied'
elif x == 'U':
return 'Updated but unmerged'
elif x == '?' and y == '?':
return 'Untracked'
else:
return 'Unknown'
def get_staged_files(repo):
return [item.a_path for item in repo.index.diff('HEAD')]
def get_commits_ahead(repo, branch):
return list(repo.iter_commits(f'origin/{branch}..{branch}'))
def get_commits_behind(repo, branch):
return list(repo.iter_commits(f'{branch}..origin/{branch}'))
class SettingsManager:
def __init__(self):
self.settings = load_settings()
@@ -250,52 +66,8 @@ class SettingsManager:
return clone_repository(self.repo_url, self.repo_path, self.settings["gitToken"])
def get_git_status(self):
try:
logger.debug(f"Attempting to get status for repo at {self.repo_path}")
repo = git.Repo(self.repo_path)
logger.debug(f"Successfully created Repo object")
return get_git_status(self.repo_path)
outgoing_changes = get_outgoing_changes(repo)
logger.debug(f"Outgoing changes detected: {outgoing_changes}")
branch = repo.active_branch.name
remote_branch_exists = f"origin/{branch}" in [ref.name for ref in repo.remotes.origin.refs]
if remote_branch_exists:
repo.remotes.origin.fetch()
commits_behind = get_commits_behind(repo, branch)
commits_ahead = get_commits_ahead(repo, branch)
logger.debug(f"Commits behind: {len(commits_behind)}, Commits ahead: {len(commits_ahead)}")
incoming_changes = get_incoming_changes(repo, branch)
else:
commits_behind = []
commits_ahead = []
incoming_changes = []
logger.debug("Remote branch does not exist, skipping commits ahead/behind and incoming changes calculation.")
status = {
"branch": branch,
"remote_branch_exists": remote_branch_exists,
"outgoing_changes": outgoing_changes,
"commits_behind": len(commits_behind),
"commits_ahead": len(commits_ahead),
"incoming_changes": incoming_changes,
}
logger.debug(f"Final status object: {json.dumps(status, indent=2)}")
return True, status
except GitCommandError as e:
logger.error(f"GitCommandError: {str(e)}")
return False, f"Git error: {str(e)}"
except InvalidGitRepositoryError:
logger.error(f"InvalidGitRepositoryError for path: {self.repo_path}")
return False, "Invalid Git repository"
except Exception as e:
logger.error(f"Unexpected error in get_git_status: {str(e)}", exc_info=True)
return False, f"Unexpected error: {str(e)}"
def get_branches(self):
try:
logger.debug("Attempting to get branches")