Files
profilarr/backend/app/importarr/format.py
Samuel Chau ecc5f9eade refactor: profile import optimisation (#165)
feat: add concurrent, async requests to improve profile / format import performance

feat: optimise cf prefetching, reduce verbose logging
- now only make single request, then propagate changes to remaining profiles
- remove verbose logging

fix: increase Gunicorn worker timeout to 10 minutes to prevent timeouts during database syncing operations
2025-03-16 10:41:13 +10:30

399 lines
15 KiB
Python

import requests
import logging
import json
import yaml
import asyncio
import aiohttp
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from ..data.utils import (load_yaml_file, get_category_directory, REGEX_DIR,
FORMAT_DIR)
from ..compile import CustomFormat, FormatConverter, TargetApp
from ..db.queries.format_renames import is_format_in_renames
logger = logging.getLogger('importarr')
def import_formats_to_arr(format_names, base_url, api_key, arr_type,
                          original_names):
    """
    Import custom formats into an arr instance.

    Batches of more than five formats are handed to
    ``async_import_formats_to_arr`` (driven by ``asyncio.run``) and its
    result is returned unchanged. Smaller batches are compiled and
    uploaded one format at a time in this function.

    Args:
        format_names: Names the formats are uploaded under.
        base_url: Base URL of the arr instance.
        api_key: API key forwarded to the HTTP helpers.
        arr_type: 'radarr' (case-insensitive) selects the Radarr target;
            any other value selects Sonarr.
        original_names: Names used to locate each format's YAML file,
            index-aligned with ``format_names``.

    Returns:
        A dict with 'success', 'added', 'updated', 'failed' and 'details'
        keys. When the existing formats cannot be fetched, or an
        unexpected error escapes the per-format handling, the dict is
        ``{'success': False, 'error': <message>}`` instead.
    """
    logger.info(
        f"Received {len(format_names)} formats to import for {arr_type}")

    # Larger batches go through the concurrent implementation.
    if len(format_names) > 5:
        coroutine = async_import_formats_to_arr(
            format_names=format_names,
            base_url=base_url,
            api_key=api_key,
            arr_type=arr_type,
            original_names=original_names,
        )
        return asyncio.run(coroutine)

    # Small batch: process synchronously.
    summary = dict(success=True, added=0, updated=0, failed=0, details=[])

    try:
        logger.info("Looking for existing formats...")
        remote_formats = get_existing_formats(base_url, api_key)
        if remote_formats is None:
            return {
                'success': False,
                'error': 'Failed to get existing formats'
            }

        # Name -> id lookup decides between "update" and "add" later on.
        id_by_name = {}
        for remote in remote_formats:
            id_by_name[remote['name']] = remote['id']

        # Gather every named regex pattern the converter may reference.
        regex_by_name = {}
        for regex_path in Path(REGEX_DIR).glob('*.yml'):
            try:
                loaded = load_yaml_file(str(regex_path))
                if not loaded:
                    continue
                if 'name' in loaded and 'pattern' in loaded:
                    regex_by_name[loaded['name']] = loaded['pattern']
            except Exception as e:
                # A broken pattern file is logged and skipped.
                logger.error(
                    f"Error loading pattern file {regex_path}: {e}")

        converter = FormatConverter(regex_by_name)
        if arr_type.lower() == 'radarr':
            target_app = TargetApp.RADARR
        else:
            target_app = TargetApp.SONARR

        for position, format_name in enumerate(format_names):
            try:
                # The YAML file is looked up under the original name.
                source_name = original_names[position]
                yaml_path = f"{get_category_directory('custom_format')}/{source_name}.yml"
                parsed = load_yaml_file(yaml_path)
                converted = converter.convert_format(
                    CustomFormat(**parsed), target_app)
                if not converted:
                    raise ValueError("Format conversion failed")

                # Field order matters: name, optional rename flag, specs.
                payload = {'name': format_name}
                if is_format_in_renames(source_name):
                    payload['includeCustomFormatWhenRenaming'] = True
                    logger.info(
                        f"Format {source_name} has renames enabled, including field"
                    )
                payload['specifications'] = list(
                    map(vars, converted.specifications))

                outcome = process_format(payload, id_by_name, base_url,
                                         api_key)
                if outcome['success']:
                    summary[outcome['action']] += 1
                else:
                    summary['failed'] += 1
                    summary['success'] = False
                summary['details'].append(outcome['detail'])
            except Exception as e:
                # One bad format must not abort the rest of the batch.
                logger.error(
                    f"Error processing format {format_name}: {e}")
                summary['failed'] += 1
                summary['success'] = False
                summary['details'].append({
                    'name': format_name,
                    'action': 'failed',
                    'success': False,
                    'error': str(e)
                })

        logger.info(
            f"Importing {len(format_names)} formats complete. "
            f"Added: {summary['added']}, Updated: {summary['updated']}, "
            f"Failed: {summary['failed']}")
        return summary
    except Exception as e:
        logger.error(f"Error in import_formats_to_arr: {e}")
        return {'success': False, 'error': str(e)}
async def async_import_formats_to_arr(format_names: List[str],
                                      base_url: str,
                                      api_key: str,
                                      arr_type: str,
                                      original_names: List[str]) -> Dict:
    """
    Import custom formats into an arr instance, uploading concurrently.

    Formats are first compiled locally, one by one. Every format that
    compiled is then scheduled as its own upload task, and the tasks are
    awaited in scheduling order.

    Args:
        format_names: Names the formats are uploaded under.
        base_url: Base URL of the arr instance.
        api_key: API key forwarded to the HTTP helpers.
        arr_type: 'radarr' (case-insensitive) selects the Radarr target;
            any other value selects Sonarr.
        original_names: Names used to locate each format's YAML file,
            index-aligned with ``format_names``.

    Returns:
        A dict with 'success', 'added', 'updated', 'failed' and 'details'
        keys. When the existing formats cannot be fetched, or an
        unexpected error escapes the per-format handling, the dict is
        ``{'success': False, 'error': <message>}`` instead.
    """
    logger.info(
        f"Received {len(format_names)} formats to import (async) for {arr_type}")

    summary = dict(success=True, added=0, updated=0, failed=0, details=[])

    def record_failure(name, error):
        # Register one failed format in the running summary.
        summary['failed'] += 1
        summary['success'] = False
        summary['details'].append({
            'name': name,
            'action': 'failed',
            'success': False,
            'error': str(error)
        })

    try:
        logger.info("Looking for existing formats (async)...")
        remote_formats = await async_get_existing_formats(base_url, api_key)
        if remote_formats is None:
            return {
                'success': False,
                'error': 'Failed to get existing formats'
            }

        # Name -> id lookup decides between "update" and "add" later on.
        id_by_name = {}
        for remote in remote_formats:
            id_by_name[remote['name']] = remote['id']

        # Pattern files are read synchronously from the local file system.
        regex_by_name = {}
        for regex_path in Path(REGEX_DIR).glob('*.yml'):
            try:
                loaded = load_yaml_file(str(regex_path))
                if not loaded:
                    continue
                if 'name' in loaded and 'pattern' in loaded:
                    regex_by_name[loaded['name']] = loaded['pattern']
            except Exception as e:
                # A broken pattern file is logged and skipped.
                logger.error(
                    f"Error loading pattern file {regex_path}: {e}")

        converter = FormatConverter(regex_by_name)
        if arr_type.lower() == 'radarr':
            target_app = TargetApp.RADARR
        else:
            target_app = TargetApp.SONARR

        # Phase 1: turn every format into an API-ready payload.
        ready_payloads = []
        for position, format_name in enumerate(format_names):
            try:
                # The YAML file is looked up under the original name.
                source_name = original_names[position]
                yaml_path = f"{get_category_directory('custom_format')}/{source_name}.yml"
                parsed = load_yaml_file(yaml_path)
                converted = converter.convert_format(
                    CustomFormat(**parsed), target_app)
                if not converted:
                    raise ValueError("Format conversion failed")

                # Field order matters: name, optional rename flag, specs.
                payload = {'name': format_name}
                if is_format_in_renames(source_name):
                    payload['includeCustomFormatWhenRenaming'] = True
                    logger.info(
                        f"Format {source_name} has renames enabled, including field"
                    )
                payload['specifications'] = list(
                    map(vars, converted.specifications))

                ready_payloads.append((format_name, payload))
            except Exception as e:
                logger.error(f"Error processing format {format_name}: {e}")
                record_failure(format_name, e)

        # Phase 2: schedule one upload task per compiled format.
        pending = [
            (
                format_name,
                asyncio.ensure_future(
                    async_process_format(
                        format_data=payload,
                        existing_names=id_by_name,
                        base_url=base_url,
                        api_key=api_key,
                    )
                ),
            )
            for format_name, payload in ready_payloads
        ]

        # Phase 3: collect the outcomes in scheduling order.
        for format_name, upload in pending:
            try:
                outcome = await upload
                if outcome['success']:
                    summary[outcome['action']] += 1
                else:
                    summary['failed'] += 1
                    summary['success'] = False
                summary['details'].append(outcome['detail'])
            except Exception as e:
                logger.error(
                    f"Error waiting for format task {format_name}: {e}")
                record_failure(format_name, e)

        logger.info(
            f"Async importing {len(format_names)} formats complete. "
            f"Added: {summary['added']}, Updated: {summary['updated']}, "
            f"Failed: {summary['failed']}")
        return summary
    except Exception as e:
        logger.error(f"Error in async_import_formats_to_arr: {e}")
        return {'success': False, 'error': str(e)}
def get_existing_formats(base_url, api_key, timeout=300):
    """
    Fetch the custom formats currently defined on an arr instance.

    Args:
        base_url: Base URL of the arr instance; a trailing '/' is ignored.
        api_key: Value sent in the 'X-Api-Key' header.
        timeout: Seconds to wait for the server before giving up. The
            default is the same five-minute ceiling aiohttp applies by
            default in the async variant.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None`` (unexpected
        status, timeout, connection error or undecodable body).
    """
    try:
        url = f"{base_url.rstrip('/')}/api/v3/customformat"
        # requests never times out on its own; without an explicit limit
        # a stalled server would block this worker indefinitely.
        response = requests.get(url,
                                headers={'X-Api-Key': api_key},
                                timeout=timeout)
        if response.status_code == 200:
            return response.json()
        # Surface the status so the caller's generic failure is diagnosable.
        logger.error(
            f"Error getting existing formats: unexpected status {response.status_code}"
        )
        return None
    except Exception as e:
        logger.error(f"Error getting existing formats: {str(e)}")
        return None
async def async_get_existing_formats(base_url: str, api_key: str) -> Optional[List[Dict]]:
    """
    Async version of get_existing_formats.

    Args:
        base_url: Base URL of the arr instance; a trailing '/' is ignored.
        api_key: Value sent in the 'X-Api-Key' header.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None`` (unexpected
        status or any error raised while making or decoding the request).
    """
    try:
        url = f"{base_url.rstrip('/')}/api/v3/customformat"
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    url, headers={'X-Api-Key': api_key}) as response:
                if response.status == 200:
                    return await response.json()
                # Surface the status so the caller's generic failure is
                # diagnosable instead of a silent None.
                logger.error(
                    f"Error getting existing formats (async): unexpected status {response.status}"
                )
                return None
    except Exception as e:
        logger.error(f"Error getting existing formats (async): {str(e)}")
        return None
def process_format(format_data, existing_names, base_url, api_key):
    """
    Upload one compiled format, updating it if it already exists.

    When the format's name is a key of ``existing_names``, the matching
    id is written into ``format_data`` (in place) and the format is
    updated; otherwise it is added as a new format.

    Returns:
        A dict with 'success', 'action' ('updated', 'added' or 'failed')
        and a 'detail' dict repeating the name, action and success flag.
    """
    name = format_data['name']
    already_present = name in existing_names

    if already_present:
        format_data['id'] = existing_names[name]
        ok = update_format(base_url, api_key, format_data)
    else:
        ok = add_format(base_url, api_key, format_data)

    # The reported action collapses to 'failed' when the upload failed.
    if not ok:
        reported = 'failed'
    elif already_present:
        reported = 'updated'
    else:
        reported = 'added'

    detail = {'name': name, 'action': reported, 'success': ok}
    return {'success': ok, 'action': reported, 'detail': detail}
async def async_process_format(format_data: Dict, existing_names: Dict[str, int],
                               base_url: str, api_key: str) -> Dict:
    """
    Async version of process_format.

    When the format's name is a key of ``existing_names``, the matching
    id is written into ``format_data`` (in place) and the format is
    updated; otherwise it is added as a new format.

    Returns:
        A dict with 'success', 'action' ('updated', 'added' or 'failed')
        and a 'detail' dict repeating the name, action and success flag.
    """
    name = format_data['name']
    already_present = name in existing_names

    if already_present:
        format_data['id'] = existing_names[name]
        ok = await async_update_format(base_url, api_key, format_data)
    else:
        ok = await async_add_format(base_url, api_key, format_data)

    # The reported action collapses to 'failed' when the upload failed.
    if not ok:
        reported = 'failed'
    elif already_present:
        reported = 'updated'
    else:
        reported = 'added'

    detail = {'name': name, 'action': reported, 'success': ok}
    return {'success': ok, 'action': reported, 'detail': detail}
def update_format(base_url, api_key, format_data, timeout=300):
    """
    Replace an existing custom format on the arr instance via PUT.

    Args:
        base_url: Base URL of the arr instance; a trailing '/' is ignored.
        api_key: Value sent in the 'X-Api-Key' header.
        format_data: Format payload; must contain 'id' and 'name'.
        timeout: Seconds to wait for the server before giving up. The
            default is the same five-minute ceiling aiohttp applies by
            default in the async variant.

    Returns:
        True when the server answers 200, 201, 202 or 204; False on any
        other status or on any error (including a timeout).
    """
    try:
        url = f"{base_url.rstrip('/')}/api/v3/customformat/{format_data['id']}"
        # requests never times out on its own; without an explicit limit
        # a stalled server would block this worker indefinitely.
        response = requests.put(url,
                                headers={'X-Api-Key': api_key},
                                json=format_data,
                                timeout=timeout)
        logger.info(f"Update format '{format_data['name']}' response: {response.status_code}")
        return response.status_code in (200, 201, 202, 204)
    except Exception as e:
        logger.error(f"Error updating format: {str(e)}")
        return False
async def async_update_format(base_url: str, api_key: str, format_data: Dict) -> bool:
    """
    Async version of update_format.

    Sends ``format_data`` as JSON in a PUT to the format's own endpoint
    and reports whether the response status was 200, 201, 202 or 204.
    Any exception is logged and reported as False.
    """
    accepted_statuses = (200, 201, 202, 204)
    try:
        endpoint = f"{base_url.rstrip('/')}/api/v3/customformat/{format_data['id']}"
        auth_headers = {'X-Api-Key': api_key}
        async with aiohttp.ClientSession() as session:
            request = session.put(endpoint,
                                  headers=auth_headers,
                                  json=format_data)
            async with request as response:
                status = response.status
                logger.info(f"Update format '{format_data['name']}' response: {status} (async)")
                return status in accepted_statuses
    except Exception as e:
        logger.error(f"Error updating format (async): {e}")
        return False
def add_format(base_url, api_key, format_data, timeout=300):
    """
    Create a new custom format on the arr instance via POST.

    Args:
        base_url: Base URL of the arr instance; a trailing '/' is ignored.
        api_key: Value sent in the 'X-Api-Key' header.
        format_data: Format payload; must contain 'name'.
        timeout: Seconds to wait for the server before giving up. The
            default is the same five-minute ceiling aiohttp applies by
            default in the async variant.

    Returns:
        True when the server answers 200, 201, 202 or 204; False on any
        other status or on any error (including a timeout).
    """
    try:
        url = f"{base_url.rstrip('/')}/api/v3/customformat"
        # requests never times out on its own; without an explicit limit
        # a stalled server would block this worker indefinitely.
        response = requests.post(url,
                                 headers={'X-Api-Key': api_key},
                                 json=format_data,
                                 timeout=timeout)
        logger.info(f"Add format '{format_data['name']}' response: {response.status_code}")
        return response.status_code in (200, 201, 202, 204)
    except Exception as e:
        logger.error(f"Error adding format: {str(e)}")
        return False
async def async_add_format(base_url: str, api_key: str, format_data: Dict) -> bool:
    """
    Async version of add_format.

    Sends ``format_data`` as JSON in a POST to the custom format
    collection endpoint and reports whether the response status was
    200, 201, 202 or 204. Any exception is logged and reported as False.
    """
    accepted_statuses = (200, 201, 202, 204)
    try:
        endpoint = f"{base_url.rstrip('/')}/api/v3/customformat"
        auth_headers = {'X-Api-Key': api_key}
        async with aiohttp.ClientSession() as session:
            request = session.post(endpoint,
                                   headers=auth_headers,
                                   json=format_data)
            async with request as response:
                status = response.status
                logger.info(f"Add format '{format_data['name']}' response: {status} (async)")
                return status in accepted_statuses
    except Exception as e:
        logger.error(f"Error adding format (async): {e}")
        return False