Refactor memory operations in Filter class for improved readability and consistency; utilize statistics.median for score calculation and streamline operation details formatting.

This commit is contained in:
mtayfur
2025-10-12 22:54:18 +03:00
parent 2db2d3f2c8
commit 158f0d1983

View File

@@ -7,6 +7,7 @@ import asyncio
import hashlib
import json
import logging
import statistics
import time
from collections import OrderedDict
from datetime import datetime, timezone
@@ -820,8 +821,8 @@ class LLMConsolidationService:
if not user_memories:
logger.info("💭 No existing memories found for consolidation")
return []
else:
logger.info(f"🚀 Reusing cached user memories for consolidation: {len(user_memories)} memories")
logger.info(f"🚀 Reusing cached user memories for consolidation: {len(user_memories)} memories")
try:
all_similarities, _, _ = await self.memory_system._compute_similarities(user_message, user_id, user_memories)
@@ -1293,7 +1294,7 @@ class Filter:
top_score = max(scores)
lowest_score = min(scores)
median_score = sorted(scores)[len(scores) // 2]
median_score = statistics.median(scores)
context_label = "📊 Consolidation candidate memories" if context_type == "consolidation" else "📊 Retrieved memories"
max_scores_to_show = int(self.valves.max_memories_returned * Constants.EXTENDED_MAX_MEMORY_MULTIPLIER)
@@ -1304,16 +1305,8 @@ class Filter:
logger.info(f"Scores: [{scores_str}{suffix}]")
def _build_operation_details(self, created_count: int, updated_count: int, deleted_count: int) -> List[str]:
"""Build operation details list with consistent formatting."""
operation_details = []
operations = [(created_count, "📝 Created"), (updated_count, "✏️ Updated"), (deleted_count, "🗑️ Deleted")]
for count, label in operations:
if count > 0:
operation_details.append(f"{label} {count}")
return operation_details
return [f"{label} {count}" for count, label in operations if count > 0]
def _cache_key(self, cache_type: str, user_id: str, content: Optional[str] = None) -> str:
"""Unified cache key generation for all cache types."""
@@ -1522,8 +1515,7 @@ class Filter:
if user_memories is None:
user_memories = await self._get_user_memories(user_id)
if user_memories:
await self._cache_manager.put(user_id, self._cache_manager.MEMORY_CACHE, memory_cache_key, user_memories)
await self._cache_manager.put(user_id, self._cache_manager.MEMORY_CACHE, memory_cache_key, user_memories)
retrieval_result = await self._retrieve_relevant_memories(user_message, user_id, user_memories, __event_emitter__)
memories = retrieval_result.get("memories", [])
@@ -1632,36 +1624,41 @@ class Filter:
"""Execute a single memory operation."""
try:
if operation.operation == Models.MemoryOperationType.CREATE:
if not operation.content.strip():
content_stripped = operation.content.strip()
if not content_stripped:
logger.warning(f"⚠️ Skipping CREATE operation: empty content")
return Models.OperationResult.SKIPPED_EMPTY_CONTENT.value
await asyncio.wait_for(
asyncio.to_thread(Memories.insert_new_memory, user.id, operation.content.strip()), timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC
asyncio.to_thread(Memories.insert_new_memory, user.id, content_stripped), timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC
)
return Models.MemoryOperationType.CREATE.value
elif operation.operation == Models.MemoryOperationType.UPDATE:
if not operation.id.strip():
id_stripped = operation.id.strip()
if not id_stripped:
logger.warning(f"⚠️ Skipping UPDATE operation: empty ID")
return Models.OperationResult.SKIPPED_EMPTY_ID.value
if not operation.content.strip():
logger.warning(f"⚠️ Skipping UPDATE operation for {operation.id}: empty content")
content_stripped = operation.content.strip()
if not content_stripped:
logger.warning(f"⚠️ Skipping UPDATE operation for {id_stripped}: empty content")
return Models.OperationResult.SKIPPED_EMPTY_CONTENT.value
await asyncio.wait_for(
asyncio.to_thread(Memories.update_memory_by_id_and_user_id, operation.id, user.id, operation.content.strip()),
asyncio.to_thread(Memories.update_memory_by_id_and_user_id, id_stripped, user.id, content_stripped),
timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC,
)
return Models.MemoryOperationType.UPDATE.value
elif operation.operation == Models.MemoryOperationType.DELETE:
if not operation.id.strip():
id_stripped = operation.id.strip()
if not id_stripped:
logger.warning(f"⚠️ Skipping DELETE operation: empty ID")
return Models.OperationResult.SKIPPED_EMPTY_ID.value
await asyncio.wait_for(
asyncio.to_thread(Memories.delete_memory_by_id_and_user_id, operation.id, user.id), timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC
asyncio.to_thread(Memories.delete_memory_by_id_and_user_id, id_stripped, user.id), timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC
)
return Models.MemoryOperationType.DELETE.value
else: