feat(pcd): implement PCD cache management and initialization logic

This commit is contained in:
Sam Chau
2025-11-05 07:30:33 +10:30
parent 4280403cfa
commit 2abc9aa86a
6 changed files with 889 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ import { db } from '$db/db.ts';
import { runMigrations } from '$db/migrations.ts';
import { initializeJobs } from '$jobs/init.ts';
import { jobScheduler } from '$jobs/scheduler.ts';
import { pcdManager } from '$pcd/pcd.ts';
// Initialize configuration on server startup
await config.init();
@@ -21,6 +22,9 @@ await runMigrations();
// Load log settings from database (must be after migrations)
logSettings.load();
// Initialize PCD caches (must be after migrations and log settings)
await pcdManager.initialize();
// Initialize and start job system
await initializeJobs();
await jobScheduler.start();

View File

@@ -207,5 +207,23 @@ export const databaseInstancesQueries = {
name
);
return (result?.count ?? 0) > 0;
},
/**
* Disable a database instance (set enabled = 0)
*/
disable(id: number): boolean {
const affected = db.execute(
'UPDATE database_instances SET enabled = 0, updated_at = CURRENT_TIMESTAMP WHERE id = ?',
id
);
return affected > 0;
}
};
/**
* Helper function to disable a database instance
*/
export function disableDatabaseInstance(id: number): boolean {
return databaseInstancesQueries.disable(id);
}

385
src/lib/server/pcd/cache.ts Normal file
View File

@@ -0,0 +1,385 @@
/**
* PCD Cache - In-memory compiled view of PCD operations
*/
import { Database } from '@jsr/db__sqlite';
import { logger } from '$logger/logger.ts';
import { loadAllOperations, validateOperations } from './ops.ts';
import { disableDatabaseInstance } from '$db/queries/databaseInstances.ts';
/**
* PCDCache - Manages an in-memory compiled database for a single PCD
*/
export class PCDCache {
private db: Database | null = null;
private pcdPath: string;
private databaseInstanceId: number;
private built = false;
constructor(pcdPath: string, databaseInstanceId: number) {
this.pcdPath = pcdPath;
this.databaseInstanceId = databaseInstanceId;
}
/**
* Build the cache by executing all operations in layer order
*/
async build(): Promise<void> {
try {
await logger.info('Building PCD cache', {
source: 'PCDCache',
meta: { path: this.pcdPath, databaseInstanceId: this.databaseInstanceId }
});
// 1. Create in-memory database
this.db = new Database(':memory:');
// Enable foreign keys
this.db.exec('PRAGMA foreign_keys = ON');
// 2. Register helper functions
this.registerHelperFunctions();
// 3. Load all operations
const operations = await loadAllOperations(this.pcdPath);
validateOperations(operations);
await logger.info(`Loaded ${operations.length} operations`, {
source: 'PCDCache',
meta: {
schema: operations.filter((o) => o.layer === 'schema').length,
base: operations.filter((o) => o.layer === 'base').length,
tweaks: operations.filter((o) => o.layer === 'tweaks').length
}
});
// 4. Execute operations in order
for (const operation of operations) {
try {
this.db.exec(operation.sql);
} catch (error) {
throw new Error(
`Failed to execute operation ${operation.filename} in ${operation.layer} layer: ${error}`
);
}
}
this.built = true;
await logger.info('PCD cache built successfully', {
source: 'PCDCache',
meta: { databaseInstanceId: this.databaseInstanceId }
});
} catch (error) {
await logger.error('Failed to build PCD cache', {
source: 'PCDCache',
meta: { error: String(error), databaseInstanceId: this.databaseInstanceId }
});
// Disable the database instance
await disableDatabaseInstance(this.databaseInstanceId);
// Clean up
this.close();
throw error;
}
}
/**
* Register SQL helper functions (qp, cf)
*/
private registerHelperFunctions(): void {
if (!this.db) return;
// qp(name) - Quality profile lookup by name
this.db.function('qp', (name: string) => {
const result = this.db!.prepare('SELECT id FROM quality_profiles WHERE name = ?').get(
name
) as { id: number } | undefined;
if (!result) {
throw new Error(`Quality profile not found: ${name}`);
}
return result.id;
});
// cf(name) - Custom format lookup by name
this.db.function('cf', (name: string) => {
const result = this.db!.prepare('SELECT id FROM custom_formats WHERE name = ?').get(
name
) as { id: number } | undefined;
if (!result) {
throw new Error(`Custom format not found: ${name}`);
}
return result.id;
});
}
/**
* Check if cache is built and ready
*/
isBuilt(): boolean {
return this.built && this.db !== null;
}
/**
* Close the database connection
*/
close(): void {
if (this.db) {
this.db.close();
this.db = null;
}
this.built = false;
}
// ============================================================================
// QUERY API
// ============================================================================
/**
* Execute a raw SQL query and return all rows
* Use this in your query functions in pcd/queries/*.ts
*/
query<T = unknown>(sql: string, ...params: (string | number | null | boolean | Uint8Array)[]): T[] {
if (!this.isBuilt()) {
throw new Error('Cache not built');
}
return this.db!.prepare(sql).all(...params) as T[];
}
/**
* Execute a raw SQL query and return a single row
* Use this in your query functions in pcd/queries/*.ts
*/
queryOne<T = unknown>(sql: string, ...params: (string | number | null | boolean | Uint8Array)[]): T | undefined {
if (!this.isBuilt()) {
throw new Error('Cache not built');
}
return this.db!.prepare(sql).get(...params) as T | undefined;
}
}
// ============================================================================
// MODULE-LEVEL REGISTRY AND FUNCTIONS
// ============================================================================
/**
* Cache registry - maps database instance ID to PCDCache
*/
const caches = new Map<number, PCDCache>();
/**
* File watchers - maps database instance ID to watcher
*/
const watchers = new Map<number, Deno.FsWatcher>();
/**
* Debounce timers - maps database instance ID to timer
*/
const debounceTimers = new Map<number, number>();
/**
* Debounce delay in milliseconds
*/
const DEBOUNCE_DELAY = 500;
/**
* Compile a PCD into an in-memory cache
*/
export async function compile(pcdPath: string, databaseInstanceId: number): Promise<void> {
// Stop any existing watchers
stopWatch(databaseInstanceId);
// Close existing cache if present
const existing = caches.get(databaseInstanceId);
if (existing) {
existing.close();
}
// Create and build new cache
const cache = new PCDCache(pcdPath, databaseInstanceId);
await cache.build();
// Store in registry
caches.set(databaseInstanceId, cache);
}
/**
* Get a compiled cache by database instance ID
*/
export function getCache(databaseInstanceId: number): PCDCache | undefined {
return caches.get(databaseInstanceId);
}
/**
* Invalidate a cache (close and remove from registry)
*/
export async function invalidate(databaseInstanceId: number): Promise<void> {
const cache = caches.get(databaseInstanceId);
if (cache) {
cache.close();
caches.delete(databaseInstanceId);
}
// Stop file watcher and debounce timers
stopWatch(databaseInstanceId);
await logger.info('Cache invalidated', {
source: 'PCDCache',
meta: { databaseInstanceId }
});
}
/**
* Invalidate all caches
*/
export async function invalidateAll(): Promise<void> {
const ids = Array.from(caches.keys());
for (const id of ids) {
await invalidate(id);
}
}
// ============================================================================
// FILE WATCHING
// ============================================================================
/**
* Start watching PCD directories for changes
*/
export async function startWatch(pcdPath: string, databaseInstanceId: number): Promise<void> {
// Stop existing watcher if present
stopWatch(databaseInstanceId);
const pathsToWatch: string[] = [];
// Watch ops directory
const opsPath = `${pcdPath}/ops`;
try {
await Deno.stat(opsPath);
pathsToWatch.push(opsPath);
} catch {
// ops directory doesn't exist, skip
}
// Watch deps/schema/ops directory
const schemaOpsPath = `${pcdPath}/deps/schema/ops`;
try {
await Deno.stat(schemaOpsPath);
pathsToWatch.push(schemaOpsPath);
} catch {
// schema ops directory doesn't exist, skip
}
// Watch tweaks directory (optional)
const tweaksPath = `${pcdPath}/tweaks`;
try {
await Deno.stat(tweaksPath);
pathsToWatch.push(tweaksPath);
} catch {
// tweaks directory doesn't exist, that's ok
}
if (pathsToWatch.length === 0) {
await logger.warn('No directories to watch for PCD', {
source: 'PCDCache',
meta: { pcdPath, databaseInstanceId }
});
return;
}
await logger.info(`Starting file watch on ${pathsToWatch.length} directories`, {
source: 'PCDCache',
meta: { paths: pathsToWatch, databaseInstanceId }
});
const watcher = Deno.watchFs(pathsToWatch);
watchers.set(databaseInstanceId, watcher);
// Process file system events in the background
(async () => {
try {
for await (const event of watcher) {
// Only rebuild on modify, create, or remove events
if (['modify', 'create', 'remove'].includes(event.kind)) {
// Only care about .sql files
const hasSqlFile = event.paths.some((path) => path.endsWith('.sql'));
if (!hasSqlFile) continue;
await logger.debug('File change detected, scheduling rebuild', {
source: 'PCDCache',
meta: { event: event.kind, paths: event.paths, databaseInstanceId }
});
// Debounce the rebuild
scheduleRebuild(pcdPath, databaseInstanceId);
}
}
} catch (error) {
// Watcher was closed or errored
if (error instanceof Deno.errors.BadResource) {
// This is expected when we close the watcher
return;
}
await logger.error('File watcher error', {
source: 'PCDCache',
meta: { error: String(error), databaseInstanceId }
});
}
})();
}
/**
* Stop watching a PCD for changes
*/
function stopWatch(databaseInstanceId: number): void {
const watcher = watchers.get(databaseInstanceId);
if (watcher) {
watcher.close();
watchers.delete(databaseInstanceId);
}
// Clear any pending debounce timer
const timer = debounceTimers.get(databaseInstanceId);
if (timer) {
clearTimeout(timer);
debounceTimers.delete(databaseInstanceId);
}
}
/**
* Schedule a rebuild with debouncing
*/
function scheduleRebuild(pcdPath: string, databaseInstanceId: number): void {
// Clear existing timer
const existingTimer = debounceTimers.get(databaseInstanceId);
if (existingTimer) {
clearTimeout(existingTimer);
}
// Schedule new rebuild
const timer = setTimeout(async () => {
await logger.info('Rebuilding cache due to file changes', {
source: 'PCDCache',
meta: { databaseInstanceId }
});
try {
await compile(pcdPath, databaseInstanceId);
// Restart watch after rebuild (compile clears watchers)
await startWatch(pcdPath, databaseInstanceId);
} catch (error) {
await logger.error('Failed to rebuild cache', {
source: 'PCDCache',
meta: { error: String(error), databaseInstanceId }
});
}
debounceTimers.delete(databaseInstanceId);
}, DEBOUNCE_DELAY);
debounceTimers.set(databaseInstanceId, timer);
}

142
src/lib/server/pcd/ops.ts Normal file
View File

@@ -0,0 +1,142 @@
/**
* Utilities for loading and managing SQL operations from PCD layers
*/
export interface Operation {
filename: string;
filepath: string;
sql: string;
order: number; // Extracted from numeric prefix
layer: 'schema' | 'base' | 'tweaks' | 'user';
}
/**
* Check if a path exists
*/
async function pathExists(path: string): Promise<boolean> {
try {
await Deno.stat(path);
return true;
} catch {
return false;
}
}
/**
* Load SQL operations from a directory, sorted by filename
*/
export async function loadOperationsFromDir(
dirPath: string,
layer: 'schema' | 'base' | 'tweaks' | 'user'
): Promise<Operation[]> {
if (!(await pathExists(dirPath))) {
return [];
}
const operations: Operation[] = [];
try {
for await (const entry of Deno.readDir(dirPath)) {
if (!entry.isFile || !entry.name.endsWith('.sql')) {
continue;
}
const filepath = `${dirPath}/${entry.name}`;
const sql = await Deno.readTextFile(filepath);
const order = extractOrderFromFilename(entry.name);
operations.push({
filename: entry.name,
filepath,
sql,
order,
layer
});
}
} catch (error) {
throw new Error(`Failed to read operations from ${dirPath}: ${error}`);
}
// Sort by order (numeric prefix)
return operations.sort((a, b) => a.order - b.order);
}
/**
* Extract numeric order from filename prefix
* Examples:
* "0.schema.sql" -> 0
* "1.initial.sql" -> 1
* "10.advanced.sql" -> 10
* "allow-DV.sql" -> Infinity (no prefix)
*/
function extractOrderFromFilename(filename: string): number {
const match = filename.match(/^(\d+)\./);
if (match) {
return parseInt(match[1], 10);
}
return Infinity; // Files without numeric prefix go last
}
/**
* Load all operations for a PCD in layer order:
* 1. Schema layer (from dependency)
* 2. Base layer (from PCD)
* 3. Tweaks layer (from PCD, optional)
* 4. User ops layer (TODO: future implementation)
*/
export async function loadAllOperations(pcdPath: string): Promise<Operation[]> {
const allOperations: Operation[] = [];
// 1. Load schema layer from dependency
const schemaPath = `${pcdPath}/deps/schema/ops`;
const schemaOps = await loadOperationsFromDir(schemaPath, 'schema');
allOperations.push(...schemaOps);
// 2. Load base layer from PCD
const basePath = `${pcdPath}/ops`;
const baseOps = await loadOperationsFromDir(basePath, 'base');
allOperations.push(...baseOps);
// 3. Load tweaks layer (optional)
const tweaksPath = `${pcdPath}/tweaks`;
const tweakOps = await loadOperationsFromDir(tweaksPath, 'tweaks');
allOperations.push(...tweakOps);
// 4. User ops layer (TODO: implement in future)
// const userOpsPath = `${pcdPath}/user_ops`;
// const userOps = await loadOperationsFromDir(userOpsPath, 'user');
// allOperations.push(...userOps);
return allOperations;
}
/**
* Validate that operations can be executed
* - Check for empty SQL
* - Check for duplicate order numbers within a layer
*/
export function validateOperations(operations: Operation[]): void {
for (const op of operations) {
if (!op.sql.trim()) {
throw new Error(`Operation ${op.filename} in ${op.layer} layer is empty`);
}
}
// Check for duplicate order numbers within each layer
const layerOrders = new Map<string, Set<number>>();
for (const op of operations) {
if (op.order === Infinity) continue; // Skip unprefixed files
if (!layerOrders.has(op.layer)) {
layerOrders.set(op.layer, new Set());
}
const orders = layerOrders.get(op.layer)!;
if (orders.has(op.order)) {
throw new Error(
`Duplicate order number ${op.order} in ${op.layer} layer (${op.filename})`
);
}
orders.add(op.order);
}
}

View File

@@ -9,6 +9,8 @@ import { loadManifest, type Manifest } from './manifest.ts';
import { getPCDPath } from './paths.ts';
import { processDependencies } from './deps.ts';
import { notificationManager } from '$notifications/NotificationManager.ts';
import { compile, invalidate, startWatch, getCache } from './cache.ts';
import { logger } from '$logger/logger.ts';
export interface LinkOptions {
repositoryUrl: string;
@@ -65,6 +67,20 @@ class PCDManager {
throw new Error('Failed to retrieve created database instance');
}
// Compile cache and start watching (only if enabled)
if (instance.enabled) {
try {
await compile(localPath, id);
await startWatch(localPath, id);
} catch (error) {
// Log error but don't fail the link operation
await logger.error('Failed to compile PCD cache after linking', {
source: 'PCDManager',
meta: { error: String(error), databaseId: id }
});
}
}
// Send notification
await notificationManager.notify({
type: 'pcd.linked',
@@ -101,7 +117,10 @@ class PCDManager {
// Store name and URL for notification
const { name, repository_url } = instance;
// Delete from database first
// Invalidate cache first
await invalidate(id);
// Delete from database
databaseInstancesQueries.delete(id);
// Then cleanup filesystem
@@ -153,6 +172,19 @@ class PCDManager {
// Update last_synced_at
databaseInstancesQueries.updateSyncedAt(id);
// Recompile cache and restart watching (only if enabled)
if (instance.enabled) {
try {
await compile(instance.local_path, id);
await startWatch(instance.local_path, id);
} catch (error) {
await logger.error('Failed to recompile PCD cache after sync', {
source: 'PCDManager',
meta: { error: String(error), databaseId: id }
});
}
}
return {
success: true,
commitsBehind: updateInfo.commitsBehind
@@ -236,6 +268,57 @@ class PCDManager {
getDueForSync(): DatabaseInstance[] {
return databaseInstancesQueries.getDueForSync();
}
/**
* Initialize PCD caches for all enabled databases
* Should be called on application startup
*/
async initialize(): Promise<void> {
await logger.info('Initializing PCD caches', { source: 'PCDManager' });
const instances = databaseInstancesQueries.getAll();
const enabledInstances = instances.filter((instance) => instance.enabled);
await logger.info(`Found ${enabledInstances.length} enabled databases to compile`, {
source: 'PCDManager',
meta: {
total: instances.length,
enabled: enabledInstances.length
}
});
// Compile and watch all enabled instances
for (const instance of enabledInstances) {
try {
await logger.info(`Compiling cache for database: ${instance.name}`, {
source: 'PCDManager',
meta: { databaseId: instance.id }
});
await compile(instance.local_path, instance.id);
await startWatch(instance.local_path, instance.id);
await logger.info(`Successfully compiled cache for: ${instance.name}`, {
source: 'PCDManager',
meta: { databaseId: instance.id }
});
} catch (error) {
await logger.error(`Failed to compile cache for: ${instance.name}`, {
source: 'PCDManager',
meta: { error: String(error), databaseId: instance.id }
});
}
}
await logger.info('PCD cache initialization complete', { source: 'PCDManager' });
}
/**
* Get the cache for a database instance
*/
getCache(id: number) {
return getCache(id);
}
}
// Export singleton instance

256
src/lib/server/pcd/types.ts Normal file
View File

@@ -0,0 +1,256 @@
/**
* Type definitions for PCD (Profilarr Compliant Database) entities
* Based on PCD Schema v1
*/
// ============================================================================
// CORE ENTITY TYPES
// ============================================================================
export interface Tag {
id: number;
name: string;
created_at: string;
}
export interface Language {
id: number;
name: string;
created_at: string;
updated_at: string;
}
export interface RegularExpression {
id: number;
name: string;
pattern: string;
regex101_id: string | null;
description: string | null;
created_at: string;
updated_at: string;
tags?: Tag[];
}
export interface Quality {
id: number;
name: string;
created_at: string;
updated_at: string;
}
export interface CustomFormat {
id: number;
name: string;
description: string | null;
created_at: string;
updated_at: string;
tags?: Tag[];
conditions?: CustomFormatCondition[];
}
// ============================================================================
// QUALITY PROFILE TYPES
// ============================================================================
export interface QualityProfile {
id: number;
name: string;
description: string | null;
upgrades_allowed: number; // 0 or 1 (boolean)
minimum_custom_format_score: number;
upgrade_until_score: number;
upgrade_score_increment: number;
created_at: string;
updated_at: string;
tags?: Tag[];
languages?: QualityProfileLanguage[];
qualities?: QualityProfileQuality[];
customFormats?: QualityProfileCustomFormat[];
}
export interface QualityGroup {
id: number;
quality_profile_id: number;
name: string;
created_at: string;
updated_at: string;
members?: Quality[]; // Qualities in this group
}
export interface QualityProfileLanguage {
language: Language;
type: 'must' | 'only' | 'not' | 'simple';
}
export interface QualityProfileQuality {
id: number;
quality_profile_id: number;
quality_id: number | null;
quality_group_id: number | null;
position: number;
upgrade_until: number; // 0 or 1 (boolean)
// Populated fields
quality?: Quality;
qualityGroup?: QualityGroup;
}
export interface QualityProfileCustomFormat {
customFormat: CustomFormat;
arr_type: 'radarr' | 'sonarr' | 'all';
score: number;
}
// ============================================================================
// CUSTOM FORMAT CONDITION TYPES
// ============================================================================
export type ArrType = 'radarr' | 'sonarr' | 'all';
export type ConditionType =
| 'release_title'
| 'release_group'
| 'edition'
| 'language'
| 'indexer_flag'
| 'source'
| 'resolution'
| 'quality_modifier'
| 'size'
| 'release_type'
| 'year';
export interface CustomFormatCondition {
id: number;
custom_format_id: number;
name: string;
type: ConditionType;
arr_type: ArrType;
negate: number; // 0 or 1 (boolean)
required: number; // 0 or 1 (boolean)
created_at: string;
updated_at: string;
// Type-specific data (only one will be populated based on type)
patternData?: ConditionPattern;
languageData?: ConditionLanguage;
indexerFlagData?: ConditionIndexerFlag;
sourceData?: ConditionSource;
resolutionData?: ConditionResolution;
qualityModifierData?: ConditionQualityModifier;
sizeData?: ConditionSize;
releaseTypeData?: ConditionReleaseType;
yearData?: ConditionYear;
}
// Condition type-specific data structures
export interface ConditionPattern {
regular_expression_id: number;
regularExpression?: RegularExpression;
}
export interface ConditionLanguage {
language_id: number;
except_language: number; // 0 or 1 (boolean)
language?: Language;
}
export interface ConditionIndexerFlag {
flag: string;
}
export interface ConditionSource {
source: string;
}
export interface ConditionResolution {
resolution: string;
}
export interface ConditionQualityModifier {
quality_modifier: string;
}
export interface ConditionSize {
min_bytes: number | null;
max_bytes: number | null;
}
export interface ConditionReleaseType {
release_type: string;
}
export interface ConditionYear {
min_year: number | null;
max_year: number | null;
}
// ============================================================================
// RAW DATABASE ROW TYPES (for direct SQL queries)
// ============================================================================
export interface QualityProfileRow {
id: number;
name: string;
description: string | null;
upgrades_allowed: number;
minimum_custom_format_score: number;
upgrade_until_score: number;
upgrade_score_increment: number;
created_at: string;
updated_at: string;
}
export interface CustomFormatRow {
id: number;
name: string;
description: string | null;
created_at: string;
updated_at: string;
}
export interface CustomFormatConditionRow {
id: number;
custom_format_id: number;
name: string;
type: string;
arr_type: string;
negate: number;
required: number;
created_at: string;
updated_at: string;
}
export interface QualityProfileCustomFormatRow {
quality_profile_id: number;
custom_format_id: number;
arr_type: string;
score: number;
}
export interface QualityProfileLanguageRow {
quality_profile_id: number;
language_id: number;
type: string;
}
export interface QualityProfileQualityRow {
id: number;
quality_profile_id: number;
quality_id: number | null;
quality_group_id: number | null;
position: number;
upgrade_until: number;
}
export interface QualityGroupRow {
id: number;
quality_profile_id: number;
name: string;
created_at: string;
updated_at: string;
}
export interface QualityGroupMemberRow {
quality_group_id: number;
quality_id: number;
}