diff --git a/memory_system.py b/memory_system.py
index 1de4caa..cf4dc53 100644
--- a/memory_system.py
+++ b/memory_system.py
@@ -50,8 +50,8 @@ class Constants:
     # Retrieval & Similarity
     SEMANTIC_RETRIEVAL_THRESHOLD = 0.25  # Semantic similarity threshold for retrieval
     RELAXED_SEMANTIC_THRESHOLD_MULTIPLIER = 0.8  # Multiplier for relaxed similarity threshold in secondary operations
-    EXTENDED_MAX_MEMORY_MULTIPLIER = 1.6  # Multiplier for expanding memory candidates in advanced operations
-    LLM_RERANKING_TRIGGER_MULTIPLIER = 0.8  # Multiplier for LLM reranking trigger threshold
+    EXTENDED_MAX_MEMORY_MULTIPLIER = 1.5  # Multiplier for expanding memory candidates in advanced operations
+    LLM_RERANKING_TRIGGER_MULTIPLIER = 0.5  # Multiplier for LLM reranking trigger threshold

     # Skip Detection
     SKIP_CATEGORY_MARGIN = 0.20  # Margin above personal similarity for skip category classification
@@ -64,17 +64,8 @@ class Constants:
     # Content Display
     CONTENT_PREVIEW_LENGTH = 100  # Maximum length for content preview display

-    # Status Emit Levels
-    STATUS_LEVEL_BASIC = 0  # Maps to "Basic" - Show only summary counts
-    STATUS_LEVEL_INTERMEDIATE = 1  # Maps to "Intermediate" - Show summaries and key details
-    STATUS_LEVEL_DETAILED = 2  # Maps to "Detailed" - Show everything including full diagnostics
-
-    # Mapping from enum string values to numeric levels for comparison
-    STATUS_LEVEL_MAP = {
-        "Basic": 0,
-        "Intermediate": 1,
-        "Detailed": 2,
-    }
+    # Status Emit Levels (maps enum string values to numeric levels for comparison)
+    STATUS_LEVEL = {"Basic": 0, "Intermediate": 1, "Detailed": 2}


 class Prompts:
@@ -627,7 +618,7 @@ class SkipDetector:
         fast_skip = self._fast_path_skip_detection(message)

         if fast_skip:
-            logger.info(f"Fast-path skip: {self.SkipReason.SKIP_NON_PERSONAL.value}")
+            logger.info(f"⚡ Fast-path skip: {self.SkipReason.SKIP_NON_PERSONAL.value}")
             return self.SkipReason.SKIP_NON_PERSONAL.value

         if self._reference_embeddings is None:
@@ -736,20 +727,20 @@ CANDIDATE MEMORIES:
                 emitter,
                 f"🤖 LLM Analyzing {len(llm_candidates)} Memories for Relevance",
                 done=False,
-                level=Constants.STATUS_LEVEL_INTERMEDIATE,
+                level=Constants.STATUS_LEVEL["Intermediate"],
             )
-            logger.info(f"Using LLM reranking: {decision_reason}")
+            logger.info(f"🧠 Using LLM reranking: {decision_reason}")
             selected_memories = await self._llm_select_memories(user_message, llm_candidates, max_injection, emitter)

             if not selected_memories:
                 logger.info("📭 No relevant memories after LLM analysis")
                 await self.memory_system._emit_status(
-                    emitter, f"📭 No Relevant Memories After LLM Analysis", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE
+                    emitter, f"📭 No Relevant Memories After LLM Analysis", done=True, level=Constants.STATUS_LEVEL["Intermediate"]
                 )
             return selected_memories, analysis_info
         else:
-            logger.info(f"Skipping LLM reranking: {decision_reason}")
+            logger.info(f"⏩ Skipping LLM reranking: {decision_reason}")
             selected_memories = candidate_memories[:max_injection]

         duration = time.time() - start_time
@@ -759,7 +750,7 @@
             emitter,
             f"🎯 {retrieval_method} Memory Retrieval Complete{duration_text}",
             done=True,
-            level=Constants.STATUS_LEVEL_DETAILED,
+            level=Constants.STATUS_LEVEL["Detailed"],
         )

         return selected_memories, analysis_info
@@ -816,7 +807,7 @@ class LLMConsolidationService:
         if cached_similarities:
             candidates, threshold_info = self._filter_consolidation_candidates(cached_similarities)
-            logger.info(f"🎯 Found {len(candidates)} candidate memories for consolidation (threshold: {threshold_info})")
logger.info(f"๐ŸŽฏ Found {len(candidates)} cached candidates for consolidation (threshold: {threshold_info})") self.memory_system._log_retrieved_memories(candidates, "consolidation") return candidates @@ -826,19 +817,19 @@ class LLMConsolidationService: except asyncio.TimeoutError: raise TimeoutError(f"โฑ๏ธ Memory retrieval timed out after {Constants.DATABASE_OPERATION_TIMEOUT_SEC}s") except Exception as e: - logger.error(f"๐Ÿ’พ Failed to retrieve user memories from database: {str(e)}") + logger.error(f"๐Ÿ’พ Failed to retrieve user memories: {str(e)}") return [] if not user_memories: logger.info("๐Ÿ’ญ No existing memories found for consolidation") return [] - logger.info(f"๐Ÿš€ Reusing cached user memories for consolidation: {len(user_memories)} memories") + logger.info(f"๐Ÿš€ Processing {len(user_memories)} cached memories for consolidation") try: all_similarities, _, _ = await self.memory_system._compute_similarities(user_message, user_id, user_memories) except Exception as e: - logger.error(f"๐Ÿ” Failed to compute memory similarities for retrieval: {str(e)}") + logger.error(f"๐Ÿ” Failed to compute memory similarities: {str(e)}") return [] if all_similarities: @@ -847,7 +838,7 @@ class LLMConsolidationService: candidates = [] threshold_info = "N/A" - logger.info(f"๐ŸŽฏ Found {len(candidates)} candidate memories for consolidation (threshold: {threshold_info})") + logger.info(f"๐ŸŽฏ Found {len(candidates)} candidates for consolidation (threshold: {threshold_info})") self.memory_system._log_retrieved_memories(candidates, "consolidation") @@ -881,7 +872,7 @@ class LLMConsolidationService: ) except Exception as e: logger.warning(f"๐Ÿค– LLM consolidation failed during memory processing: {str(e)}") - await self.memory_system._emit_status(emitter, f"โš ๏ธ Memory Consolidation Failed", done=True, level=Constants.STATUS_LEVEL_BASIC) + await self.memory_system._emit_status(emitter, f"โš ๏ธ Memory Consolidation Failed", done=True, level=Constants.STATUS_LEVEL["Basic"]) return [] operations = response.ops @@ -906,14 +897,14 @@ class LLMConsolidationService: continue if op.operation == Models.MemoryOperationType.UPDATE and op.id in seen_update_ids: - logger.info(f"โญ๏ธ Skipping duplicate UPDATE for memory {op.id} in LLM response") + logger.info(f"โญ๏ธ Skipping duplicate UPDATE for memory {op.id}") continue if op.operation in [Models.MemoryOperationType.CREATE, Models.MemoryOperationType.UPDATE]: normalized_content = op.content.strip().lower() if normalized_content in seen_contents: op_type = "CREATE" if op.operation == Models.MemoryOperationType.CREATE else f"UPDATE {op.id}" - logger.info(f"โญ๏ธ Skipping duplicate {op_type} in LLM response: {self.memory_system._truncate_content(op.content)}") + logger.info(f"โญ๏ธ Skipping duplicate {op_type}: {self.memory_system._truncate_content(op.content)}") continue seen_contents.add(normalized_content) @@ -930,9 +921,9 @@ class LLMConsolidationService: operation_details = self.memory_system._build_operation_details(create_count, update_count, delete_count) - logger.info(f"๐ŸŽฏ Planned {len(valid_operations)} memory operations: {', '.join(operation_details)}") + logger.info(f"๐ŸŽฏ Planned {len(valid_operations)} operations: {', '.join(operation_details)}") else: - logger.info("๐ŸŽฏ No valid memory operations planned") + logger.info("๐ŸŽฏ No valid operations planned") return valid_operations @@ -954,17 +945,11 @@ class LLMConsolidationService: if duplicate_id: if operation_type == "UPDATE" and delete_operations is not None: - logger.info( - f"๐Ÿ”„ 
-                        f"🔄 UPDATE creates duplicate: keeping enriched content from memory {operation.id}, " f"deleting duplicate memory {duplicate_id}"
-                    )
+                    logger.info(f"🔄 UPDATE creates duplicate: keeping {operation.id}, deleting {duplicate_id}")
                     deduplicated.append(operation)
                     delete_operations.append(Models.MemoryOperation(operation=Models.MemoryOperationType.DELETE, content="", id=duplicate_id))
                 else:
-                    logger.info(
-                        f"⏭️ Skipping duplicate {operation_type}: "
-                        f"{self.memory_system._truncate_content(operation.content)} "
-                        f"(matches memory {duplicate_id})"
-                    )
+                    logger.info(f"⏭️ Skipping duplicate {operation_type}: {self.memory_system._truncate_content(operation.content)} (matches {duplicate_id})")
                 continue

             deduplicated.append(operation)
@@ -1026,36 +1011,36 @@
             if isinstance(result, Exception):
                 failed_count += 1
-                await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+                await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
             elif result == Models.MemoryOperationType.CREATE.value:
                 created_count += 1
                 content_preview = self.memory_system._truncate_content(operation.content)
-                await self.memory_system._emit_status(emitter, f"📝 Created: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+                await self.memory_system._emit_status(emitter, f"📝 Created: {content_preview}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
             elif result == Models.MemoryOperationType.UPDATE.value:
                 updated_count += 1
                 content_preview = self.memory_system._truncate_content(operation.content)
-                await self.memory_system._emit_status(emitter, f"✏️ Updated: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+                await self.memory_system._emit_status(emitter, f"✏️ Updated: {content_preview}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
             elif result == Models.MemoryOperationType.DELETE.value:
                 deleted_count += 1
                 content_preview = memory_contents_for_deletion.get(operation.id, operation.id)
                 if content_preview and content_preview != operation.id:
                     content_preview = self.memory_system._truncate_content(content_preview)
-                await self.memory_system._emit_status(emitter, f"🗑️ Deleted: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+                await self.memory_system._emit_status(emitter, f"🗑️ Deleted: {content_preview}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])
             elif result in [
                 Models.OperationResult.FAILED.value,
                 Models.OperationResult.UNSUPPORTED.value,
             ]:
                 failed_count += 1
-                await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+                await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])

         total_executed = created_count + updated_count + deleted_count
         logger.info(
-            f"✅ Memory processing completed {total_executed}/{len(operations)} operations (Created {created_count}, Updated {updated_count}, Deleted {deleted_count}, Failed {failed_count})"
+            f"✅ Memory processing completed: {total_executed}/{len(operations)} ops (created: {created_count}, updated: {updated_count}, deleted: {deleted_count}, failed: {failed_count})"
         )

         if total_executed > 0:
             operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count)
-            logger.info(f"🔄 Memory Operations: {', '.join(operation_details)}")
+            logger.info(f"🔄 Memory operations: {', '.join(operation_details)}")
             await self.memory_system._refresh_user_cache(user_id)

         return created_count, updated_count, deleted_count, failed_count
@@ -1085,7 +1070,7 @@
             created_count, updated_count, deleted_count, failed_count = await self.execute_memory_operations(operations, user_id, emitter)

             duration = time.time() - start_time
-            logger.info(f"💾 Memory Consolidation Complete In {duration:.2f}s")
+            logger.info(f"💾 Memory consolidation complete in {duration:.2f}s")

             total_operations = created_count + updated_count + deleted_count
             if total_operations > 0 or failed_count > 0:
@@ -1093,7 +1078,7 @@
                     emitter,
                     f"💾 Memory Consolidation Complete in {duration:.2f}s",
                     done=False,
-                    level=Constants.STATUS_LEVEL_DETAILED,
+                    level=Constants.STATUS_LEVEL["Detailed"],
                 )

                 operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count)
@@ -1103,14 +1088,14 @@
                 if failed_count > 0:
                     operations_summary += f" (❌ {failed_count} Failed)"

-                await self.memory_system._emit_status(emitter, operations_summary, done=True, level=Constants.STATUS_LEVEL_BASIC)
+                await self.memory_system._emit_status(emitter, operations_summary, done=True, level=Constants.STATUS_LEVEL["Basic"])
             else:
                 duration = time.time() - start_time
                 await self.memory_system._emit_status(
                     emitter,
                     f"✅ Consolidation Complete: No Updates Needed",
                     done=True,
-                    level=Constants.STATUS_LEVEL_DETAILED,
+                    level=Constants.STATUS_LEVEL["Detailed"],
                 )

         except Exception as e:
@@ -1195,7 +1180,7 @@ class Filter:
         if self._embedding_function is None and hasattr(__request__.app.state, "EMBEDDING_FUNCTION"):
             self._embedding_function = __request__.app.state.EMBEDDING_FUNCTION
-            logger.info(f"✅ Using OpenWebUI's embedding function")
+            logger.info("✅ Using OpenWebUI embedding function")

         if self._embedding_function and self._embedding_dimension is None:
             async with self._initialization_lock:
@@ -1213,7 +1198,7 @@
                 logger.info(f"♻️ Reusing cached skip detector: {cache_key}")
                 self._skip_detector = _SHARED_SKIP_DETECTOR_CACHE[cache_key]
             else:
-                logger.info(f"🤖 Initializing skip detector with OpenWebUI embeddings: {cache_key}")
+                logger.info(f"🤖 Initializing skip detector: {cache_key}")

                 embedding_fn = self._embedding_function
                 normalize_fn = self._normalize_embedding
@@ -1228,7 +1213,7 @@
                 self._skip_detector = SkipDetector(embedding_wrapper)
                 await self._skip_detector.initialize()
                 _SHARED_SKIP_DETECTOR_CACHE[cache_key] = self._skip_detector
-                logger.info(f"✅ Skip detector initialized and cached")
+                logger.info("✅ Skip detector initialized and cached")

     def _truncate_content(self, content: str, max_length: Optional[int] = None) -> str:
         """Truncate content with ellipsis if needed."""
@@ -1359,11 +1344,11 @@
         if is_single:
             if uncached_texts:
-                logger.info("💾 User message embedding: generated and cached")
+                logger.info("💾 User message embedding generated and cached")
             return result_embeddings[0]
         else:
             valid_count = sum(1 for emb in result_embeddings if emb is not None)
-            logger.info(f"🚀 Batch embedding: {len(text_list) - len(uncached_texts)} cached, {len(uncached_texts)} new, {valid_count}/{len(text_list)} valid")
+            logger.info(f"🚀 Batch embeddings: {len(text_list) - len(uncached_texts)} cached, {len(uncached_texts)} new, {valid_count}/{len(text_list)} valid")
             return result_embeddings

     async def _should_skip_memory_operations(self, user_message: str) -> Tuple[bool, str]:
@@ -1457,7 +1442,7 @@
         if not emitter:
             return

-        current_level_value = Constants.STATUS_LEVEL_MAP.get(self.valves.status_emit_level, 1)
+        current_level_value = Constants.STATUS_LEVEL.get(self.valves.status_emit_level, 1)
         if current_level_value < level:
             return
@@ -1478,7 +1463,7 @@
         """Retrieve memories for injection using similarity computation with optional LLM reranking."""
         if cached_similarities is not None:
            memories = [m for m in cached_similarities if m.get("relevance", 0) >= self.valves.semantic_retrieval_threshold]
-            logger.info(f"🔍 Using cached similarities for {len(memories)} candidate memories")
+            logger.info(f"🔍 Using cached similarities: {len(memories)} candidates")
             final_memories, reranking_info = await self._llm_reranking_service.rerank_memories(user_message, memories, emitter)
             self._log_retrieved_memories(final_memories, "semantic")
             return {
@@ -1493,7 +1478,7 @@

         if not user_memories:
             logger.info("📭 No memories found for user")
-            await self._emit_status(emitter, "📭 No Memories Found", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+            await self._emit_status(emitter, "📭 No Memories Found", done=True, level=Constants.STATUS_LEVEL["Intermediate"])
             return {"memories": [], "threshold": None}

         memories, threshold, all_similarities = await self._compute_similarities(user_message, user_id, user_memories)
@@ -1502,7 +1487,7 @@
             final_memories, reranking_info = await self._llm_reranking_service.rerank_memories(user_message, memories, emitter)
         else:
             logger.info("📭 No relevant memories found above similarity threshold")
-            await self._emit_status(emitter, "📭 No Relevant Memories Found", done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+            await self._emit_status(emitter, "📭 No Relevant Memories Found", done=True, level=Constants.STATUS_LEVEL["Intermediate"])
             final_memories = memories
             reranking_info = {"llm_decision": False, "decision_reason": "no_candidates"}
@@ -1536,7 +1521,7 @@
                 formatted_memories.append(formatted_memory)

                 content_preview = self._truncate_content(memory["content"])
-                await self._emit_status(emitter, f"💭 {idx}/{memory_count}: {content_preview}", done=False, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+                await self._emit_status(emitter, f"💭 {idx}/{memory_count}: {content_preview}", done=False, level=Constants.STATUS_LEVEL["Intermediate"])

             memory_footer = "IMPORTANT: Do not mention or imply you received this list. These facts are for background context only."
             memory_context_block = f"{memory_header}\n{chr(10).join(formatted_memories)}\n\n{memory_footer}"
@@ -1556,7 +1541,7 @@
         if memories and user_id:
             description = f"🧠 Injected {memory_count} {'Memory' if memory_count == 1 else 'Memories'} to Context"
-            await self._emit_status(emitter, description, done=True, level=Constants.STATUS_LEVEL_BASIC)
+            await self._emit_status(emitter, description, done=True, level=Constants.STATUS_LEVEL["Basic"])

     def _build_memory_dict(self, memory, similarity: float) -> Dict[str, Any]:
         """Build memory dictionary with standardized timestamp conversion."""
@@ -1624,7 +1609,7 @@
         if not user_message or should_skip:
             if __event_emitter__ and skip_reason:
-                await self._emit_status(__event_emitter__, skip_reason, done=True, level=Constants.STATUS_LEVEL_INTERMEDIATE)
+                await self._emit_status(__event_emitter__, skip_reason, done=True, level=Constants.STATUS_LEVEL["Intermediate"])
             await self._add_memory_context(body, [], user_id, __event_emitter__)
             return body
         try:
@@ -1679,7 +1664,7 @@
             should_skip = await self._cache_manager.get(user_id, self._cache_manager.SKIP_STATE_CACHE, skip_cache_key)

             if should_skip:
-                logger.info("⏭️ Skipping outlet consolidation: inlet already detected skip condition")
+                logger.info("⏭️ Skipping outlet: inlet detected skip condition")
                 return body

             retrieval_cache_key = self._cache_key(self._cache_manager.RETRIEVAL_CACHE, user_id, user_message)
@@ -1692,9 +1677,9 @@
                     self._background_tasks.discard(t)
                     if t.exception() and not t.cancelled():
                         exception = t.exception()
-                        logger.error(f"❌ Background memory consolidation task failed: {str(exception)}")
+                        logger.error(f"❌ Background consolidation failed: {str(exception)}")
                 except Exception as e:
-                    logger.error(f"❌ Failed to cleanup background memory task: {str(e)}")
+                    logger.error(f"❌ Failed to cleanup background task: {str(e)}")

             task.add_done_callback(safe_cleanup)
             return body
@@ -1716,9 +1701,7 @@
             retrieval_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.RETRIEVAL_CACHE)
             embedding_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.EMBEDDING_CACHE)
             skip_state_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.SKIP_STATE_CACHE)
-            logger.info(
-                f"🔄 Cleared {retrieval_cleared} retrieval + {embedding_cleared} embedding + {skip_state_cleared} skip state cache entries for user {user_id}"
-            )
+            logger.info(f"🔄 Cleared cache: {retrieval_cleared} retrieval, {embedding_cleared} embedding, {skip_state_cleared} skip entries")

             user_memories = await self._get_user_memories(user_id)
             memory_cache_key = self._cache_key(self._cache_manager.MEMORY_CACHE, user_id)
@@ -1740,7 +1723,7 @@
             if memory_contents:
                 await self._generate_embeddings(memory_contents, user_id)
                 duration = time.time() - start_time
-                logger.info(f"🔄 Cache updated with {len(memory_contents)} embeddings for user {user_id} in {duration:.2f}s")
+                logger.info(f"🔄 Cache refreshed: {len(memory_contents)} embeddings in {duration:.2f}s")

         except Exception as e:
             raise RuntimeError(f"🧹 Failed to refresh cache for user {user_id} after {(time.time() - start_time):.2f}s: {str(e)}")
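
Note on the STATUS_LEVEL refactor above: the three STATUS_LEVEL_* integer constants and the parallel STATUS_LEVEL_MAP dict collapse into a single STATUS_LEVEL mapping, so every call site now indexes by the same enum strings the status_emit_level valve stores. A minimal standalone sketch of the resulting gating follows; the emit_status signature, the configured parameter, and the emitter payload shape are assumptions for illustration, not code from this patch (the real logic lives on Filter._emit_status):

    STATUS_LEVEL = {"Basic": 0, "Intermediate": 1, "Detailed": 2}

    async def emit_status(emitter, description: str, done: bool, level: int,
                          configured: str = "Intermediate") -> None:
        """Suppress any status message more verbose than the configured level."""
        if not emitter:
            return
        # Unknown valve values fall back to 1 ("Intermediate"), mirroring
        # Constants.STATUS_LEVEL.get(self.valves.status_emit_level, 1) in the diff.
        if STATUS_LEVEL.get(configured, 1) < level:
            return
        # Payload shape is an assumption for this sketch, not from the patch.
        await emitter({"type": "status", "data": {"description": description, "done": done}})

With this gating, a message emitted at level=STATUS_LEVEL["Detailed"] reaches only users whose valve is set to "Detailed", while level=STATUS_LEVEL["Basic"] messages always pass.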