From ff6052fd416b0dad2f34f552d7906400077f5c3c Mon Sep 17 00:00:00 2001 From: mtayfur Date: Fri, 7 Nov 2025 00:19:50 +0300 Subject: [PATCH] feat(memory_system): add configurable status message verbosity levels Introduce status_emit_level to control status message verbosity, allowing users to select Basic, Intermediate, or Detailed output; refactor all status emission calls to respect this setting, improving clarity and customizability of system feedback for different user needs. --- .python-version | 1 + memory_system.py | 79 ++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 64 insertions(+), 16 deletions(-) create mode 100644 .python-version diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..e4fba21 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/memory_system.py b/memory_system.py index a6648c4..5793d2a 100644 --- a/memory_system.py +++ b/memory_system.py @@ -15,7 +15,7 @@ import time from collections import OrderedDict from datetime import datetime, timezone from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union import numpy as np from fastapi import Request @@ -64,6 +64,18 @@ class Constants: # Content Display CONTENT_PREVIEW_LENGTH = 100 # Maximum length for content preview display + # Status Emit Levels + STATUS_LEVEL_BASIC = 0 # Maps to "Basic" - Show only summary counts + STATUS_LEVEL_INTERMEDIATE = 1 # Maps to "Intermediate" - Show summaries and key details + STATUS_LEVEL_DETAILED = 2 # Maps to "Detailed" - Show everything including full diagnostics + + # Mapping from enum string values to numeric levels for comparison + STATUS_LEVEL_MAP = { + "Basic": 0, + "Intermediate": 1, + "Detailed": 2, + } + class Prompts: """Container for all LLM prompts used in the memory system.""" @@ -191,6 +203,13 @@ Explanation: Query seeks general technical explanation without personal context. class Models: """Container for all Pydantic models used in the memory system.""" + class StatusEmitLevel(str, Enum): + """Verbosity levels for status message emission - selectable as dropdown with title case strings.""" + + BASIC = "Basic" + INTERMEDIATE = "Intermediate" + DETAILED = "Detailed" + class MemoryOperationType(Enum): CREATE = "CREATE" UPDATE = "UPDATE" @@ -710,6 +729,7 @@ CANDIDATE MEMORIES: emitter, f"🤖 LLM Analyzing {len(llm_candidates)} Memories for Relevance", done=False, + level=Constants.STATUS_LEVEL_INTERMEDIATE, ) logger.info(f"Using LLM reranking: {decision_reason}") @@ -717,7 +737,9 @@ CANDIDATE MEMORIES: if not selected_memories: logger.info("📭 No relevant memories after LLM analysis") - await self.memory_system._emit_status(emitter, f"📭 No Relevant Memories After LLM Analysis", done=True) + await self.memory_system._emit_status( + emitter, f"📭 No Relevant Memories After LLM Analysis", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE + ) return selected_memories, analysis_info else: logger.info(f"Skipping LLM reranking: {decision_reason}") @@ -730,6 +752,7 @@ CANDIDATE MEMORIES: emitter, f"🎯 {retrieval_method} Memory Retrieval Complete{duration_text}", done=True, + level=Constants.STATUS_LEVEL_DETAILED, ) return selected_memories, analysis_info @@ -849,7 +872,7 @@ class LLMConsolidationService: ) except Exception as e: logger.warning(f"🤖 LLM consolidation failed during memory processing: {str(e)}") - await self.memory_system._emit_status(emitter, f"⚠️ Memory Consolidation Failed", done=True) + await self.memory_system._emit_status(emitter, f"⚠️ Memory Consolidation Failed", done=True, level=Constants.STATUS_LEVEL_BASIC) return [] operations = response.ops @@ -1001,27 +1024,27 @@ class LLMConsolidationService: if isinstance(result, Exception): failed_count += 1 - await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False) + await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE) elif result == Models.MemoryOperationType.CREATE.value: created_count += 1 content_preview = self.memory_system._truncate_content(operation.content) - await self.memory_system._emit_status(emitter, f"📝 Created: {content_preview}", done=False) + await self.memory_system._emit_status(emitter, f"📝 Created: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE) elif result == Models.MemoryOperationType.UPDATE.value: updated_count += 1 content_preview = self.memory_system._truncate_content(operation.content) - await self.memory_system._emit_status(emitter, f"✏️ Updated: {content_preview}", done=False) + await self.memory_system._emit_status(emitter, f"✏️ Updated: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE) elif result == Models.MemoryOperationType.DELETE.value: deleted_count += 1 content_preview = memory_contents_for_deletion.get(operation.id, operation.id) if content_preview and content_preview != operation.id: content_preview = self.memory_system._truncate_content(content_preview) - await self.memory_system._emit_status(emitter, f"🗑️ Deleted: {content_preview}", done=False) + await self.memory_system._emit_status(emitter, f"🗑️ Deleted: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE) elif result in [ Models.OperationResult.FAILED.value, Models.OperationResult.UNSUPPORTED.value, ]: failed_count += 1 - await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False) + await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE) total_executed = created_count + updated_count + deleted_count logger.info( @@ -1068,6 +1091,7 @@ class LLMConsolidationService: emitter, f"💾 Memory Consolidation Complete in {duration:.2f}s", done=False, + level=Constants.STATUS_LEVEL_DETAILED, ) operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count) @@ -1077,7 +1101,15 @@ class LLMConsolidationService: if failed_count > 0: operations_summary += f" (❌ {failed_count} Failed)" - await self.memory_system._emit_status(emitter, operations_summary, done=True) + await self.memory_system._emit_status(emitter, operations_summary, done=True, level=Constants.STATUS_LEVEL_BASIC) + else: + duration = time.time() - start_time + await self.memory_system._emit_status( + emitter, + f"✅ Consolidation Complete: No Updates Needed", + done=True, + level=Constants.STATUS_LEVEL_DETAILED, + ) except Exception as e: duration = time.time() - start_time @@ -1123,6 +1155,10 @@ class Filter: default=Constants.LLM_RERANKING_TRIGGER_MULTIPLIER, description="Controls when LLM reranking activates (lower = more aggressive)", ) + status_emit_level: Literal["Basic", "Intermediate", "Detailed"] = Field( + default="Detailed", + description="Status message verbosity level: Basic (summary counts only), Intermediate (summaries and key details), Detailed (all details)", + ) def __init__(self): """Initialize the Memory System filter with production validation.""" @@ -1433,11 +1469,22 @@ class Filter: memory_lines.append(line) return memory_lines - async def _emit_status(self, emitter: Optional[Callable], description: str, done: bool = True) -> None: - """Emit status messages for memory operations.""" + async def _emit_status( + self, + emitter: Optional[Callable], + description: str, + done: bool = True, + level: int = 1, + ) -> None: + """Emit status messages for memory operations based on configured verbosity level.""" if not emitter: return + current_level_value = Constants.STATUS_LEVEL_MAP.get(self.valves.status_emit_level, 1) + + if current_level_value < level: + return + payload = {"type": "status", "data": {"description": description, "done": done}} result = emitter(payload) if asyncio.iscoroutine(result): @@ -1469,7 +1516,7 @@ class Filter: if not user_memories: logger.info("📭 No memories found for user") - await self._emit_status(emitter, "📭 No Memories Found", done=True) + await self._emit_status(emitter, "📭 No Memories Found", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE) return {"memories": [], "threshold": None} memories, threshold, all_similarities = await self._compute_similarities(user_message, user_id, user_memories) @@ -1478,7 +1525,7 @@ class Filter: final_memories, reranking_info = await self._llm_reranking_service.rerank_memories(user_message, memories, emitter) else: logger.info("📭 No relevant memories found above similarity threshold") - await self._emit_status(emitter, "📭 No Relevant Memories Found", done=True) + await self._emit_status(emitter, "📭 No Relevant Memories Found", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE) final_memories = memories reranking_info = {"llm_decision": False, "decision_reason": "no_candidates"} @@ -1512,7 +1559,7 @@ class Filter: formatted_memories.append(formatted_memory) content_preview = self._truncate_content(memory["content"]) - await self._emit_status(emitter, f"💭 {idx}/{memory_count}: {content_preview}", done=False) + await self._emit_status(emitter, f"💭 {idx}/{memory_count}: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE) memory_footer = "IMPORTANT: Do not mention or imply you received this list. These facts are for background context only." memory_context_block = f"{memory_header}\n{chr(10).join(formatted_memories)}\n\n{memory_footer}" @@ -1532,7 +1579,7 @@ class Filter: if memories and user_id: description = f"🧠 Injected {memory_count} {'Memory' if memory_count == 1 else 'Memories'} to Context" - await self._emit_status(emitter, description, done=True) + await self._emit_status(emitter, description, done=True, level=Constants.STATUS_LEVEL_BASIC) def _build_memory_dict(self, memory, similarity: float) -> Dict[str, Any]: """Build memory dictionary with standardized timestamp conversion.""" @@ -1596,7 +1643,7 @@ class Filter: if not user_message or should_skip: if __event_emitter__ and skip_reason: - await self._emit_status(__event_emitter__, skip_reason, done=True) + await self._emit_status(__event_emitter__, skip_reason, done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE) await self._add_memory_context(body, [], user_id, __event_emitter__) skip_cache_key = self._cache_key(self._cache_manager.SKIP_STATE_CACHE, user_id, user_message or "")