feat(sync): implement sync functionality for delay profiles

- Added syncArrJob to handle syncing of PCD profiles and settings to arr instances.
- Created syncArr logic to process pending syncs and log results.
- Introduced BaseSyncer class for common sync operations and specific syncers for delay profiles
- Implemented fetch, transform, and push methods for delay profiles
- Added manual sync actions in the UI for delay profiles
- Enhanced logging for sync operations and error handling.
This commit is contained in:
Sam Chau
2025-12-29 05:37:55 +10:30
parent ea5c543647
commit 1e8fc7a42d
20 changed files with 1305 additions and 10 deletions

View File

@@ -0,0 +1,94 @@
/**
* Base syncer class
* Provides common structure for syncing PCD data to arr instances
*/
import type { BaseArrClient } from '$arr/base.ts';
import { logger } from '$logger/logger.ts';
export interface SyncResult {
success: boolean;
itemsSynced: number;
error?: string;
}
/**
* Abstract base class for syncers
* Each syncer type (quality profiles, delay profiles, media management) extends this
*/
export abstract class BaseSyncer {
protected client: BaseArrClient;
protected instanceId: number;
protected instanceName: string;
constructor(client: BaseArrClient, instanceId: number, instanceName: string) {
this.client = client;
this.instanceId = instanceId;
this.instanceName = instanceName;
}
/**
* Get the sync type name for logging
*/
protected abstract get syncType(): string;
/**
* Fetch data from PCD based on sync config
*/
protected abstract fetchFromPcd(): Promise<unknown[]>;
/**
* Transform PCD data to arr API format
*/
protected abstract transformToArr(pcdData: unknown[]): unknown[];
/**
* Push transformed data to arr instance
*/
protected abstract pushToArr(arrData: unknown[]): Promise<void>;
/**
* Main sync method - orchestrates fetch, transform, push
*/
async sync(): Promise<SyncResult> {
try {
await logger.info(`Starting ${this.syncType} sync for "${this.instanceName}"`, {
source: 'Syncer',
meta: { instanceId: this.instanceId, syncType: this.syncType }
});
// Fetch from PCD
const pcdData = await this.fetchFromPcd();
if (pcdData.length === 0) {
await logger.debug(`No ${this.syncType} to sync for "${this.instanceName}"`, {
source: 'Syncer',
meta: { instanceId: this.instanceId }
});
return { success: true, itemsSynced: 0 };
}
// Transform to arr format
const arrData = this.transformToArr(pcdData);
// Push to arr
await this.pushToArr(arrData);
await logger.info(`Completed ${this.syncType} sync for "${this.instanceName}"`, {
source: 'Syncer',
meta: { instanceId: this.instanceId, itemsSynced: arrData.length }
});
return { success: true, itemsSynced: arrData.length };
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
await logger.error(`Failed ${this.syncType} sync for "${this.instanceName}"`, {
source: 'Syncer',
meta: { instanceId: this.instanceId, error: errorMsg }
});
return { success: false, itemsSynced: 0, error: errorMsg };
}
}
}

View File

@@ -0,0 +1,267 @@
/**
* Delay profile syncer
* Syncs delay profiles from PCD to arr instances
*
* TODO: Handle ordering for multiple profiles
* - Delay profiles have an `order` field (lower = higher priority)
* - Currently we just create in selection order
* - May need to use PUT /api/v3/delayprofile/reorder/{id} endpoint
* - Consider: should PCD store order, or derive from selection order?
*/
import { BaseSyncer } from './base.ts';
import { arrSyncQueries } from '$db/queries/arrSync.ts';
import { getCache } from '$lib/server/pcd/cache.ts';
import { get as getDelayProfile } from '$lib/server/pcd/queries/delayProfiles/get.ts';
import type { DelayProfileTableRow } from '$lib/server/pcd/queries/delayProfiles/types.ts';
import { logger } from '$logger/logger.ts';
/**
* Intermediate type for transformed profiles (before tag ID resolution)
*/
interface TransformedDelayProfile {
name: string;
enableUsenet: boolean;
enableTorrent: boolean;
preferredProtocol: string;
usenetDelay: number;
torrentDelay: number;
bypassIfHighestQuality: boolean;
bypassIfAboveCustomFormatScore: boolean;
minimumCustomFormatScore: number;
tagNames: string[];
}
export class DelayProfileSyncer extends BaseSyncer {
protected get syncType(): string {
return 'delay profiles';
}
protected async fetchFromPcd(): Promise<DelayProfileTableRow[]> {
const syncConfig = arrSyncQueries.getDelayProfilesSync(this.instanceId);
await logger.debug(`Starting fetch - ${syncConfig.selections.length} selections configured`, {
source: 'Sync:DelayProfiles:fetch',
meta: {
instanceId: this.instanceId,
selections: syncConfig.selections
}
});
if (syncConfig.selections.length === 0) {
return [];
}
const profiles: DelayProfileTableRow[] = [];
for (const selection of syncConfig.selections) {
await logger.debug(`Fetching profile ${selection.profileId} from database ${selection.databaseId}`, {
source: 'Sync:DelayProfiles:fetch',
meta: { instanceId: this.instanceId, ...selection }
});
const cache = getCache(selection.databaseId);
if (!cache) {
await logger.warn(`PCD cache not found for database ${selection.databaseId}`, {
source: 'Sync:DelayProfiles:fetch',
meta: { instanceId: this.instanceId, databaseId: selection.databaseId }
});
continue;
}
const profile = await getDelayProfile(cache, selection.profileId);
if (!profile) {
await logger.warn(`Profile ${selection.profileId} not found`, {
source: 'Sync:DelayProfiles:fetch',
meta: { instanceId: this.instanceId, ...selection }
});
continue;
}
await logger.debug(`Found profile: "${profile.name}" (${profile.preferred_protocol})`, {
source: 'Sync:DelayProfiles:fetch',
meta: {
instanceId: this.instanceId,
profileId: profile.id,
name: profile.name,
protocol: profile.preferred_protocol,
usenetDelay: profile.usenet_delay,
torrentDelay: profile.torrent_delay,
tags: profile.tags.map(t => t.name)
}
});
profiles.push(profile);
}
await logger.debug(`Fetch complete - ${profiles.length} profiles retrieved`, {
source: 'Sync:DelayProfiles:fetch',
meta: {
instanceId: this.instanceId,
profiles: profiles.map(p => p.name)
}
});
return profiles;
}
protected transformToArr(pcdData: DelayProfileTableRow[]): TransformedDelayProfile[] {
// Note: transformToArr is sync but logger is async - we'll log in pushToArr instead
return pcdData.map((profile) => {
let enableUsenet = true;
let enableTorrent = true;
let preferredProtocol = 'usenet';
switch (profile.preferred_protocol) {
case 'prefer_usenet':
enableUsenet = true;
enableTorrent = true;
preferredProtocol = 'usenet';
break;
case 'prefer_torrent':
enableUsenet = true;
enableTorrent = true;
preferredProtocol = 'torrent';
break;
case 'only_usenet':
enableUsenet = true;
enableTorrent = false;
preferredProtocol = 'usenet';
break;
case 'only_torrent':
enableUsenet = false;
enableTorrent = true;
preferredProtocol = 'torrent';
break;
}
return {
name: profile.name,
enableUsenet,
enableTorrent,
preferredProtocol,
usenetDelay: profile.usenet_delay ?? 0,
torrentDelay: profile.torrent_delay ?? 0,
bypassIfHighestQuality: profile.bypass_if_highest_quality,
bypassIfAboveCustomFormatScore: profile.bypass_if_above_custom_format_score,
minimumCustomFormatScore: profile.minimum_custom_format_score ?? 0,
tagNames: profile.tags.map((t) => t.name)
};
});
}
protected async pushToArr(arrData: TransformedDelayProfile[]): Promise<void> {
await logger.debug(`Starting push - ${arrData.length} profiles to sync`, {
source: 'Sync:DelayProfiles:push',
meta: {
instanceId: this.instanceId,
profiles: arrData.map(p => ({
name: p.name,
enableUsenet: p.enableUsenet,
enableTorrent: p.enableTorrent,
preferredProtocol: p.preferredProtocol,
usenetDelay: p.usenetDelay,
torrentDelay: p.torrentDelay,
tags: p.tagNames
}))
}
});
// Get existing delay profiles and tags from arr
const existingProfiles = await this.client.getDelayProfiles();
const existingTags = await this.client.getTags();
await logger.debug(`Found ${existingProfiles.length} existing profiles, ${existingTags.length} tags in arr`, {
source: 'Sync:DelayProfiles:push',
meta: {
instanceId: this.instanceId,
existingProfiles: existingProfiles.map(p => ({ id: p.id, tags: p.tags })),
existingTags: existingTags.map(t => ({ id: t.id, label: t.label }))
}
});
// Delete all non-default delay profiles (id !== 1)
const profilesToDelete = existingProfiles.filter(p => p.id !== 1);
if (profilesToDelete.length > 0) {
await logger.debug(`Deleting ${profilesToDelete.length} existing profiles`, {
source: 'Sync:DelayProfiles:push',
meta: { instanceId: this.instanceId, deletingIds: profilesToDelete.map(p => p.id) }
});
for (const profile of profilesToDelete) {
await this.client.deleteDelayProfile(profile.id);
}
await logger.debug(`Deleted ${profilesToDelete.length} profiles`, {
source: 'Sync:DelayProfiles:push',
meta: { instanceId: this.instanceId }
});
}
// Build tag name -> ID map
const tagMap = new Map<string, number>();
for (const tag of existingTags) {
tagMap.set(tag.label.toLowerCase(), tag.id);
}
// Create new profiles
for (const profile of arrData) {
// Resolve tag names to IDs, creating missing tags
const tagIds: number[] = [];
for (const tagName of profile.tagNames) {
let tagId = tagMap.get(tagName.toLowerCase());
if (tagId === undefined) {
await logger.debug(`Creating missing tag "${tagName}"`, {
source: 'Sync:DelayProfiles:push',
meta: { instanceId: this.instanceId, tagName }
});
const newTag = await this.client.createTag(tagName);
tagId = newTag.id;
tagMap.set(tagName.toLowerCase(), tagId);
await logger.debug(`Created tag "${tagName}" with id=${tagId}`, {
source: 'Sync:DelayProfiles:push',
meta: { instanceId: this.instanceId, tagName, tagId }
});
}
tagIds.push(tagId);
}
const profileData = {
enableUsenet: profile.enableUsenet,
enableTorrent: profile.enableTorrent,
preferredProtocol: profile.preferredProtocol,
usenetDelay: profile.usenetDelay,
torrentDelay: profile.torrentDelay,
bypassIfHighestQuality: profile.bypassIfHighestQuality,
bypassIfAboveCustomFormatScore: profile.bypassIfAboveCustomFormatScore,
minimumCustomFormatScore: profile.minimumCustomFormatScore,
tags: tagIds
};
await logger.debug(`Creating profile "${profile.name}"`, {
source: 'Sync:DelayProfiles:push',
meta: { instanceId: this.instanceId, profileData }
});
const created = await this.client.createDelayProfile(profileData);
await logger.debug(`Created profile "${profile.name}" with id=${created.id}`, {
source: 'Sync:DelayProfiles:push',
meta: { instanceId: this.instanceId, createdId: created.id }
});
}
await logger.debug(`Push complete - ${arrData.length} profiles created`, {
source: 'Sync:DelayProfiles:push',
meta: { instanceId: this.instanceId }
});
}
}

View File

@@ -0,0 +1,18 @@
/**
* Sync module - handles syncing PCD profiles to arr instances
*
* Used by:
* - Sync job (automatic, triggered by should_sync flag)
* - Manual sync (Sync Now button)
*/
// Base class
export { BaseSyncer, type SyncResult } from './base.ts';
// Syncer implementations
export { QualityProfileSyncer } from './qualityProfiles.ts';
export { DelayProfileSyncer } from './delayProfiles.ts';
export { MediaManagementSyncer } from './mediaManagement.ts';
// Processor functions
export { processPendingSyncs, syncInstance, type ProcessSyncsResult } from './processor.ts';

View File

@@ -0,0 +1,54 @@
/**
* Media management syncer
* Syncs media management settings from PCD to arr instances
*/
import { BaseSyncer } from './base.ts';
import { arrSyncQueries } from '$db/queries/arrSync.ts';
export class MediaManagementSyncer extends BaseSyncer {
protected get syncType(): string {
return 'media management';
}
protected async fetchFromPcd(): Promise<unknown[]> {
const syncConfig = arrSyncQueries.getMediaManagementSync(this.instanceId);
// Check if any settings are configured
if (
!syncConfig.namingDatabaseId &&
!syncConfig.qualityDefinitionsDatabaseId &&
!syncConfig.mediaSettingsDatabaseId
) {
return [];
}
// TODO: Implement
// Fetch each configured setting from PCD:
// 1. Naming settings (if namingDatabaseId is set)
// 2. Quality definitions (if qualityDefinitionsDatabaseId is set)
// 3. Media settings (if mediaSettingsDatabaseId is set)
throw new Error('Not implemented: fetchFromPcd');
}
protected transformToArr(pcdData: unknown[]): unknown[] {
// TODO: Implement
// Transform PCD settings to arr API format
// Each setting type has different structure:
// - Naming: renaming rules, folder format, etc.
// - Quality definitions: min/max sizes per quality
// - Media settings: file management options
throw new Error('Not implemented: transformToArr');
}
protected async pushToArr(arrData: unknown[]): Promise<void> {
// TODO: Implement
// Push settings to arr instance
// These are typically PUT operations to update existing config
// rather than creating new items
throw new Error('Not implemented: pushToArr');
}
}

View File

@@ -0,0 +1,186 @@
/**
* Sync processor
* Processes pending syncs by creating syncer instances and running them
*
* TODO: Trigger markForSync() from events:
* - on_pull: Call arrSyncQueries.markForSync('on_pull') after database git pull completes
* - on_change: Call arrSyncQueries.markForSync('on_change') after PCD files change
* - schedule: Evaluate cron expressions and set should_sync when schedule matches
*/
import { arrSyncQueries } from '$db/queries/arrSync.ts';
import { arrInstancesQueries } from '$db/queries/arrInstances.ts';
import { createArrClient } from '$arr/factory.ts';
import type { ArrType } from '$arr/types.ts';
import { logger } from '$logger/logger.ts';
import { QualityProfileSyncer } from './qualityProfiles.ts';
import { DelayProfileSyncer } from './delayProfiles.ts';
import { MediaManagementSyncer } from './mediaManagement.ts';
import type { SyncResult } from './base.ts';
export interface ProcessSyncsResult {
totalSynced: number;
results: {
instanceId: number;
instanceName: string;
qualityProfiles?: SyncResult;
delayProfiles?: SyncResult;
mediaManagement?: SyncResult;
}[];
}
/**
* Process all pending syncs
* Called by the sync job and can be called manually
*/
export async function processPendingSyncs(): Promise<ProcessSyncsResult> {
const pending = arrSyncQueries.getPendingSyncs();
const results: ProcessSyncsResult['results'] = [];
// Collect all unique instance IDs
const instanceIds = new Set([
...pending.qualityProfiles,
...pending.delayProfiles,
...pending.mediaManagement
]);
if (instanceIds.size === 0) {
await logger.debug('No pending syncs', { source: 'SyncProcessor' });
return { totalSynced: 0, results: [] };
}
await logger.info(`Processing syncs for ${instanceIds.size} instance(s)`, {
source: 'SyncProcessor',
meta: {
qualityProfiles: pending.qualityProfiles.length,
delayProfiles: pending.delayProfiles.length,
mediaManagement: pending.mediaManagement.length
}
});
let totalSynced = 0;
for (const instanceId of instanceIds) {
const instance = arrInstancesQueries.getById(instanceId);
if (!instance) {
await logger.warn(`Instance ${instanceId} not found, skipping sync`, {
source: 'SyncProcessor'
});
continue;
}
if (!instance.enabled) {
await logger.debug(`Instance "${instance.name}" is disabled, skipping sync`, {
source: 'SyncProcessor'
});
continue;
}
const instanceResult: ProcessSyncsResult['results'][0] = {
instanceId,
instanceName: instance.name
};
try {
// Create arr client for this instance
const client = createArrClient(instance.type as ArrType, instance.url, instance.api_key);
// Process quality profiles if pending
if (pending.qualityProfiles.includes(instanceId)) {
const syncer = new QualityProfileSyncer(client, instanceId, instance.name);
instanceResult.qualityProfiles = await syncer.sync();
totalSynced += instanceResult.qualityProfiles.itemsSynced;
// Clear the should_sync flag
arrSyncQueries.setQualityProfilesShouldSync(instanceId, false);
}
// Process delay profiles if pending
if (pending.delayProfiles.includes(instanceId)) {
const syncer = new DelayProfileSyncer(client, instanceId, instance.name);
instanceResult.delayProfiles = await syncer.sync();
totalSynced += instanceResult.delayProfiles.itemsSynced;
// Clear the should_sync flag
arrSyncQueries.setDelayProfilesShouldSync(instanceId, false);
}
// Process media management if pending
if (pending.mediaManagement.includes(instanceId)) {
const syncer = new MediaManagementSyncer(client, instanceId, instance.name);
instanceResult.mediaManagement = await syncer.sync();
totalSynced += instanceResult.mediaManagement.itemsSynced;
// Clear the should_sync flag
arrSyncQueries.setMediaManagementShouldSync(instanceId, false);
}
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
await logger.error(`Failed to sync instance "${instance.name}"`, {
source: 'SyncProcessor',
meta: { instanceId, error: errorMsg }
});
}
results.push(instanceResult);
}
await logger.info(`Sync processing complete`, {
source: 'SyncProcessor',
meta: { totalSynced, instanceCount: results.length }
});
return { totalSynced, results };
}
/**
* Sync a specific instance manually
* Syncs all configured sections regardless of should_sync flag
*/
export async function syncInstance(instanceId: number): Promise<ProcessSyncsResult['results'][0]> {
const instance = arrInstancesQueries.getById(instanceId);
if (!instance) {
throw new Error(`Instance ${instanceId} not found`);
}
await logger.info(`Manual sync triggered for "${instance.name}"`, {
source: 'SyncProcessor',
meta: { instanceId }
});
const client = createArrClient(instance.type as ArrType, instance.url, instance.api_key);
const result: ProcessSyncsResult['results'][0] = {
instanceId,
instanceName: instance.name
};
// Get sync configs to check what's enabled
const qpConfig = arrSyncQueries.getQualityProfilesSync(instanceId);
const dpConfig = arrSyncQueries.getDelayProfilesSync(instanceId);
const mmConfig = arrSyncQueries.getMediaManagementSync(instanceId);
// Sync quality profiles if configured
if (qpConfig.config.trigger !== 'none' && qpConfig.selections.length > 0) {
const syncer = new QualityProfileSyncer(client, instanceId, instance.name);
result.qualityProfiles = await syncer.sync();
}
// Sync delay profiles if configured
if (dpConfig.config.trigger !== 'none' && dpConfig.selections.length > 0) {
const syncer = new DelayProfileSyncer(client, instanceId, instance.name);
result.delayProfiles = await syncer.sync();
}
// Sync media management if configured
if (
mmConfig.trigger !== 'none' &&
(mmConfig.namingDatabaseId || mmConfig.qualityDefinitionsDatabaseId || mmConfig.mediaSettingsDatabaseId)
) {
const syncer = new MediaManagementSyncer(client, instanceId, instance.name);
result.mediaManagement = await syncer.sync();
}
return result;
}

View File

@@ -0,0 +1,49 @@
/**
* Quality profile syncer
* Syncs quality profiles from PCD to arr instances
*/
import { BaseSyncer } from './base.ts';
import { arrSyncQueries } from '$db/queries/arrSync.ts';
export class QualityProfileSyncer extends BaseSyncer {
protected get syncType(): string {
return 'quality profiles';
}
protected async fetchFromPcd(): Promise<unknown[]> {
const syncConfig = arrSyncQueries.getQualityProfilesSync(this.instanceId);
if (syncConfig.selections.length === 0) {
return [];
}
// TODO: Implement
// For each selection (databaseId, profileId):
// 1. Get the PCD cache for the database
// 2. Fetch the quality profile by ID
// 3. Also fetch dependent custom formats
throw new Error('Not implemented: fetchFromPcd');
}
protected transformToArr(pcdData: unknown[]): unknown[] {
// TODO: Implement
// Transform PCD quality profile format to arr API format
// This includes:
// - Quality profile structure
// - Custom format mappings
// - Quality tier configurations
throw new Error('Not implemented: transformToArr');
}
protected async pushToArr(arrData: unknown[]): Promise<void> {
// TODO: Implement
// 1. First sync custom formats (dependencies)
// 2. Then sync quality profiles
// 3. Handle create vs update (check if profile exists by name)
throw new Error('Not implemented: pushToArr');
}
}