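Refactor cache management in the Filter class: reduce the maximum cache entries per type (2500 → 500) and the maximum concurrent user caches (250 → 50) for improved performance and clarity. Remove get_cache_stats from UnifiedCacheManager, and rename _manage_user_cache to _refresh_user_cache (dropping the clear_first option so it always clears both the retrieval and embedding caches before regenerating embeddings) for consistency and better logging.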

This commit is contained in:
mtayfur
2025-10-15 14:05:01 +03:00
parent 2deba4fb2c
commit e3709fe677

View File

@@ -39,8 +39,8 @@ class Constants:
LLM_CONSOLIDATION_TIMEOUT_SEC = 60.0 # Timeout for LLM consolidation operations
# Cache System
MAX_CACHE_ENTRIES_PER_TYPE = 2500 # Maximum cache entries per cache type
MAX_CONCURRENT_USER_CACHES = 250 # Maximum concurrent user cache instances
MAX_CACHE_ENTRIES_PER_TYPE = 500 # Maximum cache entries per cache type
MAX_CONCURRENT_USER_CACHES = 50 # Maximum concurrent user cache instances
CACHE_KEY_HASH_PREFIX_LENGTH = 10 # Hash prefix length for cache keys
# Retrieval & Similarity
@@ -313,26 +313,6 @@ class UnifiedCacheManager:
async with self._lock:
self.caches.clear()
async def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics for monitoring."""
async with self._lock:
total_users = len(self.caches)
total_items = 0
cache_type_counts = {}
for user_id, user_cache in self.caches.items():
for cache_type, type_cache in user_cache.items():
cache_type_counts[cache_type] = cache_type_counts.get(cache_type, 0) + len(type_cache)
total_items += len(type_cache)
return {
"total_users": total_users,
"total_items": total_items,
"cache_type_counts": cache_type_counts,
"max_users": self.max_users,
"max_cache_size_per_type": self.max_cache_size_per_type,
}
class SkipDetector:
"""Semantic-based content classifier using zero-shot classification with category descriptions."""
@@ -979,7 +959,7 @@ class LLMConsolidationService:
if total_executed > 0:
operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count)
logger.info(f"🔄 Memory Operations: {', '.join(operation_details)}")
await self.memory_system._manage_user_cache(user_id)
await self.memory_system._refresh_user_cache(user_id)
return created_count, updated_count, deleted_count, failed_count
@@ -1584,16 +1564,13 @@ class Filter:
await self._cache_manager.clear_all_caches()
async def _manage_user_cache(self, user_id: str, clear_first: bool = False) -> None:
"""Manage user cache - clear, invalidate, and refresh as needed."""
async def _refresh_user_cache(self, user_id: str) -> None:
"""Refresh user cache - clear stale caches and update with fresh embeddings."""
start_time = time.time()
try:
if clear_first:
total_removed = await self._cache_manager.clear_user_cache(user_id)
logger.info(f"🧹 Cleared {total_removed} cache entries for user {user_id}")
else:
retrieval_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.RETRIEVAL_CACHE)
logger.info(f"🔄 Cleared {retrieval_cleared} retrieval cache entries for user {user_id}")
retrieval_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.RETRIEVAL_CACHE)
embedding_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.EMBEDDING_CACHE)
logger.info(f"🔄 Cleared {retrieval_cleared} retrieval + {embedding_cleared} embedding cache entries for user {user_id}")
user_memories = await self._get_user_memories(user_id)
memory_cache_key = self._cache_key(self._cache_manager.MEMORY_CACHE, user_id)
@@ -1614,10 +1591,10 @@ class Filter:
if memory_contents:
await self._generate_embeddings(memory_contents, user_id)
duration = time.time() - start_time
logger.info(f"<EFBFBD> Cache updated with {len(memory_contents)} embeddings for user {user_id} in {duration:.2f}s")
logger.info(f"🔄 Cache updated with {len(memory_contents)} embeddings for user {user_id} in {duration:.2f}s")
except Exception as e:
raise RuntimeError(f"🧹 Failed to manage cache for user {user_id} after {(time.time() - start_time):.2f}s: {str(e)}")
raise RuntimeError(f"🧹 Failed to refresh cache for user {user_id} after {(time.time() - start_time):.2f}s: {str(e)}")
async def _execute_single_operation(self, operation: Models.MemoryOperation, user: Any) -> str:
"""Execute a single memory operation."""