From e3709fe677b31436d07cdc80d7ff7bd00f099fa8 Mon Sep 17 00:00:00 2001 From: mtayfur Date: Wed, 15 Oct 2025 14:05:01 +0300 Subject: [PATCH] Refactor cache management in Filter class; reduce maximum cache entries and concurrent user caches for improved performance and clarity. Update cache management methods for consistency and better logging. --- memory_system.py | 43 ++++++++++--------------------------------- 1 file changed, 10 insertions(+), 33 deletions(-) diff --git a/memory_system.py b/memory_system.py index 0b32807..166de0d 100644 --- a/memory_system.py +++ b/memory_system.py @@ -39,8 +39,8 @@ class Constants: LLM_CONSOLIDATION_TIMEOUT_SEC = 60.0 # Timeout for LLM consolidation operations # Cache System - MAX_CACHE_ENTRIES_PER_TYPE = 2500 # Maximum cache entries per cache type - MAX_CONCURRENT_USER_CACHES = 250 # Maximum concurrent user cache instances + MAX_CACHE_ENTRIES_PER_TYPE = 500 # Maximum cache entries per cache type + MAX_CONCURRENT_USER_CACHES = 50 # Maximum concurrent user cache instances CACHE_KEY_HASH_PREFIX_LENGTH = 10 # Hash prefix length for cache keys # Retrieval & Similarity @@ -313,26 +313,6 @@ class UnifiedCacheManager: async with self._lock: self.caches.clear() - async def get_cache_stats(self) -> Dict[str, Any]: - """Get cache statistics for monitoring.""" - async with self._lock: - total_users = len(self.caches) - total_items = 0 - cache_type_counts = {} - - for user_id, user_cache in self.caches.items(): - for cache_type, type_cache in user_cache.items(): - cache_type_counts[cache_type] = cache_type_counts.get(cache_type, 0) + len(type_cache) - total_items += len(type_cache) - - return { - "total_users": total_users, - "total_items": total_items, - "cache_type_counts": cache_type_counts, - "max_users": self.max_users, - "max_cache_size_per_type": self.max_cache_size_per_type, - } - class SkipDetector: """Semantic-based content classifier using zero-shot classification with category descriptions.""" @@ -979,7 +959,7 @@ class LLMConsolidationService: if total_executed > 0: operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count) logger.info(f"๐Ÿ”„ Memory Operations: {', '.join(operation_details)}") - await self.memory_system._manage_user_cache(user_id) + await self.memory_system._refresh_user_cache(user_id) return created_count, updated_count, deleted_count, failed_count @@ -1584,16 +1564,13 @@ class Filter: await self._cache_manager.clear_all_caches() - async def _manage_user_cache(self, user_id: str, clear_first: bool = False) -> None: - """Manage user cache - clear, invalidate, and refresh as needed.""" + async def _refresh_user_cache(self, user_id: str) -> None: + """Refresh user cache - clear stale caches and update with fresh embeddings.""" start_time = time.time() try: - if clear_first: - total_removed = await self._cache_manager.clear_user_cache(user_id) - logger.info(f"๐Ÿงน Cleared {total_removed} cache entries for user {user_id}") - else: - retrieval_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.RETRIEVAL_CACHE) - logger.info(f"๐Ÿ”„ Cleared {retrieval_cleared} retrieval cache entries for user {user_id}") + retrieval_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.RETRIEVAL_CACHE) + embedding_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.EMBEDDING_CACHE) + logger.info(f"๐Ÿ”„ Cleared {retrieval_cleared} retrieval + {embedding_cleared} embedding cache entries for user {user_id}") user_memories = await self._get_user_memories(user_id) memory_cache_key = self._cache_key(self._cache_manager.MEMORY_CACHE, user_id) @@ -1614,10 +1591,10 @@ class Filter: if memory_contents: await self._generate_embeddings(memory_contents, user_id) duration = time.time() - start_time - logger.info(f"๏ฟฝ Cache updated with {len(memory_contents)} embeddings for user {user_id} in {duration:.2f}s") + logger.info(f"๐Ÿ”„ Cache updated with {len(memory_contents)} embeddings for user {user_id} in {duration:.2f}s") except Exception as e: - raise RuntimeError(f"๐Ÿงน Failed to manage cache for user {user_id} after {(time.time() - start_time):.2f}s: {str(e)}") + raise RuntimeError(f"๐Ÿงน Failed to refresh cache for user {user_id} after {(time.time() - start_time):.2f}s: {str(e)}") async def _execute_single_operation(self, operation: Models.MemoryOperation, user: Any) -> str: """Execute a single memory operation."""