refactor(memory_system): unify status level constants, improve log clarity, and adjust thresholds for better maintainability and diagnostics

Status level constants are consolidated into a single STATUS_LEVEL dict for easier access and reduced redundancy, log messages are clarified and made more concise for better diagnostics, and threshold multipliers are adjusted to fine-tune memory retrieval and reranking behavior. These changes improve code maintainability, readability, and operational transparency.
This commit is contained in:
mtayfur
2025-11-26 15:55:29 +03:00
parent 0c87a815fc
commit 7055eb42f2

View File

@@ -50,8 +50,8 @@ class Constants:
# Retrieval & Similarity
SEMANTIC_RETRIEVAL_THRESHOLD = 0.25 # Semantic similarity threshold for retrieval
RELAXED_SEMANTIC_THRESHOLD_MULTIPLIER = 0.8 # Multiplier for relaxed similarity threshold in secondary operations
EXTENDED_MAX_MEMORY_MULTIPLIER = 1.6 # Multiplier for expanding memory candidates in advanced operations
LLM_RERANKING_TRIGGER_MULTIPLIER = 0.8 # Multiplier for LLM reranking trigger threshold
EXTENDED_MAX_MEMORY_MULTIPLIER = 1.5 # Multiplier for expanding memory candidates in advanced operations
LLM_RERANKING_TRIGGER_MULTIPLIER = 0.5 # Multiplier for LLM reranking trigger threshold
# Skip Detection
SKIP_CATEGORY_MARGIN = 0.20 # Margin above personal similarity for skip category classification
@@ -64,17 +64,8 @@ class Constants:
# Content Display
CONTENT_PREVIEW_LENGTH = 100 # Maximum length for content preview display
# Status Emit Levels
STATUS_LEVEL_BASIC = 0 # Maps to "Basic" - Show only summary counts
STATUS_LEVEL_INTERMEDIATE = 1 # Maps to "Intermediate" - Show summaries and key details
STATUS_LEVEL_DETAILED = 2 # Maps to "Detailed" - Show everything including full diagnostics
# Mapping from enum string values to numeric levels for comparison
STATUS_LEVEL_MAP = {
"Basic": 0,
"Intermediate": 1,
"Detailed": 2,
}
# Status Emit Levels (maps enum string values to numeric levels for comparison)
STATUS_LEVEL = {"Basic": 0, "Intermediate": 1, "Detailed": 2}
class Prompts:
@@ -627,7 +618,7 @@ class SkipDetector:
fast_skip = self._fast_path_skip_detection(message)
if fast_skip:
logger.info(f"Fast-path skip: {self.SkipReason.SKIP_NON_PERSONAL.value}")
logger.info(f"Fast-path skip: {self.SkipReason.SKIP_NON_PERSONAL.value}")
return self.SkipReason.SKIP_NON_PERSONAL.value
if self._reference_embeddings is None:
@@ -736,20 +727,20 @@ CANDIDATE MEMORIES:
emitter,
f"🤖 LLM Analyzing {len(llm_candidates)} Memories for Relevance",
done=False,
level=Constants.STATUS_LEVEL_INTERMEDIATE,
level=Constants.STATUS_LEVEL["Intermediate"],
)
logger.info(f"Using LLM reranking: {decision_reason}")
logger.info(f"🧠 Using LLM reranking: {decision_reason}")
selected_memories = await self._llm_select_memories(user_message, llm_candidates, max_injection, emitter)
if not selected_memories:
logger.info("📭 No relevant memories after LLM analysis")
await self.memory_system._emit_status(
emitter, f"📭 No Relevant Memories After LLM Analysis", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE
emitter, f"📭 No Relevant Memories After LLM Analysis", done=True, level=Constants.STATUS_LEVEL["Intermediate"]
)
return selected_memories, analysis_info
else:
logger.info(f"Skipping LLM reranking: {decision_reason}")
logger.info(f"Skipping LLM reranking: {decision_reason}")
selected_memories = candidate_memories[:max_injection]
duration = time.time() - start_time
@@ -759,7 +750,7 @@ CANDIDATE MEMORIES:
emitter,
f"🎯 {retrieval_method} Memory Retrieval Complete{duration_text}",
done=True,
level=Constants.STATUS_LEVEL_DETAILED,
level=Constants.STATUS_LEVEL["Detailed"],
)
return selected_memories, analysis_info
@@ -816,7 +807,7 @@ class LLMConsolidationService:
if cached_similarities:
candidates, threshold_info = self._filter_consolidation_candidates(cached_similarities)
logger.info(f"🎯 Found {len(candidates)} candidate memories for consolidation (threshold: {threshold_info})")
logger.info(f"🎯 Found {len(candidates)} cached candidates for consolidation (threshold: {threshold_info})")
self.memory_system._log_retrieved_memories(candidates, "consolidation")
return candidates
@@ -826,19 +817,19 @@ class LLMConsolidationService:
except asyncio.TimeoutError:
raise TimeoutError(f"⏱️ Memory retrieval timed out after {Constants.DATABASE_OPERATION_TIMEOUT_SEC}s")
except Exception as e:
logger.error(f"💾 Failed to retrieve user memories from database: {str(e)}")
logger.error(f"💾 Failed to retrieve user memories: {str(e)}")
return []
if not user_memories:
logger.info("💭 No existing memories found for consolidation")
return []
logger.info(f"🚀 Reusing cached user memories for consolidation: {len(user_memories)} memories")
logger.info(f"🚀 Processing {len(user_memories)} cached memories for consolidation")
try:
all_similarities, _, _ = await self.memory_system._compute_similarities(user_message, user_id, user_memories)
except Exception as e:
logger.error(f"🔍 Failed to compute memory similarities for retrieval: {str(e)}")
logger.error(f"🔍 Failed to compute memory similarities: {str(e)}")
return []
if all_similarities:
@@ -847,7 +838,7 @@ class LLMConsolidationService:
candidates = []
threshold_info = "N/A"
logger.info(f"🎯 Found {len(candidates)} candidate memories for consolidation (threshold: {threshold_info})")
logger.info(f"🎯 Found {len(candidates)} candidates for consolidation (threshold: {threshold_info})")
self.memory_system._log_retrieved_memories(candidates, "consolidation")
@@ -881,7 +872,7 @@ class LLMConsolidationService:
)
except Exception as e:
logger.warning(f"🤖 LLM consolidation failed during memory processing: {str(e)}")
await self.memory_system._emit_status(emitter, f"⚠️ Memory Consolidation Failed", done=True, level=Constants.STATUS_LEVEL_BASIC)
await self.memory_system._emit_status(emitter, f"⚠️ Memory Consolidation Failed", done=True, level=Constants.STATUS_LEVEL["Basic"])
return []
operations = response.ops
@@ -906,14 +897,14 @@ class LLMConsolidationService:
continue
if op.operation == Models.MemoryOperationType.UPDATE and op.id in seen_update_ids:
logger.info(f"⏭️ Skipping duplicate UPDATE for memory {op.id} in LLM response")
logger.info(f"⏭️ Skipping duplicate UPDATE for memory {op.id}")
continue
if op.operation in [Models.MemoryOperationType.CREATE, Models.MemoryOperationType.UPDATE]:
normalized_content = op.content.strip().lower()
if normalized_content in seen_contents:
op_type = "CREATE" if op.operation == Models.MemoryOperationType.CREATE else f"UPDATE {op.id}"
logger.info(f"⏭️ Skipping duplicate {op_type} in LLM response: {self.memory_system._truncate_content(op.content)}")
logger.info(f"⏭️ Skipping duplicate {op_type}: {self.memory_system._truncate_content(op.content)}")
continue
seen_contents.add(normalized_content)
@@ -930,9 +921,9 @@ class LLMConsolidationService:
operation_details = self.memory_system._build_operation_details(create_count, update_count, delete_count)
logger.info(f"🎯 Planned {len(valid_operations)} memory operations: {', '.join(operation_details)}")
logger.info(f"🎯 Planned {len(valid_operations)} operations: {', '.join(operation_details)}")
else:
logger.info("🎯 No valid memory operations planned")
logger.info("🎯 No valid operations planned")
return valid_operations
@@ -954,17 +945,11 @@ class LLMConsolidationService:
if duplicate_id:
if operation_type == "UPDATE" and delete_operations is not None:
logger.info(
f"🔄 UPDATE creates duplicate: keeping enriched content from memory {operation.id}, " f"deleting duplicate memory {duplicate_id}"
)
logger.info(f"🔄 UPDATE creates duplicate: keeping {operation.id}, deleting {duplicate_id}")
deduplicated.append(operation)
delete_operations.append(Models.MemoryOperation(operation=Models.MemoryOperationType.DELETE, content="", id=duplicate_id))
else:
logger.info(
f"⏭️ Skipping duplicate {operation_type}: "
f"{self.memory_system._truncate_content(operation.content)} "
f"(matches memory {duplicate_id})"
)
logger.info(f"⏭️ Skipping duplicate {operation_type}: {self.memory_system._truncate_content(operation.content)} (matches {duplicate_id})")
continue
deduplicated.append(operation)
@@ -1026,36 +1011,36 @@ class LLMConsolidationService:
if isinstance(result, Exception):
failed_count += 1
await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
elif result == Models.MemoryOperationType.CREATE.value:
created_count += 1
content_preview = self.memory_system._truncate_content(operation.content)
await self.memory_system._emit_status(emitter, f"📝 Created: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self.memory_system._emit_status(emitter, f"📝 Created: {content_preview}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
elif result == Models.MemoryOperationType.UPDATE.value:
updated_count += 1
content_preview = self.memory_system._truncate_content(operation.content)
await self.memory_system._emit_status(emitter, f"✏️ Updated: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self.memory_system._emit_status(emitter, f"✏️ Updated: {content_preview}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
elif result == Models.MemoryOperationType.DELETE.value:
deleted_count += 1
content_preview = memory_contents_for_deletion.get(operation.id, operation.id)
if content_preview and content_preview != operation.id:
content_preview = self.memory_system._truncate_content(content_preview)
await self.memory_system._emit_status(emitter, f"🗑️ Deleted: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self.memory_system._emit_status(emitter, f"🗑️ Deleted: {content_preview}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
elif result in [
Models.OperationResult.FAILED.value,
Models.OperationResult.UNSUPPORTED.value,
]:
failed_count += 1
await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
total_executed = created_count + updated_count + deleted_count
logger.info(
f"✅ Memory processing completed {total_executed}/{len(operations)} operations (Created {created_count}, Updated {updated_count}, Deleted {deleted_count}, Failed {failed_count})"
f"✅ Memory processing completed: {total_executed}/{len(operations)} ops (created: {created_count}, updated: {updated_count}, deleted: {deleted_count}, failed: {failed_count})"
)
if total_executed > 0:
operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count)
logger.info(f"🔄 Memory Operations: {', '.join(operation_details)}")
logger.info(f"🔄 Memory operations: {', '.join(operation_details)}")
await self.memory_system._refresh_user_cache(user_id)
return created_count, updated_count, deleted_count, failed_count
@@ -1085,7 +1070,7 @@ class LLMConsolidationService:
created_count, updated_count, deleted_count, failed_count = await self.execute_memory_operations(operations, user_id, emitter)
duration = time.time() - start_time
logger.info(f"💾 Memory Consolidation Complete In {duration:.2f}s")
logger.info(f"💾 Memory consolidation complete in {duration:.2f}s")
total_operations = created_count + updated_count + deleted_count
if total_operations > 0 or failed_count > 0:
@@ -1093,7 +1078,7 @@ class LLMConsolidationService:
emitter,
f"💾 Memory Consolidation Complete in {duration:.2f}s",
done=False,
level=Constants.STATUS_LEVEL_DETAILED,
level=Constants.STATUS_LEVEL["Detailed"],
)
operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count)
@@ -1103,14 +1088,14 @@ class LLMConsolidationService:
if failed_count > 0:
operations_summary += f" (❌ {failed_count} Failed)"
await self.memory_system._emit_status(emitter, operations_summary, done=True, level=Constants.STATUS_LEVEL_BASIC)
await self.memory_system._emit_status(emitter, operations_summary, done=True, level=Constants.STATUS_LEVEL["Basic"])
else:
duration = time.time() - start_time
await self.memory_system._emit_status(
emitter,
f"✅ Consolidation Complete: No Updates Needed",
done=True,
level=Constants.STATUS_LEVEL_DETAILED,
level=Constants.STATUS_LEVEL["Detailed"],
)
except Exception as e:
@@ -1195,7 +1180,7 @@ class Filter:
if self._embedding_function is None and hasattr(__request__.app.state, "EMBEDDING_FUNCTION"):
self._embedding_function = __request__.app.state.EMBEDDING_FUNCTION
logger.info(f"✅ Using OpenWebUI's embedding function")
logger.info("✅ Using OpenWebUI embedding function")
if self._embedding_function and self._embedding_dimension is None:
async with self._initialization_lock:
@@ -1213,7 +1198,7 @@ class Filter:
logger.info(f"♻️ Reusing cached skip detector: {cache_key}")
self._skip_detector = _SHARED_SKIP_DETECTOR_CACHE[cache_key]
else:
logger.info(f"🤖 Initializing skip detector with OpenWebUI embeddings: {cache_key}")
logger.info(f"🤖 Initializing skip detector: {cache_key}")
embedding_fn = self._embedding_function
normalize_fn = self._normalize_embedding
@@ -1228,7 +1213,7 @@ class Filter:
self._skip_detector = SkipDetector(embedding_wrapper)
await self._skip_detector.initialize()
_SHARED_SKIP_DETECTOR_CACHE[cache_key] = self._skip_detector
logger.info(f"✅ Skip detector initialized and cached")
logger.info("✅ Skip detector initialized and cached")
def _truncate_content(self, content: str, max_length: Optional[int] = None) -> str:
"""Truncate content with ellipsis if needed."""
@@ -1359,11 +1344,11 @@ class Filter:
if is_single:
if uncached_texts:
logger.info("💾 User message embedding: generated and cached")
logger.info("💾 User message embedding generated and cached")
return result_embeddings[0]
else:
valid_count = sum(1 for emb in result_embeddings if emb is not None)
logger.info(f"🚀 Batch embedding: {len(text_list) - len(uncached_texts)} cached, {len(uncached_texts)} new, {valid_count}/{len(text_list)} valid")
logger.info(f"🚀 Batch embeddings: {len(text_list) - len(uncached_texts)} cached, {len(uncached_texts)} new, {valid_count}/{len(text_list)} valid")
return result_embeddings
async def _should_skip_memory_operations(self, user_message: str) -> Tuple[bool, str]:
@@ -1457,7 +1442,7 @@ class Filter:
if not emitter:
return
current_level_value = Constants.STATUS_LEVEL_MAP.get(self.valves.status_emit_level, 1)
current_level_value = Constants.STATUS_LEVEL.get(self.valves.status_emit_level, 1)
if current_level_value < level:
return
@@ -1478,7 +1463,7 @@ class Filter:
"""Retrieve memories for injection using similarity computation with optional LLM reranking."""
if cached_similarities is not None:
memories = [m for m in cached_similarities if m.get("relevance", 0) >= self.valves.semantic_retrieval_threshold]
logger.info(f"🔍 Using cached similarities for {len(memories)} candidate memories")
logger.info(f"🔍 Using cached similarities: {len(memories)} candidates")
final_memories, reranking_info = await self._llm_reranking_service.rerank_memories(user_message, memories, emitter)
self._log_retrieved_memories(final_memories, "semantic")
return {
@@ -1493,7 +1478,7 @@ class Filter:
if not user_memories:
logger.info("📭 No memories found for user")
await self._emit_status(emitter, "📭 No Memories Found", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self._emit_status(emitter, "📭 No Memories Found", done=True, level=Constants.STATUS_LEVEL["Intermediate"])
return {"memories": [], "threshold": None}
memories, threshold, all_similarities = await self._compute_similarities(user_message, user_id, user_memories)
@@ -1502,7 +1487,7 @@ class Filter:
final_memories, reranking_info = await self._llm_reranking_service.rerank_memories(user_message, memories, emitter)
else:
logger.info("📭 No relevant memories found above similarity threshold")
await self._emit_status(emitter, "📭 No Relevant Memories Found", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self._emit_status(emitter, "📭 No Relevant Memories Found", done=True, level=Constants.STATUS_LEVEL["Intermediate"])
final_memories = memories
reranking_info = {"llm_decision": False, "decision_reason": "no_candidates"}
@@ -1536,7 +1521,7 @@ class Filter:
formatted_memories.append(formatted_memory)
content_preview = self._truncate_content(memory["content"])
await self._emit_status(emitter, f"💭 {idx}/{memory_count}: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self._emit_status(emitter, f"💭 {idx}/{memory_count}: {content_preview}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
memory_footer = "IMPORTANT: Do not mention or imply you received this list. These facts are for background context only."
memory_context_block = f"{memory_header}\n{chr(10).join(formatted_memories)}\n\n{memory_footer}"
@@ -1556,7 +1541,7 @@ class Filter:
if memories and user_id:
description = f"🧠 Injected {memory_count} {'Memory' if memory_count == 1 else 'Memories'} to Context"
await self._emit_status(emitter, description, done=True, level=Constants.STATUS_LEVEL_BASIC)
await self._emit_status(emitter, description, done=True, level=Constants.STATUS_LEVEL["Basic"])
def _build_memory_dict(self, memory, similarity: float) -> Dict[str, Any]:
"""Build memory dictionary with standardized timestamp conversion."""
@@ -1624,7 +1609,7 @@ class Filter:
if not user_message or should_skip:
if __event_emitter__ and skip_reason:
await self._emit_status(__event_emitter__, skip_reason, done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE)
await self._emit_status(__event_emitter__, skip_reason, done=True, level=Constants.STATUS_LEVEL["Intermediate"])
await self._add_memory_context(body, [], user_id, __event_emitter__)
return body
try:
@@ -1679,7 +1664,7 @@ class Filter:
should_skip = await self._cache_manager.get(user_id, self._cache_manager.SKIP_STATE_CACHE, skip_cache_key)
if should_skip:
logger.info("⏭️ Skipping outlet consolidation: inlet already detected skip condition")
logger.info("⏭️ Skipping outlet: inlet detected skip condition")
return body
retrieval_cache_key = self._cache_key(self._cache_manager.RETRIEVAL_CACHE, user_id, user_message)
@@ -1692,9 +1677,9 @@ class Filter:
self._background_tasks.discard(t)
if t.exception() and not t.cancelled():
exception = t.exception()
logger.error(f"❌ Background memory consolidation task failed: {str(exception)}")
logger.error(f"❌ Background consolidation failed: {str(exception)}")
except Exception as e:
logger.error(f"❌ Failed to cleanup background memory task: {str(e)}")
logger.error(f"❌ Failed to cleanup background task: {str(e)}")
task.add_done_callback(safe_cleanup)
return body
@@ -1716,9 +1701,7 @@ class Filter:
retrieval_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.RETRIEVAL_CACHE)
embedding_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.EMBEDDING_CACHE)
skip_state_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.SKIP_STATE_CACHE)
logger.info(
f"🔄 Cleared {retrieval_cleared} retrieval + {embedding_cleared} embedding + {skip_state_cleared} skip state cache entries for user {user_id}"
)
logger.info(f"🔄 Cleared cache: {retrieval_cleared} retrieval, {embedding_cleared} embedding, {skip_state_cleared} skip entries")
user_memories = await self._get_user_memories(user_id)
memory_cache_key = self._cache_key(self._cache_manager.MEMORY_CACHE, user_id)
@@ -1740,7 +1723,7 @@ class Filter:
if memory_contents:
await self._generate_embeddings(memory_contents, user_id)
duration = time.time() - start_time
logger.info(f"🔄 Cache updated with {len(memory_contents)} embeddings for user {user_id} in {duration:.2f}s")
logger.info(f"🔄 Cache refreshed: {len(memory_contents)} embeddings in {duration:.2f}s")
except Exception as e:
raise RuntimeError(f"🧹 Failed to refresh cache for user {user_id} after {(time.time() - start_time):.2f}s: {str(e)}")