profilarr/backend/app/importarr/profile.py
Samuel Chau ecc5f9eade refactor: profile import optimisation (#165)
feat: add concurrent, async requests to improve profile / format import performance

feat: optimise custom format prefetching, reduce verbose logging
- now makes a single request, then propagates changes to the remaining profiles
- remove verbose logging

fix: increase Gunicorn worker timeout to 10 minutes to prevent timeouts during database syncing operations
2025-03-16 10:41:13 +10:30


# app/importarr/profile.py
import requests
import logging
import asyncio
import aiohttp
from typing import Dict, List, Optional
from ..data.utils import load_yaml_file, get_category_directory
from ..compile.profile_compiler import compile_quality_profile
from ..compile.mappings import TargetApp
from .format import import_formats_to_arr
from .format_memory import async_import_format_from_memory
from ..arr.manager import get_arr_config
logger = logging.getLogger('importarr')
def import_profiles_to_arr(profile_names: List[str], original_names: List[str],
base_url: str, api_key: str, arr_type: str,
arr_id: str, import_as_unique: bool) -> Dict:
"""
Import quality profiles to arr instance.
This function supports bulk importing of profiles with sequential or concurrent processing.
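
    Example (a minimal sketch; the URL, key, and names are placeholders):

        result = import_profiles_to_arr(
            profile_names=['Example Profile'],
            original_names=['Example Profile'],
            base_url='http://localhost:7878',
            api_key='<api-key>',
            arr_type='radarr',
            arr_id='1',
            import_as_unique=False)
        # -> {'success': True, 'added': 1, 'updated': 0, 'failed': 0,
        #     'details': [{'name': 'Example Profile', 'action': 'added',
        #                  'success': True}]}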
"""
logger.info(
f"Received {len(profile_names)} profiles to import for {arr_type}")
# For larger imports, use the async version to improve performance
if len(profile_names) > 1:
        # asyncio.run() creates a fresh event loop for this batch; note it
        # raises RuntimeError if called from an already-running loop
return asyncio.run(
async_import_profiles_to_arr(profile_names=profile_names,
original_names=original_names,
base_url=base_url,
api_key=api_key,
arr_type=arr_type,
arr_id=arr_id,
import_as_unique=import_as_unique))
    # A single profile is imported via the plain synchronous path below
results = {
'success': True,
'added': 0,
'updated': 0,
'failed': 0,
'details': []
}
try:
arr_config_response = get_arr_config(arr_id)
if not arr_config_response['success']:
return {
'success': False,
'error': 'Failed to get arr configuration'
}
arr_config = arr_config_response['data']
logger.info("Looking for existing profiles...")
existing_profiles = get_existing_profiles(base_url, api_key)
if existing_profiles is None:
return {
'success': False,
'error': 'Failed to get existing profiles'
}
# Create mapping for existing profiles
existing_profile_map = {}
for profile in existing_profiles:
existing_profile_map[profile['name']] = profile['id']
        target_app = (TargetApp.RADARR
                      if arr_type.lower() == 'radarr' else TargetApp.SONARR)
for i, profile_name in enumerate(profile_names):
try:
# Use original name for file lookup
original_name = original_names[i]
profile_file = f"{get_category_directory('profile')}/{original_name}.yml"
profile_data = load_yaml_file(profile_file)
# Set the potentially modified profile name
profile_data['name'] = profile_name
# Modify custom format names if import_as_unique is true
if import_as_unique and 'custom_formats' in profile_data:
for cf in profile_data['custom_formats']:
cf['name'] = f"{cf['name']} [Dictionarry]"
# Profile loaded
profile_language = profile_data.get('language', 'any')
if profile_language != 'any':
# Detect if we're using simple or advanced mode
is_simple_mode = '_' not in profile_language
if is_simple_mode:
                        logger.info(
                            f"Profile '{profile_name}' has simple mode language: "
                            f"{profile_language}; it will be applied as the language filter"
                        )
else:
logger.info(
f"Profile '{profile_name}' has advanced mode language: {profile_language}"
)
compiled_profiles = compile_quality_profile(
profile_data=profile_data,
target_app=target_app,
base_url=base_url,
api_key=api_key,
format_importer=import_formats_to_arr,
import_as_unique=import_as_unique)
if not compiled_profiles:
raise ValueError("Profile compilation returned no data")
profile_data = compiled_profiles[0]
logger.info(
"Looking for existing custom formats to sync format IDs..."
)
existing_formats = get_existing_formats(base_url, api_key)
if existing_formats is None:
raise ValueError("Failed to get updated format list")
format_id_map = {
fmt['name']: fmt['id']
for fmt in existing_formats
}
logger.debug(
f"Found {len(format_id_map)} existing custom formats")
profile_data = sync_format_ids(profile_data, format_id_map)
logger.debug("Format items after sync:")
for item in profile_data.get('formatItems', []):
logger.debug(
f" {item['name']} => Score: {item.get('score', 0)}, "
f"Format ID: {item.get('format', 'missing')}")
# Profile compiled successfully
result = process_profile(profile_data=profile_data,
existing_names=existing_profile_map,
base_url=base_url,
api_key=api_key)
results[result['action']] += 1
results['details'].append(result['detail'])
if not result['success']:
results['success'] = False
except Exception as e:
logger.error(
f"Error processing profile {profile_name}: {str(e)}, type: {type(e).__name__}"
)
logger.exception("Full traceback:")
results['failed'] += 1
results['details'].append({
'name': profile_name,
'action': 'failed',
'success': False,
'error': str(e)
})
results['success'] = False
        logger.info(
            f"Import of {len(profile_names)} profiles complete. "
            f"Added: {results['added']}, Updated: {results['updated']}, "
            f"Failed: {results['failed']}")
return results
except Exception as e:
logger.error(f"Error in import_profiles_to_arr: {str(e)}")
return {'success': False, 'error': str(e)}
async def async_import_profiles_to_arr(profile_names: List[str],
original_names: List[str],
base_url: str, api_key: str,
arr_type: str, arr_id: str,
import_as_unique: bool) -> Dict:
"""
Asynchronous version of import_profiles_to_arr that processes profiles concurrently.
This significantly improves performance for larger batches of profile imports.
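
    Must be awaited; from synchronous code, a minimal sketch (placeholder
    connection details as in import_profiles_to_arr above):

        results = asyncio.run(
            async_import_profiles_to_arr(
                profile_names=['Profile A', 'Profile B'],
                original_names=['Profile A', 'Profile B'],
                base_url='http://localhost:7878',
                api_key='<api-key>',
                arr_type='radarr',
                arr_id='1',
                import_as_unique=False))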
"""
logger.info(
f"Received {len(profile_names)} profiles to import (async) for {arr_type}"
)
results = {
'success': True,
'added': 0,
'updated': 0,
'failed': 0,
'details': []
}
try:
arr_config_response = get_arr_config(arr_id)
if not arr_config_response['success']:
return {
'success': False,
'error': 'Failed to get arr configuration'
}
arr_config = arr_config_response['data']
logger.info("Looking for existing profiles (async)...")
existing_profiles = await async_get_existing_profiles(
base_url, api_key)
if existing_profiles is None:
return {
'success': False,
'error': 'Failed to get existing profiles'
}
# Create mapping for existing profiles
existing_profile_map = {}
for profile in existing_profiles:
existing_profile_map[profile['name']] = profile['id']
        target_app = (TargetApp.RADARR
                      if arr_type.lower() == 'radarr' else TargetApp.SONARR)
# Fetch all existing formats once upfront
logger.info("Pre-fetching existing custom formats for all profiles...")
existing_formats = await async_get_existing_formats(base_url, api_key)
if existing_formats is None:
return {
'success': False,
'error': 'Failed to get existing custom formats'
}
format_id_map = {fmt['name']: fmt['id'] for fmt in existing_formats}
logger.info(f"Successfully pre-fetched {len(format_id_map)} existing custom formats")
# Pre-scan all profiles to identify and cache language formats
needed_language_formats = set()
initial_profiles_data = []
# First, load and analyze all profile files
for i, profile_name in enumerate(profile_names):
try:
# Use original name for file lookup
original_name = original_names[i]
profile_file = f"{get_category_directory('profile')}/{original_name}.yml"
profile_data = load_yaml_file(profile_file)
# Store original profile data for later processing
initial_profiles_data.append((i, profile_name, original_name, profile_data))
# Extract language from profile data
profile_language = profile_data.get('language', 'any')
if profile_language != 'any' and '_' in profile_language:
# This is an advanced mode language that needs special format handling
needed_language_formats.add(profile_language)
# Language format identified
except Exception as e:
logger.error(f"Error pre-scanning profile {profile_name}: {str(e)}")
results['failed'] += 1
results['details'].append({
'name': profile_name,
'action': 'failed',
'success': False,
'error': f"Error pre-scanning profile: {str(e)}"
})
results['success'] = False
# Pre-load all language formats if any exist
language_format_cache = {}
if needed_language_formats:
logger.info(f"Pre-importing {len(needed_language_formats)} unique language formats for {len(profile_names)} profiles")
language_format_cache = await preload_language_formats(
language_formats=list(needed_language_formats),
target_app=target_app,
base_url=base_url,
api_key=api_key,
arr_type=arr_type,
import_as_unique=import_as_unique
)
logger.info(f"Successfully pre-loaded language formats for {len(language_format_cache)} languages")
# Process each profile with the cached language formats
profile_tasks = []
for i, profile_name, original_name, profile_data in initial_profiles_data:
try:
# Set the potentially modified profile name
profile_data['name'] = profile_name
# Modify custom format names if import_as_unique is true
if import_as_unique and 'custom_formats' in profile_data:
for cf in profile_data['custom_formats']:
cf['name'] = f"{cf['name']} [Dictionarry]"
# Profile loaded
profile_language = profile_data.get('language', 'any')
                # Simple-mode languages (no underscore) need no per-profile
                # handling here; advanced-mode languages (e.g. 'must_english')
                # are served from the pre-loaded cache below
# Setup the profile compilation with the cached language formats
# By default, use normal import
format_importer = import_formats_to_arr
# For profiles with language formats, attach the cached formats
if language_format_cache and profile_language != 'any' and '_' in profile_language:
language_format_configs = language_format_cache.get(profile_language, [])
if language_format_configs:
# Using cached language formats
# Define a special function that will be detected by the profile compiler
# The function name is checked in _process_language_formats
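                        # NOTE: defining this closure inside the loop is safe:
                        # compile_quality_profile() consumes it synchronously in
                        # this same iteration, before language_format_configs
                        # is rebound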
def cached_format_importer(*args, **kwargs):
# Using cached formats from importer
return {
'success': True,
'added': 0,
'updated': len(language_format_configs),
'failed': 0,
'details': []
}
# Add the cached formats to the function so they can be accessed by the compiler
cached_format_importer.cached_formats = language_format_configs
format_importer = cached_format_importer
else:
logger.warning(f"No cached formats found for language {profile_language}")
# Add language formats from cache directly to the profile for the compiler
# This way we don't need to modify the compiler code at all
if profile_language != 'any' and '_' in profile_language and profile_language in language_format_cache:
# Add the cached language formats directly to the profile
if 'custom_formats' not in profile_data:
profile_data['custom_formats'] = []
# Add the cached formats - these are already imported, we just need to reference them
profile_data['custom_formats'].extend(language_format_cache[profile_language])
compiled_profiles = compile_quality_profile(
profile_data=profile_data,
target_app=target_app,
base_url=base_url,
api_key=api_key,
format_importer=format_importer,
import_as_unique=import_as_unique
)
if not compiled_profiles:
raise ValueError("Profile compilation returned no data")
compiled_profile = compiled_profiles[0]
# Sync format IDs upfront using the cached format_id_map
synced_profile = sync_format_ids(compiled_profile, format_id_map)
# Create a task for processing this profile (without fetching formats again)
task = asyncio.create_task(
async_process_profile(
profile_data=synced_profile,
existing_names=existing_profile_map,
base_url=base_url,
api_key=api_key
)
)
profile_tasks.append((profile_name, task))
except Exception as e:
logger.error(
f"Error processing profile {profile_name}: {str(e)}, type: {type(e).__name__} (async)"
)
logger.exception("Full traceback:")
results['failed'] += 1
results['details'].append({
'name': profile_name,
'action': 'failed',
'success': False,
'error': str(e)
})
results['success'] = False
# Process all profile upload results
for profile_name, task in profile_tasks:
try:
result = await task
if result['success']:
results[result['action']] += 1
else:
results['failed'] += 1
results['success'] = False
results['details'].append(result['detail'])
except Exception as e:
logger.error(
f"Error waiting for profile task {profile_name}: {str(e)} (async)"
)
results['failed'] += 1
results['details'].append({
'name': profile_name,
'action': 'failed',
'success': False,
'error': str(e)
})
results['success'] = False
        logger.info(
            f"Async import of {len(profile_names)} profiles complete. "
            f"Added: {results['added']}, Updated: {results['updated']}, "
            f"Failed: {results['failed']}")
return results
except Exception as e:
logger.error(f"Error in async_import_profiles_to_arr: {str(e)}")
return {'success': False, 'error': str(e)}
def get_existing_profiles(base_url: str, api_key: str) -> Optional[List[Dict]]:
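    """Fetch the arr instance's existing quality profiles, or None on error."""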
try:
response = requests.get(
f"{base_url.rstrip('/')}/api/v3/qualityprofile",
headers={'X-Api-Key': api_key})
if response.status_code == 200:
return response.json()
return None
except Exception as e:
logger.error(f"Error getting existing profiles: {str(e)}")
return None
async def async_get_existing_profiles(base_url: str,
api_key: str) -> Optional[List[Dict]]:
"""Async version of get_existing_profiles"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{base_url.rstrip('/')}/api/v3/qualityprofile",
headers={'X-Api-Key': api_key}) as response:
if response.status == 200:
return await response.json()
return None
except Exception as e:
logger.error(f"Error getting existing profiles (async): {str(e)}")
return None
def get_existing_formats(base_url: str, api_key: str) -> Optional[List[Dict]]:
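    """Fetch the arr instance's existing custom formats, or None on error."""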
try:
response = requests.get(f"{base_url.rstrip('/')}/api/v3/customformat",
headers={'X-Api-Key': api_key})
if response.status_code == 200:
return response.json()
return None
except Exception as e:
logger.error(f"Error getting existing formats: {str(e)}")
return None
async def async_get_existing_formats(base_url: str,
api_key: str) -> Optional[List[Dict]]:
"""Async version of get_existing_formats"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{base_url.rstrip('/')}/api/v3/customformat",
headers={'X-Api-Key': api_key}) as response:
if response.status == 200:
return await response.json()
return None
except Exception as e:
logger.error(f"Error getting existing formats (async): {str(e)}")
return None
async def preload_language_formats(language_formats: List[str],
target_app: TargetApp,
base_url: str,
api_key: str,
arr_type: str,
import_as_unique: bool) -> Dict[str, List[Dict]]:
"""
Pre-load all language formats for the specified languages to avoid
duplicate imports when multiple profiles use the same language settings.
Args:
language_formats: List of language identifiers (e.g. ["must_english", "prefer_french"])
target_app: TargetApp enum value (RADARR or SONARR)
base_url: API base URL
api_key: API key for the arr instance
arr_type: Type of arr (radarr or sonarr)
import_as_unique: Whether to append [Dictionarry] to format names
Returns:
Dictionary mapping language IDs to their imported format configs
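
    Example of the returned mapping (format name shown is hypothetical):
        {'must_english': [{'name': 'Must English', 'score': -9999}]}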
"""
from ..compile.profile_compiler import ProfileConverter
language_format_cache = {}
# Create a single ProfileConverter instance for all languages
converter = ProfileConverter(
target_app=target_app,
base_url=base_url,
api_key=api_key,
format_importer=None, # We'll handle importing manually
import_as_unique=import_as_unique
)
# For each unique language, process and cache its formats
for language_id in language_formats:
try:
# Skip if we've already processed this language
if language_id in language_format_cache:
continue
# Parse the language behavior and code
if '_' in language_id:
behavior, language_code = language_id.split('_', 1)
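                # e.g. 'must_english' -> behavior='must', language_code='english'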
else:
# Skip simple language modes - they don't need special format imports
continue
logger.info(f"Pre-importing language formats for {language_id} (async batch)")
# First generate format data for this language
formats_data = converter._generate_language_formats(behavior, language_code)
# Import these language formats just once
format_results = await import_language_formats_once(
formats_data=formats_data,
base_url=base_url,
api_key=api_key,
arr_type=arr_type,
import_as_unique=import_as_unique
)
# Store the format configs for this language
language_format_cache[language_id] = format_results
logger.info(f"Successfully cached {len(format_results)} formats for language {language_id}")
except Exception as e:
logger.error(f"Error pre-loading language formats for {language_id}: {str(e)}")
language_format_cache[language_id] = [] # Empty list to indicate failure
return language_format_cache
async def import_language_formats_once(formats_data: List[Dict],
base_url: str,
api_key: str,
arr_type: str,
import_as_unique: bool) -> List[Dict]:
"""
Helper function to import language formats once and return the results.
Args:
formats_data: List of format data dictionaries to import
base_url: API base URL
api_key: API key for arr instance
arr_type: Type of arr (radarr or sonarr)
import_as_unique: Whether to append [Dictionarry] to format names
Returns:
List of format configs ready to be added to profiles
"""
# Create tasks for concurrent format imports
format_configs = []
import_tasks = []
for format_data in formats_data:
# Setup task for importing this format
task = asyncio.create_task(
async_import_format_from_memory(
format_data=format_data,
base_url=base_url,
api_key=api_key,
arr_type=arr_type,
import_as_unique=import_as_unique
)
)
import_tasks.append((format_data['name'], task))
# Process all format imports
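    # asyncio.create_task() has already scheduled every import concurrently;
    # awaiting the tasks in submission order below only gathers the results,
    # it does not serialize the requests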
for format_name, task in import_tasks:
try:
result = await task
if not result.get('success', False):
logger.error(f"Format import failed for cached language format: {format_name}")
continue
# Determine final format name (after any [Dictionarry] suffix)
display_name = format_name
if import_as_unique:
display_name = f"{format_name} [Dictionarry]"
# Create format config exactly as needed by profile compiler
format_configs.append({
'name': display_name,
'score': -9999
})
except Exception as e:
logger.error(f"Error importing cached language format {format_name}: {str(e)}")
return format_configs
def use_cached_language_formats(language_cache: Dict[str, List[Dict]],
format_names: List[str],
base_url: str,
api_key: str,
arr_type: str,
original_names: List[str]) -> Dict:
"""
Custom format importer that returns cached language formats instead
of re-importing them. This is used by the profile compiler when we've
already pre-loaded the language formats.
This is a replacement for the regular import_formats_to_arr function.
"""
# Extract the language ID from the original profile data
# This is passed from the profile compiler's context when calling this function
language_id = getattr(use_cached_language_formats, 'current_language_id', None)
if language_id and language_id in language_cache:
logger.info(f"Using cached language formats for {language_id}")
return {
'success': True,
'added': 0,
'updated': len(language_cache[language_id]),
'failed': 0,
'details': [
{'name': fmt['name'], 'action': 'updated', 'success': True}
for fmt in language_cache[language_id]
]
}
else:
# Fall back to normal import if no cache entry exists
# or if this isn't a language format import
logger.info(f"No cached formats for language ID {language_id}, using normal import")
return import_formats_to_arr(
format_names=format_names,
base_url=base_url,
api_key=api_key,
arr_type=arr_type,
original_names=original_names
)
def sync_format_ids(profile_data: Dict, format_id_map: Dict[str, int]) -> Dict:
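    """Rewrite a compiled profile's formatItems with the arr instance's format IDs.

    Formats already present in the profile keep their scores; every remaining
    format known to the instance is appended with a default score of 0.
    """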
if 'formatItems' not in profile_data:
profile_data['formatItems'] = []
# Create a set to track format names we've already processed
processed_formats = set()
synced_items = []
# First process existing items
for item in profile_data.get('formatItems', []):
if item['name'] not in processed_formats:
if item['name'] in format_id_map:
synced_items.append({
'format': format_id_map[item['name']],
'name': item['name'],
'score': item['score']
})
processed_formats.add(item['name'])
else:
logger.warning(
f"Custom format not found in arr: {item['name']}")
# Only add formats that haven't been processed yet
for format_name, format_id in format_id_map.items():
if format_name not in processed_formats:
synced_items.append({
'format': format_id,
'name': format_name,
'score': 0 # Default score for new formats
})
processed_formats.add(format_name)
profile_data['formatItems'] = synced_items
return profile_data
# This function is now deprecated and replaced by direct use of sync_format_ids and async_process_profile
# We're keeping the signature for backward compatibility but not using it in the optimized code path
async def async_process_profile_with_formats(profile_name: str,
profile_data: Dict,
existing_profile_map: Dict[str,
int],
base_url: str,
api_key: str) -> Dict:
"""
Asynchronous function that handles getting formats and processing a profile in one go.
This allows for concurrent profile processing.
Note: This function is deprecated and should not be used in new code.
It's better to fetch formats once upfront for all profiles.
"""
try:
# Get formats for profile synchronization
        logger.info(
            "Looking for existing custom formats to sync format IDs (async)...")
existing_formats = await async_get_existing_formats(base_url, api_key)
if existing_formats is None:
raise ValueError("Failed to get updated format list")
format_id_map = {fmt['name']: fmt['id'] for fmt in existing_formats}
logger.debug(
f"Found {len(format_id_map)} existing custom formats (async)")
# Sync format IDs in the profile
synced_profile = sync_format_ids(profile_data, format_id_map)
# Process the profile (add or update)
return await async_process_profile(profile_data=synced_profile,
existing_names=existing_profile_map,
base_url=base_url,
api_key=api_key)
except Exception as e:
logger.error(
f"Error in async_process_profile_with_formats for {profile_name}: {str(e)}"
)
return {
'success': False,
'action': 'failed',
'detail': {
'name': profile_name,
'action': 'failed',
'success': False,
'error': str(e)
}
}
def process_profile(profile_data: Dict, existing_names: Dict[str, int],
base_url: str, api_key: str) -> Dict:
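    """Update the profile if its name already exists on the instance, otherwise add it."""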
profile_name = profile_data['name']
if profile_name in existing_names:
profile_data['id'] = existing_names[profile_name]
success = update_profile(base_url, api_key, profile_data)
return {
'success': success,
'action': 'updated' if success else 'failed',
'detail': {
'name': profile_name,
'action': 'updated',
'success': success
}
}
else:
success = add_profile(base_url, api_key, profile_data)
return {
'success': success,
'action': 'added' if success else 'failed',
'detail': {
'name': profile_name,
'action': 'added',
'success': success
}
}
async def async_process_profile(profile_data: Dict, existing_names: Dict[str,
int],
base_url: str, api_key: str) -> Dict:
"""Async version of process_profile"""
profile_name = profile_data['name']
if profile_name in existing_names:
profile_data['id'] = existing_names[profile_name]
success = await async_update_profile(base_url, api_key, profile_data)
return {
'success': success,
'action': 'updated' if success else 'failed',
'detail': {
'name': profile_name,
'action': 'updated',
'success': success
}
}
else:
success = await async_add_profile(base_url, api_key, profile_data)
return {
'success': success,
'action': 'added' if success else 'failed',
'detail': {
'name': profile_name,
'action': 'added',
'success': success
}
}
def update_profile(base_url: str, api_key: str, profile_data: Dict) -> bool:
try:
url = f"{base_url.rstrip('/')}/api/v3/qualityprofile/{profile_data['id']}"
response = requests.put(url,
headers={'X-Api-Key': api_key},
json=profile_data)
logger.info(f"Update profile '{profile_data['name']}' response: {response.status_code}")
return response.status_code in [200, 201, 202, 204]
except Exception as e:
logger.error(f"Error updating profile: {str(e)}")
return False
async def async_update_profile(base_url: str, api_key: str,
profile_data: Dict) -> bool:
"""Async version of update_profile"""
try:
url = f"{base_url.rstrip('/')}/api/v3/qualityprofile/{profile_data['id']}"
async with aiohttp.ClientSession() as session:
async with session.put(url,
headers={'X-Api-Key': api_key},
json=profile_data) as response:
logger.info(f"Update profile '{profile_data['name']}' response: {response.status} (async)")
return response.status in [200, 201, 202, 204]
except Exception as e:
logger.error(f"Error updating profile (async): {str(e)}")
return False
def add_profile(base_url: str, api_key: str, profile_data: Dict) -> bool:
try:
url = f"{base_url.rstrip('/')}/api/v3/qualityprofile"
response = requests.post(url,
headers={'X-Api-Key': api_key},
json=profile_data)
logger.info(f"Add profile '{profile_data['name']}' response: {response.status_code}")
return response.status_code in [200, 201, 202, 204]
except Exception as e:
logger.error(f"Error adding profile: {str(e)}")
return False
async def async_add_profile(base_url: str, api_key: str,
profile_data: Dict) -> bool:
"""Async version of add_profile"""
try:
url = f"{base_url.rstrip('/')}/api/v3/qualityprofile"
async with aiohttp.ClientSession() as session:
async with session.post(url,
headers={'X-Api-Key': api_key},
json=profile_data) as response:
logger.info(f"Add profile '{profile_data['name']}' response: {response.status} (async)")
return response.status in [200, 201, 202, 204]
except Exception as e:
logger.error(f"Error adding profile (async): {str(e)}")
return False