Files
openwebui-memory-system/memory_system.py

1795 lines
99 KiB
Python
Raw Blame History

"""
title: Memory System
version: 1.0.0
"""
import asyncio
import hashlib
import json
import logging
import time
from collections import OrderedDict
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
from pydantic import BaseModel, ConfigDict, Field, ValidationError as PydanticValidationError
from sentence_transformers import SentenceTransformer
from open_webui.utils.chat import generate_chat_completion
from open_webui.models.users import Users
from open_webui.routers.memories import Memories
from fastapi import Request
logging.getLogger("transformers").setLevel(logging.ERROR)
logging.getLogger("sentence_transformers").setLevel(logging.ERROR)
logger = logging.getLogger("MemorySystem")
_SHARED_MODEL_CACHE = {}
class Constants:
"""Centralized configuration constants for the memory system."""
# Core System Limits
MAX_MEMORY_CONTENT_CHARS = 500 # Character limit for LLM prompt memory content
MAX_MEMORIES_PER_RETRIEVAL = 10 # Maximum memories returned per query
MAX_MESSAGE_CHARS = 2500 # Maximum message length for validation
MIN_MESSAGE_CHARS = 10 # Minimum message length for validation
DATABASE_OPERATION_TIMEOUT_SEC = 10 # Timeout for DB operations like user lookup
LLM_CONSOLIDATION_TIMEOUT_SEC = 60.0 # Timeout for LLM consolidation operations
# Cache System
MAX_CACHE_ENTRIES_PER_TYPE = 5000 # Maximum cache entries per cache type
MAX_CONCURRENT_USER_CACHES = 500 # Maximum concurrent user cache instances
CACHE_KEY_HASH_PREFIX_LENGTH = 16 # Hash prefix length for cache keys
# Retrieval & Similarity
SEMANTIC_RETRIEVAL_THRESHOLD = 0.5 # Semantic similarity threshold for retrieval
RELAXED_SEMANTIC_THRESHOLD_MULTIPLIER = 0.9 # Multiplier for relaxed similarity threshold in secondary operations
EXTENDED_MAX_MEMORY_MULTIPLIER = 1.5 # Multiplier for expanding memory candidates in advanced operations
LLM_RERANKING_TRIGGER_MULTIPLIER = 0.5 # Multiplier for LLM reranking trigger threshold
# Skip Detection Thresholds
SKIP_DETECTION_SIMILARITY_THRESHOLD = 0.50 # Similarity threshold for skip category detection (tuned for zero-shot)
SKIP_DETECTION_MARGIN = 0.05 # Minimum margin required between skip and conversational similarity to skip
SKIP_DETECTION_CONFIDENT_MARGIN = 0.15 # Margin threshold for confident skips that trigger early exit
# Safety & Operations
MAX_DELETE_OPERATIONS_RATIO = 0.6 # Maximum delete operations ratio for safety
MIN_OPS_FOR_DELETE_RATIO_CHECK = 6 # Minimum operations to apply ratio check
# Content Display
CONTENT_PREVIEW_LENGTH = 80 # Maximum length for content preview display
# Default Models
DEFAULT_LLM_MODEL = "google/gemini-2.5-flash-lite"
DEFAULT_EMBEDDING_MODEL = "Alibaba-NLP/gte-multilingual-base"
class Prompts:
"""Container for all LLM prompts used in the memory system."""
MEMORY_CONSOLIDATION = f"""You are the Memory System Consolidator, a specialist in creating precise user memories.
## OBJECTIVE
Build precise memories of the user's personal narrative with factual, temporal statements.
## AVAILABLE OPERATIONS
- CREATE: For new, personal facts. Must be semantically and temporally enhanced.
- UPDATE: To modify existing memories, including making facts historical with a date range.
- DELETE: For explicit user requests or to resolve contradictions.
- SKIP: When no new, personal information is provided.
## PROCESSING GUIDELINES
- Personal Facts Only: Store only significant facts with lasting relevance to the user's life and identity. Exclude transient situations, questions, general knowledge, casual mentions, or momentary states.
- Maintain Temporal Accuracy:
- Capture Dates: Record temporal information when explicitly stated or clearly derivable. Convert relative references (last month, yesterday) to specific dates.
- Preserve History: Transform superseded facts into past-tense statements with defined time boundaries.
- Avoid Assumptions: Do not assign current dates to ongoing states, habits, or conditions lacking explicit temporal context.
- Build Rich Entities:
- Fuse Identifiers: Combine nouns/pronouns with specific names into a single entity.
- Capture Relationships: Always store relationships in first-person format with complete relationship context. Never store incomplete relationships, always specify with whom.
- Retroactive Enrichment: If a name is provided for prior entity, UPDATE only if substantially valuable.
- Ensure Memory Quality:
- High Bar for Creation: Only CREATE memories for significant life facts, relationships, events, or core personal attributes. Skip trivial details or passing interests.
- Contextual Completeness: Create memories that combine related information into cohesive statements. When multiple facts share connections (same topic, person, event, or timeframe), group them into a single memory rather than fragmenting. Include relevant supporting details that help understand the core fact while respecting boundaries. Only combine facts that are directly related and belong together naturally. Avoid bare statements lacking context and never merge unrelated information.
- Mandatory Semantic Enhancement: Enhance entities with descriptive categorical nouns for better retrieval.
- Verify Nouns/Pronouns: Link pronouns (he, she, they) and nouns to specific entities.
- First-Person Format: Write all memories in English from the user's perspective.
## DECISION FRAMEWORK
- Selectivity: Verify the user is stating a direct, personally significant fact with lasting importance. If not, SKIP. Never create duplicate memories. Skip momentary events or casual mentions. Be conservative with CREATE and UPDATE operations.
- Strategy: Strongly prioritize enriching existing memories over creating new ones. Analyze the message holistically to identify naturally connected facts that should be captured together. When facts share connections (same person, event, situation, or causal relationship), combine them into a unified memory that preserves the complete picture. Each memory should be self-contained and meaningful.
- Execution: For new significant facts, use CREATE. For simple attribute changes, use UPDATE only if it meaningfully improves the memory. For significant changes, use UPDATE to make the old memory historical, then CREATE the new one. For contradictions, use DELETE.
## EXAMPLES (Assumes Current Date: September 15, 2025)
### Example 1
Message: "My wife Sarah loves hiking and outdoor activities. She has an active lifestyle and enjoys rock climbing. I started this new hobby last month and it's been great."
Memories: []
Return: {{"ops": [{{"operation": "CREATE", "id": "", "content": "My wife Sarah has an active lifestyle and enjoys hiking, outdoor activities, and rock climbing"}}, {{"operation": "CREATE", "id": "", "content": "I started rock climbing in August 2025 as a new hobby and have been enjoying it"}}]}}
Explanation: Multiple facts about the same person (Sarah's active lifestyle, love for hiking, outdoor activities, and rock climbing) are combined into a single cohesive memory. The user's separate rock climbing hobby is kept as a distinct memory since it's about a different person.
### Example 2
Message: "My daughter Emma just turned 12. We adopted a dog named Max for her 11th birthday. What should I give her for her 12th birthday?"
Memories: [id:mem-002] My daughter Emma is 10 years old [noted at March 20 2024] [id:mem-101] I have a golden retriever [noted at September 20 2024]
Return: {{"ops": [{{"operation": "UPDATE", "id": "mem-002", "content": "My daughter Emma turned 12 years old in September 2025"}}, {{"operation": "UPDATE", "id": "mem-101", "content": "I have a golden retriever named Max that was adopted in September 2024 as a birthday gift for my daughter Emma when she turned 11"}}]}}
Explanation: Dog memory enriched with related context (Emma, birthday gift, age 11) and temporal anchoring (September 2024) - all semantically connected to the same event and relationship.
### Example 3
Message: "Can you recommend some good tapas restaurants in Barcelona? I moved here from Madrid last month."
Memories: [id:mem-005] I live in Madrid Spain [noted at June 12 2025]
Return: {{"ops": [{{"operation": "UPDATE", "id": "mem-005", "content": "I lived in Madrid Spain until August 2025"}}, {{"operation": "CREATE", "id": "", "content": "I moved to Barcelona Spain in August 2025"}}]}}
Explanation: Relocation is a significant life event with lasting impact. "Exploring the city" and "adjusting" are transient states and excluded.
### Example 4
Message: "My wife Sofia and I just got married in August. What are some good honeymoon destinations?"
Memories: [id:mem-008] I am single [noted at January 5 2025]
Return: {{"ops": [{{"operation": "DELETE", "id": "mem-008", "content": ""}}, {{"operation": "CREATE", "id": "", "content": "I married Sofia in August 2025 and she is now my wife"}}]}}
Explanation: Marriage is an enduring life event. Wife's name and marriage date are lasting facts combined naturally. "Planning honeymoon" is a transient activity and excluded.
### Example 5
Message: "¡Hola! Me mudé de Madrid a Barcelona el mes pasado y me casé con mi novia Sofía en agosto. ¿Me puedes recomendar un buen restaurante para celebrar?"
Memories: [id:mem-005] I live in Madrid Spain [noted at June 12 2025] [id:mem-006] I am dating Sofia [noted at February 10 2025] [id:mem-008] I am single [noted at January 5 2025]
Return: {{"ops": [{{"operation": "UPDATE", "id": "mem-005", "content": "I lived in Madrid Spain until August 2025"}}, {{"operation": "DELETE", "id": "mem-008", "content": ""}}, {{"operation": "UPDATE", "id": "mem-006", "content": "I moved to Barcelona Spain and married my girlfriend Sofia in August 2025, who is now my wife"}}]}}
Explanation: The user's move and marriage are significant, related life events that occurred in the same month. They are consolidated into a single, cohesive memory that enriches the existing relationship context.
### Example 6
Message: "I'm feeling stressed about work this week and looking for some relaxation tips. I have a big presentation coming up on Friday."
Memories: []
Return: {{"ops": []}}
Explanation: Temporary stress, seeking tips, and upcoming presentation are all transient situations without lasting personal significance. Nothing to store.
"""
MEMORY_RERANKING = f"""You are the Memory Relevance Analyzer.
## OBJECTIVE
Select relevant memories to personalize the response, prioritizing direct connections and supporting context.
## RELEVANCE CATEGORIES
- Direct: Memories explicitly about the query topic, people, or domain.
- Contextual: Personal info that affects response recommendations or understanding.
- Background: Situational context that provides useful personalization.
## SELECTION FRAMEWORK
- Prioritize Current Info: Give current facts higher relevance than historical ones unless the query is about the past or historical context directly informs the current situation.
- Hierarchy: Prioritize Direct → Contextual → Background.
- Ordering: Order IDs by relevance, most relevant first.
- Standard: Prioritize topic matches, then context that enhances the response.
- Maximum Limit: Return up to {Constants.MAX_MEMORIES_PER_RETRIEVAL} memory IDs.
## EXAMPLES (Assumes Current Date: September 15, 2025)
### Example 1
Message: "I'm struggling with imposter syndrome at my new job. Any advice?"
Memories: [id:mem-001] I work as a senior software engineer at Tesla [noted at September 10 2025] [id:mem-002] I started my current job 3 months ago [noted at June 15 2025] [id:mem-003] I used to work in marketing [noted at March 5 2025] [id:mem-004] I graduated with a computer science degree [noted at May 15 2020]
Return: {{"ids": ["mem-001", "mem-002", "mem-003", "mem-004"]}}
Explanation: Career transition history (marketing → software engineering) directly informs current imposter syndrome at new job, making historical context relevant.
### Example 2
Message: "Necesito ideas para una cena saludable y con muchas verduras esta noche."
Memories: [id:mem-030] I am trying a vegetarian diet [noted at September 20 2025] [id:mem-031] My favorite cuisine is Italian [noted at August 15 2025] [id:mem-032] I dislike spicy food [noted at August 5 2025]
Return: {{"ids": ["mem-030", "mem-031", "mem-032"]}}
Explanation: Vegetarian diet is directly relevant to healthy vegetable-focused dinner. Italian cuisine and spice preference provide contextual personalization for recipe recommendations.
### Example 3
Message: "What are some good anniversary gift ideas for my wife, Sarah?"
Memories: [id:mem-101] My wife is named Sarah. [id:mem-102] My wife Sarah loves hiking and mystery novels. [id:mem-103] My wedding anniversary with Sarah is in October. [id:mem-104] I am on a tight budget this month. [id:mem-105] I live in Denver. [id:mem-106] I have a golden retriever named Max.
Return: {{"ids": ["mem-102", "mem-103", "mem-101", "mem-104"]}}
Explanation: Wife's interests (hiking, mystery novels) are direct matches for gift suggestions. Anniversary timing and budget constraints are contextual factors. Location and pet are background details not relevant to gift selection.
### Example 4
Message: "I've been reading about quantum computing and I'm confused. Can you break down how quantum bits work differently from regular computer bits?"
Memories: [id:mem-026] I work as a senior software engineer at Tesla [noted at September 15 2025] [id:mem-027] My wife is named Sarah [noted at August 5 2025]
Return: {{"ids": []}}
Explanation: Query seeks general technical explanation without personal context. Job and family information don't affect how quantum computing concepts should be explained.
"""
class Models:
"""Container for all Pydantic models used in the memory system."""
class MemoryOperationType(Enum):
CREATE = "CREATE"
UPDATE = "UPDATE"
DELETE = "DELETE"
class OperationResult(Enum):
SKIPPED_EMPTY_CONTENT = "SKIPPED_EMPTY_CONTENT"
SKIPPED_EMPTY_ID = "SKIPPED_EMPTY_ID"
UNSUPPORTED = "UNSUPPORTED"
FAILED = "FAILED"
class StrictModel(BaseModel):
"""Base model with strict JSON schema for LLM structured output."""
model_config = ConfigDict(extra="forbid")
class MemoryOperation(StrictModel):
"""Pydantic model for memory operations with validation."""
operation: 'Models.MemoryOperationType' = Field(description="Type of memory operation to perform")
content: str = Field(description="Memory content (required for CREATE/UPDATE, empty for DELETE)")
id: str = Field(description="Memory ID (empty for CREATE, required for UPDATE/DELETE)")
def validate_operation(self, existing_memory_ids: Optional[set] = None) -> bool:
"""Validate the memory operation against existing memory IDs."""
if existing_memory_ids is None:
existing_memory_ids = set()
if self.operation == Models.MemoryOperationType.CREATE:
return True
elif self.operation in [Models.MemoryOperationType.UPDATE, Models.MemoryOperationType.DELETE]:
return self.id in existing_memory_ids
return False
class ConsolidationResponse(BaseModel):
"""Pydantic model for memory consolidation LLM response - object containing array of memory operations."""
ops: List['Models.MemoryOperation'] = Field(default_factory=list, description="List of memory operations to execute")
class MemoryRerankingResponse(BaseModel):
"""Pydantic model for memory reranking LLM response - object containing array of memory IDs."""
ids: List[str] = Field(default_factory=list, description="List of memory IDs selected as most relevant for the user query")
class UnifiedCacheManager:
"""Unified cache manager handling all cache types with user isolation and LRU eviction."""
def __init__(self, max_cache_size_per_type: int, max_users: int):
self.max_cache_size_per_type = max_cache_size_per_type
self.max_users = max_users
self.caches: OrderedDict[str, Dict[str, OrderedDict[str, Any]]] = OrderedDict()
self._lock = asyncio.Lock()
self.EMBEDDING_CACHE = "embedding"
self.RETRIEVAL_CACHE = "retrieval"
self.MEMORY_CACHE = "memory"
async def get(self, user_id: str, cache_type: str, key: str) -> Optional[Any]:
"""Get value from cache with LRU updates."""
async with self._lock:
if user_id not in self.caches:
return None
user_cache = self.caches[user_id]
if cache_type not in user_cache:
return None
type_cache = user_cache[cache_type]
if key in type_cache:
type_cache.move_to_end(key)
self.caches.move_to_end(user_id)
return type_cache[key]
return None
async def put(self, user_id: str, cache_type: str, key: str, value: Any) -> None:
"""Store value in cache with size limits and LRU eviction."""
async with self._lock:
if user_id not in self.caches:
if len(self.caches) >= self.max_users:
evicted_user, _ = self.caches.popitem(last=False)
self.caches[user_id] = {}
user_cache = self.caches[user_id]
if cache_type not in user_cache:
user_cache[cache_type] = OrderedDict()
type_cache = user_cache[cache_type]
if key not in type_cache and len(type_cache) >= self.max_cache_size_per_type:
evicted_key, _ = type_cache.popitem(last=False)
if key in type_cache:
type_cache[key] = value
type_cache.move_to_end(key)
else:
type_cache[key] = value
self.caches.move_to_end(user_id)
async def clear_user_cache(self, user_id: str, cache_type: Optional[str] = None) -> int:
"""Clear specific cache type for user, or all caches for user if cache_type is None."""
async with self._lock:
if user_id not in self.caches:
return 0
user_cache = self.caches[user_id]
if cache_type is None:
total_cleared = sum(len(type_cache) for type_cache in user_cache.values())
del self.caches[user_id]
return total_cleared
else:
if cache_type in user_cache:
cleared_count = len(user_cache[cache_type])
del user_cache[cache_type]
if not user_cache:
del self.caches[user_id]
return cleared_count
return 0
async def clear_all_caches(self) -> None:
"""Clear all caches for all users."""
async with self._lock:
self.caches.clear()
async def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics for monitoring."""
async with self._lock:
total_users = len(self.caches)
total_items = 0
cache_type_counts = {}
for user_id, user_cache in self.caches.items():
for cache_type, type_cache in user_cache.items():
cache_type_counts[cache_type] = cache_type_counts.get(cache_type, 0) + len(type_cache)
total_items += len(type_cache)
return {
"total_users": total_users,
"total_items": total_items,
"cache_type_counts": cache_type_counts,
"max_users": self.max_users,
"max_cache_size_per_type": self.max_cache_size_per_type,
}
class SkipDetector:
"""Semantic-based content classifier using zero-shot classification with category descriptions."""
TECHNICAL_CATEGORY_DESCRIPTIONS = [
"programming code with function definitions class declarations variable assignments import statements or syntax",
"error messages with stack traces exception types file paths line numbers or codes without personal context",
"system logs with timestamps severity levels status indicators like ERROR INFO WARN DEBUG",
"shell terminal commands with dollar signs or sudo git npm docker apt-get pip curl",
"structured data in JSON XML YAML CSV format with nested objects arrays brackets key-value pairs",
"technical documentation describing APIs configurations file formats system specifications protocols",
"formatted tables lists outputs with multiple rows columns headers structured entries",
"debugging output diagnostic information performance metrics system status build reports",
"impersonal technical question about how technology works programming concepts algorithms data structures protocols without my job project career",
"abstract question about technical comparisons theoretical computer science concepts without I am learning or personal development",
"programming algorithm implementation with complexity analysis O notation time space complexity algorithmic approach without personal project",
"technical explanation about software architecture design patterns microservices authentication systems backend without my work job",
"API endpoints HTTP methods request response formats REST GraphQL WebSocket protocols without I am building or personal application",
"code functionality behavior logic flow with technical terms without I am struggling I have trouble career anxiety",
"Windows file paths with backslashes C colon backslash Program Files drive letters",
"deployment scripts configuration files with multiple technical components paths URLs commands without personal story",
"error stack trace with file paths line numbers exception names NullPointerException SegmentationFault RuntimeError without personal debugging story",
"system error messages with codes status codes HTTP codes 404 500 Connection refused Timeout Exception without personal context",
"application logs with INFO WARN ERROR DEBUG levels timestamps entries without my application system",
"API query syntax GraphQL REST endpoint cURL command HTTP request without I am building my API",
"technical error output compilation errors runtime exceptions segmentation faults core dumps without personal project",
"code snippet with syntax highlighting language markers python javascript java without personal implementation story",
"React JSX component with angle brackets curly braces className props const Component equals arrow function return JSX",
"HTTP error status codes 404 Not Found 500 Internal Server Error 403 Forbidden with status messages",
"GraphQL query syntax with query mutation fragment type definitions curly brace field arguments",
"debug logs with DEBUG prefix timestamp log level output without personal application debugging story",
"command line with pipe operators curl pipe jq grep pipe sed awk without personal story",
"stack trace with arrow notation function1 arrow function2 arrow function3 call sequence without debugging my code",
"DEBUG log with request headers response details authorization content-type without my application debugging",
]
META_CONVERSATION_CATEGORY_DESCRIPTIONS = [
"acknowledgment response like thanks got it I understand makes sense helpful appreciate it",
"polite courtesy phrase like please excuse me sorry to bother you hope you are well no worries all good",
"agreement confirmation like yes correct absolutely I agree exactly right indeed totally",
"farewell closing like goodbye see you later talk soon have a good day take care bye",
"extended thanks like thank you so much appreciate your help grateful for assistance thanks again",
"clarification about previous messages like sorry for confusion let me clarify what I meant I should have been more specific apologize for unclear question",
"feedback about conversation like that was helpful your explanation was clear I appreciate the detailed response exceeded expectations",
"meta discussion about asking questions like I will try to be more specific next time sorry for vague question I should provide more context",
"simple agreement like absolutely exactly totally yes indeed right completely agree",
"brief acknowledgment like got it understood makes sense I see okay cool sounds good",
"simple strong agreement like absolutely agree totally agree exactly right yes indeed completely right",
"brief positive acknowledgment like I hope so fingers crossed hopefully that works hope it helps",
"extended thanks with multiple points like thank you for A B C appreciate detailed help grateful for assistance",
]
FACTUAL_QUERY_CATEGORY_DESCRIPTIONS = [
"definition question asking what is something what does term mean explain concept define word without personal context",
"factual information request about dates events history geography science facts trivia without personal relevance application learning goal",
"general how-to question asking for instructions steps process recipe procedure without personal needs circumstances projects",
"theoretical explanation request about why how things work in general abstract concepts principles without personal application career",
"comparison question asking differences between options technologies concepts products services without personal preference situation decision job requirement",
"who what when where question about historical figures famous people events discoveries inventions without personal connection story",
"multiple questions about same topic with numbered list several parts complex multi-part query with first second third bullet points",
"academic theoretical question with multiple sub-questions breaking down topic into components analyzing from different angles without personal project work",
"scientific explanation request about natural phenomena physics chemistry biology astronomy without personal research study career",
"general knowledge query about capitals countries populations currencies geography facts without travel plans personal interest job relevance",
"abstract technology comparison like difference between Python and Java without I am choosing or I work with",
"theoretical programming question about best practices clean code principles without my project codebase work situation",
"what is definition question like what is photosynthesis blockchain what does term mean explain concept",
"when did historical question like when did event happen when was invention when did person live",
"how to general instruction like how to tie tie change tire wash clothes without personal need",
"explain how works question like explain how blockchain works neural networks quantum mechanics without personal learning goal",
"abstract comparison without personal choice like difference between capitalism socialism what is better Python or Java",
"short when question about past events dates like when did happen invention discovery",
]
OUTPUT_FORMATTING_CATEGORY_DESCRIPTIONS = [
"instruction to format output as JSON YAML CSV table list markdown code block or specific data structure",
"request to adjust response style length like make it shorter longer simpler more detailed use bullet points numbered list",
"command to rewrite rephrase translate summarize previous response output or answer",
"request to change tone presentation like be more formal casual technical professional explain like I am five years old",
"tone adjustment like use professional tone formal language business corporate style",
]
PURE_MATH_CALCULATION_CATEGORY_DESCRIPTIONS = [
"arithmetic calculation with explicit numbers like calculate 15 percent of 250 or solve 45 times 67",
"mathematical expression evaluation with operators like 2 plus 3 times 4 divided by 5 minus 6 or what is 123 times 456",
"unit conversion with specific values like convert 100 kilometers to miles or 72 fahrenheit to celsius",
"percentage calculation with explicit numbers like what is 25 percent of 800 or discount price of 120 minus 30 percent",
"algebra equation solving with explicit numbers like solve for x in equation 2x plus 5 equals 15",
"geometry calculation with specific measurements like area of circle radius 5 or volume of cube side 10",
"numerical computation like square root of 144 or 15 plus 23 minus 8 times 2",
]
EXPLICIT_TRANSLATION_CATEGORY_DESCRIPTIONS = [
"translation instruction with text to translate like translate this to Spanish colon Hello how are you in quotes or brackets",
"translation request with provided phrase like how do you say quoted phrase good morning in French with explicit text",
"language conversion with text block like convert this English text to Japanese here is the text followed by content",
"phrase translation with quoted or bracketed text like translate I am hungry to Mandarin with explicit phrase",
"sentence translation with actual text like what is Spanish translation of quoted sentence or how to say specific phrase in Italian",
"text conversion with source content like translate following paragraph to Portuguese colon followed by actual text",
"translation with colon separator like Translate to German colon followed by English sentence or text to convert",
"how do you say question with specific word or phrase like how do you say computer in Russian or what is hello in French",
"translate to language instruction with target language like translate to Italian Japanese Portuguese",
"language translation with explicit source text in quotes brackets or after colon separator",
]
GRAMMAR_PROOFREADING_CATEGORY_DESCRIPTIONS = [
"proofreading request with incorrect text like fix grammar in this text here is my draft check for typos in quoted text",
"grammar correction with specific wrong text or sentence like She don't like",
"spelling punctuation check with specific text to review and fix errors in provided passage",
"copy editing with text like proofread this paragraph correct errors in sentence fix mistakes in text block",
"error correction like check this text for mistakes or review this sentence for grammar problems with text included",
"correction with misspelled or grammatically incorrect text like Their going too the store or Me and him went",
"typo fixing with text containing errors like Teh quick brown fox or check spelling in this paragraph",
"sentence correction with wrong grammar like fix this I has three book or correct the punctuation in this text",
"check grammar instruction with text to review like check grammar in this or correct grammar in following sentence",
"proofreading with specific errors like multiple typos spelling mistakes punctuation errors in provided text passage",
]
CONVERSATIONAL_CATEGORY_DESCRIPTIONS = [
"statement about family members by name mentioning spouse children parents siblings relatives with specific names or roles",
"expression of lasting personal feelings emotions core preferences values beliefs or dislikes about life situations",
"description of established personal hobbies regular activities consistent interests or meaningful pursuits the person does",
"significant career information about current job specific workplace company name professional role or work situation",
"major life plans important personal goals long-term aspirations meaningful future intentions or life decisions",
"personal decision experience choice about important life matters relationships family or individual circumstances",
"meaningful personal story memory reflection about significant past life experiences or events",
"personal background information about hometown childhood education cultural heritage or formative life experiences",
"health information about medical conditions treatments ongoing health situations physical attributes or personal wellness",
"personal question seeking advice about specific individual life situations relationships family decisions or personal circumstances",
"request for recommendations based on stated personal context preferences needs situation location or individual requirements",
"learning statement expressing personal interest in understanding something new as part of career transition or personal development",
"question about helping family member child spouse or relative with their interests education or personal needs",
"statement about personal challenges struggles confusion with work tasks technology language learning or skill development in personal context",
"expression of personal difficulty with writing grammar language skills at job workplace or in professional setting",
"personal language learning like I am learning Spanish for move or taking French lessons for job with personal motivation",
"studying language for personal reason like learning Mandarin because my wife speaks it or studying German for university",
"personal tech struggle at work like I am having trouble with React at my job or confused about framework at my workplace",
"career anxiety or stress about technology skills like struggling with imposter syndrome feeling overwhelmed at new technical job",
"personal request for help with specific technology problem at job workplace or in personal project with named context",
"planning party celebration event for my child family member with specific personal context like my daughter birthday my son graduation",
"learning for my course my class my school with personal educational context like understanding quantum computing for my physics course",
"help me understand technology for my personal learning my education my certification with career development or school context",
]
class SkipReason(Enum):
SKIP_SIZE = "SKIP_SIZE"
SKIP_TECHNICAL = "SKIP_TECHNICAL"
SKIP_META = "SKIP_META"
SKIP_FACTUAL_QUERY = "SKIP_FACTUAL_QUERY"
SKIP_OUTPUT_FORMATTING = "SKIP_OUTPUT_FORMATTING"
SKIP_PURE_MATH = "SKIP_PURE_MATH"
SKIP_TRANSLATION = "SKIP_TRANSLATION"
SKIP_GRAMMAR_PROOFREAD = "SKIP_GRAMMAR_PROOFREAD"
STATUS_MESSAGES = {
SkipReason.SKIP_SIZE: "📏 Message Length Out of Limits, skipping memory operations",
SkipReason.SKIP_TECHNICAL: "💻 Technical Content Detected, skipping memory operations",
SkipReason.SKIP_META: "💬 Conversational Filler Detected, skipping memory operations",
SkipReason.SKIP_FACTUAL_QUERY: "📚 General Knowledge Query Detected, skipping memory operations",
SkipReason.SKIP_OUTPUT_FORMATTING: "🎨 Format Instruction Detected, skipping memory operations",
SkipReason.SKIP_PURE_MATH: "🔢 Mathematical Calculation Detected, skipping memory operations",
SkipReason.SKIP_TRANSLATION: "🌐 Translation Request Detected, skipping memory operations",
SkipReason.SKIP_GRAMMAR_PROOFREAD: "📝 Grammar/Proofreading Request Detected, skipping memory operations",
}
def __init__(self, embedding_model: SentenceTransformer):
"""Initialize the skip detector with an embedding model and compute reference embeddings."""
self.embedding_model = embedding_model
self._reference_embeddings = None
self._initialize_reference_embeddings()
def _initialize_reference_embeddings(self) -> None:
"""Compute and cache embeddings for category descriptions."""
try:
technical_embeddings = self.embedding_model.encode(
self.TECHNICAL_CATEGORY_DESCRIPTIONS,
convert_to_tensor=True,
show_progress_bar=False
)
meta_embeddings = self.embedding_model.encode(
self.META_CONVERSATION_CATEGORY_DESCRIPTIONS,
convert_to_tensor=True,
show_progress_bar=False
)
factual_query_embeddings = self.embedding_model.encode(
self.FACTUAL_QUERY_CATEGORY_DESCRIPTIONS,
convert_to_tensor=True,
show_progress_bar=False
)
output_formatting_embeddings = self.embedding_model.encode(
self.OUTPUT_FORMATTING_CATEGORY_DESCRIPTIONS,
convert_to_tensor=True,
show_progress_bar=False
)
pure_math_embeddings = self.embedding_model.encode(
self.PURE_MATH_CALCULATION_CATEGORY_DESCRIPTIONS,
convert_to_tensor=True,
show_progress_bar=False
)
translation_embeddings = self.embedding_model.encode(
self.EXPLICIT_TRANSLATION_CATEGORY_DESCRIPTIONS,
convert_to_tensor=True,
show_progress_bar=False
)
grammar_embeddings = self.embedding_model.encode(
self.GRAMMAR_PROOFREADING_CATEGORY_DESCRIPTIONS,
convert_to_tensor=True,
show_progress_bar=False
)
conversational_embeddings = self.embedding_model.encode(
self.CONVERSATIONAL_CATEGORY_DESCRIPTIONS,
convert_to_tensor=True,
show_progress_bar=False
)
self._reference_embeddings = {
'technical': technical_embeddings,
'meta': meta_embeddings,
'factual_query': factual_query_embeddings,
'output_formatting': output_formatting_embeddings,
'pure_math': pure_math_embeddings,
'translation': translation_embeddings,
'grammar': grammar_embeddings,
'conversational': conversational_embeddings,
}
total_skip_categories = (
len(self.TECHNICAL_CATEGORY_DESCRIPTIONS) +
len(self.META_CONVERSATION_CATEGORY_DESCRIPTIONS) +
len(self.FACTUAL_QUERY_CATEGORY_DESCRIPTIONS) +
len(self.OUTPUT_FORMATTING_CATEGORY_DESCRIPTIONS) +
len(self.PURE_MATH_CALCULATION_CATEGORY_DESCRIPTIONS) +
len(self.EXPLICIT_TRANSLATION_CATEGORY_DESCRIPTIONS) +
len(self.GRAMMAR_PROOFREADING_CATEGORY_DESCRIPTIONS)
)
logger.info(f"SkipDetector initialized with {total_skip_categories} skip categories and {len(self.CONVERSATIONAL_CATEGORY_DESCRIPTIONS)} personal categories")
except Exception as e:
logger.error(f"Failed to initialize SkipDetector reference embeddings: {e}")
self._reference_embeddings = None
def validate_message_size(self, message: str, max_message_chars: int) -> Optional[str]:
"""Validate message size constraints."""
if not message or not message.strip():
return SkipDetector.SkipReason.SKIP_SIZE.value
trimmed = message.strip()
if len(trimmed) < Constants.MIN_MESSAGE_CHARS or len(trimmed) > max_message_chars:
return SkipDetector.SkipReason.SKIP_SIZE.value
return None
def _fast_path_skip_detection(self, message: str) -> Optional[str]:
"""Language-agnostic structural pattern detection with high confidence and low false positive rate."""
msg_len = len(message)
# Pattern 1: Multiple URLs (5+ full URLs indicates link lists or technical references)
url_pattern_count = message.count('http://') + message.count('https://')
if url_pattern_count >= 5:
return self.SkipReason.SKIP_TECHNICAL.value
# Pattern 2: Long unbroken alphanumeric strings (tokens, hashes, base64)
words = message.split()
for word in words:
cleaned = word.strip('.,;:!?()[]{}"\'"')
if len(cleaned) > 80 and cleaned.replace('-', '').replace('_', '').isalnum():
return self.SkipReason.SKIP_TECHNICAL.value
# Pattern 3: Markdown/text separators (repeated ---, ===, ___, ***)
separator_patterns = ['---', '===', '___', '***']
for pattern in separator_patterns:
if pattern * 3 in message.replace(' ', ''):
return self.SkipReason.SKIP_TECHNICAL.value
# Pattern 4: Command-line patterns with context-aware detection
lines_stripped = [line.strip() for line in message.split('\n') if line.strip()]
if lines_stripped and len(lines_stripped) >= 3:
actual_command_lines = 0
for line in lines_stripped:
if line.startswith('$ ') and len(line) > 2:
parts = line[2:].split()
if parts and parts[0].isalnum():
actual_command_lines += 1
elif line.startswith('# ') and len(line) > 2:
rest = line[2:].strip()
if rest and not rest[0].isupper() and ' ' in rest:
actual_command_lines += 1
elif line.startswith('> ') and len(line) > 2:
pass
if actual_command_lines >= 3:
return self.SkipReason.SKIP_TECHNICAL.value
# Pattern 5: High path/URL density (dots and slashes suggesting file paths or URLs)
if msg_len > 30:
slash_count = message.count('/') + message.count('\\')
dot_count = message.count('.')
path_chars = slash_count + dot_count
if path_chars > 10 and (path_chars / msg_len) > 0.15:
return self.SkipReason.SKIP_TECHNICAL.value
# Pattern 6: Markup character density (structured data)
markup_chars = sum(message.count(c) for c in '{}[]<>')
if markup_chars >= 6:
if markup_chars / msg_len > 0.10:
return self.SkipReason.SKIP_TECHNICAL.value
# Pattern 7: Highly structured multi-line content (require markup chars for technical confidence)
line_count = message.count('\n')
if line_count > 15:
lines = message.split('\n')
non_empty_lines = [line for line in lines if line.strip()]
if non_empty_lines:
markup_in_lines = sum(1 for line in non_empty_lines if any(c in line for c in '{}[]<>'))
structured_lines = sum(1 for line in non_empty_lines if line.startswith((' ', '\t')))
# Require high markup presence or indented structure with technical keywords
if markup_in_lines / len(non_empty_lines) > 0.3:
return self.SkipReason.SKIP_TECHNICAL.value
elif structured_lines / len(non_empty_lines) > 0.6:
technical_keywords = ['function', 'class', 'import', 'return', 'const', 'var', 'let', 'def']
if any(keyword in message.lower() for keyword in technical_keywords):
return self.SkipReason.SKIP_TECHNICAL.value
# Pattern 8: Code-like indentation pattern (require code indicators to avoid false positives from bullet lists)
if line_count >= 3:
lines = message.split('\n')
non_empty_lines = [line for line in lines if line.strip()]
if non_empty_lines:
indented_lines = sum(1 for line in non_empty_lines if line[0] in (' ', '\t'))
if indented_lines / len(non_empty_lines) > 0.5:
code_indicators = ['def ', 'class ', 'function ', 'return ', 'import ', 'const ', 'let ', 'var ', 'public ', 'private ']
if any(indicator in message.lower() for indicator in code_indicators):
return self.SkipReason.SKIP_TECHNICAL.value
# Pattern 9: Very high special character ratio (encoded data, technical output)
if msg_len > 50:
special_chars = sum(1 for c in message if not c.isalnum() and not c.isspace())
special_ratio = special_chars / msg_len
if special_ratio > 0.35:
alphanumeric = sum(1 for c in message if c.isalnum())
if alphanumeric / msg_len < 0.50:
return self.SkipReason.SKIP_TECHNICAL.value
return None
def detect_skip_reason(self, message: str, max_message_chars: int = Constants.MAX_MESSAGE_CHARS) -> Optional[str]:
"""
Detect if a message should be skipped using two-stage detection:
1. Fast-path structural patterns (~95% confidence)
2. Semantic classification (for remaining cases)
Returns:
Skip reason string if content should be skipped, None otherwise
"""
size_issue = self.validate_message_size(message, max_message_chars)
if size_issue:
return size_issue
fast_skip = self._fast_path_skip_detection(message)
if fast_skip:
logger.info(f"Fast-path skip: {fast_skip}")
return fast_skip
if self._reference_embeddings is None:
logger.warning("SkipDetector reference embeddings not initialized, allowing message through")
return None
try:
from sentence_transformers import util
message_embedding = self.embedding_model.encode(
message.strip(),
convert_to_tensor=True,
show_progress_bar=False
)
conversational_similarities = util.cos_sim(
message_embedding,
self._reference_embeddings['conversational']
)[0]
max_conversational_similarity = float(conversational_similarities.max())
skip_categories = [
('output_formatting', self.SkipReason.SKIP_OUTPUT_FORMATTING, self.OUTPUT_FORMATTING_CATEGORY_DESCRIPTIONS),
('pure_math', self.SkipReason.SKIP_PURE_MATH, self.PURE_MATH_CALCULATION_CATEGORY_DESCRIPTIONS),
('translation', self.SkipReason.SKIP_TRANSLATION, self.EXPLICIT_TRANSLATION_CATEGORY_DESCRIPTIONS),
('grammar', self.SkipReason.SKIP_GRAMMAR_PROOFREAD, self.GRAMMAR_PROOFREADING_CATEGORY_DESCRIPTIONS),
('technical', self.SkipReason.SKIP_TECHNICAL, self.TECHNICAL_CATEGORY_DESCRIPTIONS),
('meta', self.SkipReason.SKIP_META, self.META_CONVERSATION_CATEGORY_DESCRIPTIONS),
('factual_query', self.SkipReason.SKIP_FACTUAL_QUERY, self.FACTUAL_QUERY_CATEGORY_DESCRIPTIONS),
]
for cat_key, skip_reason, descriptions in skip_categories:
similarities = util.cos_sim(
message_embedding,
self._reference_embeddings[cat_key]
)[0]
max_similarity = float(similarities.max())
if max_similarity > Constants.SKIP_DETECTION_SIMILARITY_THRESHOLD:
margin = max_similarity - max_conversational_similarity
if margin > Constants.SKIP_DETECTION_CONFIDENT_MARGIN:
logger.info(f"Skipping message - {skip_reason.value} ({cat_key}: {max_similarity:.3f}, conv: {max_conversational_similarity:.3f}, margin: {margin:.3f})")
return skip_reason.value
if margin > Constants.SKIP_DETECTION_MARGIN:
logger.info(f"Skipping message - {skip_reason.value} ({cat_key}: {max_similarity:.3f}, conv: {max_conversational_similarity:.3f}, margin: {margin:.3f})")
return skip_reason.value
return None
except Exception as e:
logger.error(f"Error in semantic skip detection: {e}")
return None
class LLMRerankingService:
"""Language-agnostic LLM-based memory reranking service."""
def __init__(self, memory_system):
self.memory_system = memory_system
def _should_use_llm_reranking(self, memories: List[Dict]) -> Tuple[bool, str]:
if not self.memory_system.valves.enable_llm_reranking:
return False, "LLM reranking disabled"
llm_trigger_threshold = int(self.memory_system.valves.max_memories_returned * self.memory_system.valves.llm_reranking_trigger_multiplier)
if len(memories) > llm_trigger_threshold:
return True, f"{len(memories)} candidate memories exceed {llm_trigger_threshold} threshold"
return False, f"{len(memories)} candidate memories within threshold of {llm_trigger_threshold}"
async def _llm_select_memories(self, user_message: str, candidate_memories: List[Dict], max_count: int, emitter: Optional[Callable] = None) -> List[Dict]:
"""Use LLM to select most relevant memories."""
memory_lines = self.memory_system._format_memories_for_llm(candidate_memories)
memory_context = "\n".join(memory_lines)
user_prompt = f"""CURRENT DATE/TIME: {self.memory_system.format_current_datetime()}
USER MESSAGE: {user_message}
CANDIDATE MEMORIES:
{memory_context}"""
try:
response = await self.memory_system._query_llm(Prompts.MEMORY_RERANKING, user_prompt, response_model=Models.MemoryRerankingResponse)
selected_ids = response.ids
selected_memories = []
for memory in candidate_memories:
if memory["id"] in selected_ids and len(selected_memories) < max_count:
selected_memories.append(memory)
logger.info(f"🧠 LLM selected {len(selected_memories)} out of {len(candidate_memories)} candidates")
if not selected_memories:
await self.memory_system._emit_status(emitter, f"📭 No Relevant Memories After LLM Analysis", done=True)
return selected_memories
except Exception as e:
logger.warning(f"🤖 LLM reranking failed during memory relevance analysis: {str(e)}")
await self.memory_system._emit_status(emitter, f"⚠️ LLM Analysis Failed, Using Similarity Ranking", done=True)
return candidate_memories
async def rerank_memories(
self, user_message: str, candidate_memories: List[Dict], emitter: Optional[Callable] = None
) -> Tuple[List[Dict], Dict[str, Any]]:
start_time = time.time()
max_injection = self.memory_system.valves.max_memories_returned
should_use_llm, decision_reason = self._should_use_llm_reranking(candidate_memories)
analysis_info = {"llm_decision": should_use_llm, "decision_reason": decision_reason, "candidate_count": len(candidate_memories)}
if should_use_llm:
extended_count = int(self.memory_system.valves.max_memories_returned * Constants.EXTENDED_MAX_MEMORY_MULTIPLIER)
llm_candidates = candidate_memories[:extended_count]
await self.memory_system._emit_status(
emitter, f"🤖 LLM Analyzing {len(llm_candidates)} Memories for Relevance", done=False
)
logger.info(f"Using LLM reranking: {decision_reason}")
selected_memories = await self._llm_select_memories(user_message, llm_candidates, max_injection, emitter)
else:
logger.info(f"Skipping LLM reranking: {decision_reason}")
selected_memories = candidate_memories[:max_injection]
duration = time.time() - start_time
duration_text = f" in {duration:.2f}s" if duration >= 0.01 else ""
await self.memory_system._emit_status(emitter, f"🎯 Memory Retrieval Complete{duration_text}", done=False)
logger.info(f"🎯 Memory Retrieval Complete{duration_text}")
return selected_memories, analysis_info
class LLMConsolidationService:
"""Language-agnostic LLM-based memory consolidation service."""
def __init__(self, memory_system):
self.memory_system = memory_system
async def collect_consolidation_candidates(
self, user_message: str, user_id: str, cached_similarities: Optional[List[Dict[str, Any]]] = None
) -> List[Dict[str, Any]]:
"""Collect candidate memories for consolidation analysis using cached or computed similarities."""
if cached_similarities:
consolidation_threshold = self.memory_system._get_retrieval_threshold(is_consolidation=True)
candidates = [mem for mem in cached_similarities if mem["relevance"] >= consolidation_threshold]
max_consolidation_memories = int(self.memory_system.valves.max_memories_returned * Constants.EXTENDED_MAX_MEMORY_MULTIPLIER)
candidates = candidates[:max_consolidation_memories]
logger.info(f"🎯 Found {len(candidates)} candidate memories for consolidation (threshold: {consolidation_threshold:.3f}, max: {max_consolidation_memories})")
self.memory_system._log_retrieved_memories(candidates, "consolidation")
return candidates
try:
user_memories = await self.memory_system._get_user_memories(user_id)
except asyncio.TimeoutError:
raise TimeoutError(f"⏱️ Memory retrieval timed out after {Constants.DATABASE_OPERATION_TIMEOUT_SEC}s")
except Exception as e:
logger.error(f"💾 Failed to retrieve user memories from database: {str(e)}")
return []
if not user_memories:
logger.info("💭 No existing memories found for consolidation")
return []
else:
logger.info(f"🚀 Reusing cached user memories for consolidation: {len(user_memories)} memories")
try:
all_similarities, _, _ = await self.memory_system._compute_similarities(user_message, user_id, user_memories)
except Exception as e:
logger.error(f"🔍 Failed to compute memory similarities for retrieval: {str(e)}")
return []
if all_similarities:
consolidation_threshold = self.memory_system._get_retrieval_threshold(is_consolidation=True)
candidates = [mem for mem in all_similarities if mem["relevance"] >= consolidation_threshold]
max_consolidation_memories = int(self.memory_system.valves.max_memories_returned * Constants.EXTENDED_MAX_MEMORY_MULTIPLIER)
candidates = candidates[:max_consolidation_memories]
threshold_info = f"{consolidation_threshold:.3f} (max: {max_consolidation_memories})"
else:
candidates = []
threshold_info = 'N/A'
logger.info(f"🎯 Found {len(candidates)} candidate memories for consolidation (threshold: {threshold_info})")
self.memory_system._log_retrieved_memories(candidates, "consolidation")
return candidates
async def generate_consolidation_plan(self, user_message: str, candidate_memories: List[Dict[str, Any]], emitter: Optional[Callable] = None) -> List[Dict[str, Any]]:
"""Generate consolidation plan using LLM with clear system/user prompt separation."""
if candidate_memories:
memory_lines = self.memory_system._format_memories_for_llm(candidate_memories)
memory_context = f"EXISTING MEMORIES FOR CONSOLIDATION:\n{chr(10).join(memory_lines)}\n\n"
else:
memory_context = "EXISTING MEMORIES FOR CONSOLIDATION:\n[]\n\nNote: No existing memories found - Focus on extracting new memories from the user message below.\n\n"
user_prompt = f"""CURRENT DATE/TIME: {self.memory_system.format_current_datetime()}
{memory_context}USER MESSAGE: {user_message}"""
try:
response = await asyncio.wait_for(
self.memory_system._query_llm(Prompts.MEMORY_CONSOLIDATION, user_prompt, response_model=Models.ConsolidationResponse),
timeout=Constants.LLM_CONSOLIDATION_TIMEOUT_SEC,
)
except Exception as e:
logger.warning(f"🤖 LLM consolidation failed during memory processing: {str(e)}")
await self.memory_system._emit_status(emitter, f"⚠️ Memory Consolidation Failed", done=True)
return []
operations = response.ops
existing_memory_ids = {memory["id"] for memory in candidate_memories}
total_operations = len(operations)
delete_operations = [op for op in operations if op.operation == Models.MemoryOperationType.DELETE]
delete_ratio = len(delete_operations) / total_operations if total_operations > 0 else 0
if delete_ratio > Constants.MAX_DELETE_OPERATIONS_RATIO and total_operations >= Constants.MIN_OPS_FOR_DELETE_RATIO_CHECK:
logger.warning(
f"⚠️ Consolidation safety: {len(delete_operations)}/{total_operations} operations are deletions ({delete_ratio*100:.1f}%) - rejecting plan"
)
return []
valid_operations = [op.model_dump() for op in operations if op.validate_operation(existing_memory_ids)]
if valid_operations:
create_count = sum(1 for op in valid_operations if op.get("operation") == Models.MemoryOperationType.CREATE.value)
update_count = sum(1 for op in valid_operations if op.get("operation") == Models.MemoryOperationType.UPDATE.value)
delete_count = sum(1 for op in valid_operations if op.get("operation") == Models.MemoryOperationType.DELETE.value)
operation_details = self.memory_system._build_operation_details(create_count, update_count, delete_count)
logger.info(f"🎯 Planned {len(valid_operations)} memory operations: {', '.join(operation_details)}")
else:
logger.info("🎯 No valid memory operations planned")
return valid_operations
async def execute_memory_operations(self, operations: List[Dict[str, Any]], user_id: str, emitter: Optional[Callable] = None) -> Tuple[int, int, int, int]:
"""Execute consolidation operations with simplified tracking."""
if not operations or not user_id:
return 0, 0, 0, 0
try:
user = await asyncio.wait_for(asyncio.to_thread(Users.get_user_by_id, user_id), timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC)
except asyncio.TimeoutError:
raise TimeoutError(f"⏱️ User lookup timed out after {Constants.DATABASE_OPERATION_TIMEOUT_SEC}s")
except Exception as e:
raise RuntimeError(f"👤 User lookup failed: {str(e)}")
if not user:
raise ValueError(f"👤 User not found for consolidation: {user_id}")
created_count = updated_count = deleted_count = failed_count = 0
operations_by_type = {"CREATE": [], "UPDATE": [], "DELETE": []}
for operation_data in operations:
try:
operation = Models.MemoryOperation(**operation_data)
operations_by_type[operation.operation.value].append(operation)
except Exception as e:
failed_count += 1
operation_type = operation_data.get("operation", Models.OperationResult.UNSUPPORTED.value)
content_preview = ""
if "content" in operation_data:
content = operation_data.get("content", "")
content_preview = f" - Content: {self.memory_system._truncate_content(content, Constants.CONTENT_PREVIEW_LENGTH)}"
elif "id" in operation_data:
content_preview = f" - ID: {operation_data['id']}"
error_message = f"Failed {operation_type} operation{content_preview}: {str(e)}"
logger.error(error_message)
memory_contents_for_deletion = {}
if operations_by_type["DELETE"]:
try:
user_memories = await self.memory_system._get_user_memories(user_id)
memory_contents_for_deletion = {str(mem.id): mem.content for mem in user_memories}
except Exception as e:
logger.warning(f"⚠️ Failed to fetch memories for DELETE preview: {str(e)}")
for operation_type, ops in operations_by_type.items():
if not ops:
continue
batch_tasks = []
for operation in ops:
task = self.memory_system._execute_single_operation(operation, user)
batch_tasks.append(task)
try:
results = await asyncio.gather(*batch_tasks, return_exceptions=True)
for idx, result in enumerate(results):
operation = ops[idx]
if isinstance(result, Exception):
failed_count += 1
await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False)
elif result == Models.MemoryOperationType.CREATE.value:
created_count += 1
content_preview = self.memory_system._truncate_content(operation.content)
await self.memory_system._emit_status(emitter, f"📝 Created: {content_preview}", done=False)
elif result == Models.MemoryOperationType.UPDATE.value:
updated_count += 1
content_preview = self.memory_system._truncate_content(operation.content)
await self.memory_system._emit_status(emitter, f"✏️ Updated: {content_preview}", done=False)
elif result == Models.MemoryOperationType.DELETE.value:
deleted_count += 1
content_preview = memory_contents_for_deletion.get(operation.id, operation.id)
if content_preview and content_preview != operation.id:
content_preview = self.memory_system._truncate_content(content_preview)
await self.memory_system._emit_status(emitter, f"🗑️ Deleted: {content_preview}", done=False)
elif result in [Models.OperationResult.FAILED.value, Models.OperationResult.UNSUPPORTED.value]:
failed_count += 1
await self.memory_system._emit_status(emitter, f"❌ Failed {operation_type}", done=False)
except Exception as e:
failed_count += len(ops)
logger.error(f"❌ Batch {operation_type} operations failed during memory consolidation: {str(e)}")
await self.memory_system._emit_status(emitter, f"❌ Batch {operation_type} Failed", done=False)
total_executed = created_count + updated_count + deleted_count
logger.info(
f"✅ Memory processing completed {total_executed}/{len(operations)} operations (Created {created_count}, Updated {updated_count}, Deleted {deleted_count}, Failed {failed_count})"
)
if total_executed > 0:
operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count)
logger.info(f"🔄 Memory Operations: {', '.join(operation_details)}")
await self.memory_system._manage_user_cache(user_id)
return created_count, updated_count, deleted_count, failed_count
async def run_consolidation_pipeline(
self, user_message: str, user_id: str, emitter: Optional[Callable] = None, cached_similarities: Optional[List[Dict[str, Any]]] = None
) -> None:
"""Complete consolidation pipeline with simplified flow."""
start_time = time.time()
try:
if self.memory_system._shutdown_event.is_set():
return
candidates = await self.collect_consolidation_candidates(user_message, user_id, cached_similarities)
if self.memory_system._shutdown_event.is_set():
return
operations = await self.generate_consolidation_plan(user_message, candidates, emitter)
if self.memory_system._shutdown_event.is_set():
return
if operations:
created_count, updated_count, deleted_count, failed_count = await self.execute_memory_operations(operations, user_id, emitter)
duration = time.time() - start_time
logger.info(f"💾 Memory Consolidation Complete In {duration:.2f}s")
await self.memory_system._emit_status(emitter, f"💾 Memory Consolidation Complete in {duration:.2f}s", done=False)
total_operations = created_count + updated_count + deleted_count
if total_operations > 0 or failed_count > 0:
operation_details = self.memory_system._build_operation_details(created_count, updated_count, deleted_count)
memory_word = "Memory" if total_operations == 1 else "Memories"
operations_summary = f"{', '.join(operation_details)} {memory_word}"
if failed_count > 0:
operations_summary += f" (❌ {failed_count} Failed)"
await self.memory_system._emit_status(emitter, operations_summary, done=True)
except Exception as e:
duration = time.time() - start_time
raise RuntimeError(f"❌ Memory consolidation failed after {duration:.2f}s: {str(e)}")
class Filter:
"""Enhanced multi-model embedding and memory filter with LRU caching."""
__current_event_emitter__: Callable[[dict], Any]
__user__: Dict[str, Any]
__model__: str
__request__: Request
class Valves(BaseModel):
"""Configuration valves for the Memory System."""
model: str = Field(default=Constants.DEFAULT_LLM_MODEL, description="Model name for LLM operations")
embedding_model: str = Field(default=Constants.DEFAULT_EMBEDDING_MODEL, description="Sentence transformer model for embeddings")
max_memories_returned: int = Field(default=Constants.MAX_MEMORIES_PER_RETRIEVAL, description="Maximum number of memories to return in context")
max_message_chars: int = Field(default=Constants.MAX_MESSAGE_CHARS, description="Maximum user message length before skipping memory operations")
semantic_retrieval_threshold: float = Field(default=Constants.SEMANTIC_RETRIEVAL_THRESHOLD, description="Minimum similarity threshold for memory retrieval")
relaxed_semantic_threshold_multiplier: float = Field(default=Constants.RELAXED_SEMANTIC_THRESHOLD_MULTIPLIER, description="Adjusts similarity threshold for memory consolidation (lower = more candidates)")
enable_llm_reranking: bool = Field(default=True, description="Enable LLM-based memory reranking for improved contextual selection")
llm_reranking_trigger_multiplier: float = Field(default=Constants.LLM_RERANKING_TRIGGER_MULTIPLIER, description="Controls when LLM reranking activates (lower = more aggressive)")
def __init__(self):
"""Initialize the Memory System filter with production validation."""
global _SHARED_MODEL_CACHE
self.valves = self.Valves()
self._validate_system_configuration()
self._cache_manager = UnifiedCacheManager(Constants.MAX_CACHE_ENTRIES_PER_TYPE, Constants.MAX_CONCURRENT_USER_CACHES)
self._background_tasks: set = set()
self._shutdown_event = asyncio.Event()
model_key = self.valves.embedding_model
if model_key in _SHARED_MODEL_CACHE:
logger.info(f"♻️ Reusing cached embedding model: {model_key}")
self._model = _SHARED_MODEL_CACHE[model_key]["model"]
self._skip_detector = _SHARED_MODEL_CACHE[model_key]["skip_detector"]
else:
logger.info(f"🤖 Loading embedding model: {model_key} (cache has {len(_SHARED_MODEL_CACHE)} models)")
self._model = SentenceTransformer(self.valves.embedding_model, device="cpu", trust_remote_code=True)
self._skip_detector = SkipDetector(self._model)
_SHARED_MODEL_CACHE[model_key] = {
"model": self._model,
"skip_detector": self._skip_detector
}
logger.info(f"✅ Embedding model and skip detector initialized and cached")
self._llm_reranking_service = LLMRerankingService(self)
self._llm_consolidation_service = LLMConsolidationService(self)
def _set_pipeline_context(self, __event_emitter__: Optional[Callable] = None, __user__: Optional[Dict[str, Any]] = None,
__model__: Optional[str] = None, __request__: Optional[Request] = None) -> None:
"""Set pipeline context parameters to avoid duplication in inlet/outlet methods."""
if __event_emitter__:
self.__current_event_emitter__ = __event_emitter__
if __user__:
self.__user__ = __user__
if __model__:
self.__model__ = __model__
if __request__:
self.__request__ = __request__
def _truncate_content(self, content: str, max_length: Optional[int] = None) -> str:
"""Truncate content with ellipsis if needed."""
if max_length is None:
max_length = Constants.CONTENT_PREVIEW_LENGTH
return content[:max_length] + "..." if len(content) > max_length else content
def _get_retrieval_threshold(self, is_consolidation: bool = False) -> float:
"""Calculate retrieval threshold for semantic similarity filtering."""
if is_consolidation:
return self.valves.semantic_retrieval_threshold * self.valves.relaxed_semantic_threshold_multiplier
return self.valves.semantic_retrieval_threshold
def _extract_text_from_content(self, content) -> str:
if isinstance(content, str):
return content
elif isinstance(content, list):
text_parts = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text_parts.append(item.get("text", ""))
return " ".join(text_parts)
elif isinstance(content, dict) and "text" in content:
return content["text"]
return ""
def _validate_system_configuration(self) -> None:
"""Validate configuration and fail if invalid."""
if not self.valves.model or not self.valves.model.strip():
raise ValueError("🤖 Model not specified")
if self.valves.max_memories_returned <= 0:
raise ValueError(f"📊 Invalid max memories returned: {self.valves.max_memories_returned}")
if not (0.0 <= self.valves.semantic_retrieval_threshold <= 1.0):
raise ValueError(f"🎯 Invalid semantic retrieval threshold: {self.valves.semantic_retrieval_threshold} (must be 0.0-1.0)")
logger.info("✅ Configuration validated")
async def _get_embedding_cache(self, user_id: str, key: str) -> Optional[Any]:
"""Get embedding from cache."""
return await self._cache_manager.get(user_id, self._cache_manager.EMBEDDING_CACHE, key)
async def _put_embedding_cache(self, user_id: str, key: str, value: Any) -> None:
"""Store embedding in cache."""
await self._cache_manager.put(user_id, self._cache_manager.EMBEDDING_CACHE, key, value)
def _compute_text_hash(self, text: str) -> str:
"""Compute SHA256 hash for text caching."""
return hashlib.sha256(text.encode()).hexdigest()
def _normalize_embedding(self, embedding: np.ndarray) -> np.ndarray:
"""Normalize embedding vector."""
norm = np.linalg.norm(embedding)
return embedding / norm if norm > 0 else embedding
def _generate_embeddings_sync(self, model, texts: Union[str, List[str]]) -> Union[np.ndarray, List[np.ndarray]]:
"""Synchronous embedding generation for single text or batch."""
is_single = isinstance(texts, str)
input_texts = [texts] if is_single else texts
embeddings = model.encode(input_texts, convert_to_numpy=True, show_progress_bar=False)
normalized = [self._normalize_embedding(emb) for emb in embeddings]
return normalized[0] if is_single else normalized
async def _generate_embeddings(self, texts: Union[str, List[str]], user_id: str) -> Union[np.ndarray, List[np.ndarray]]:
"""Unified embedding generation for single text or batch with optimized caching."""
is_single = isinstance(texts, str)
text_list = [texts] if is_single else texts
if not text_list:
if is_single:
raise ValueError("📏 Empty text provided for embedding generation")
return []
result_embeddings = []
uncached_texts = []
uncached_indices = []
uncached_hashes = []
for i, text in enumerate(text_list):
if not text or len(str(text).strip()) < Constants.MIN_MESSAGE_CHARS:
if is_single:
raise ValueError("📏 Text too short for embedding generation")
result_embeddings.append(None)
continue
text_hash = self._compute_text_hash(str(text))
cached = await self._get_embedding_cache(user_id, text_hash)
if cached is not None:
result_embeddings.append(cached)
else:
result_embeddings.append(None)
uncached_texts.append(text)
uncached_indices.append(i)
uncached_hashes.append(text_hash)
if uncached_texts:
loop = asyncio.get_event_loop()
new_embeddings = await loop.run_in_executor(None, self._generate_embeddings_sync, self._model, uncached_texts)
for j, embedding in enumerate(new_embeddings):
original_idx = uncached_indices[j]
text_hash = uncached_hashes[j]
await self._put_embedding_cache(user_id, text_hash, embedding)
result_embeddings[original_idx] = embedding
if is_single:
logger.info("📥 User message embedding: cache hit" if not uncached_texts else "💾 User message embedding: generated and cached")
return result_embeddings[0]
else:
valid_count = sum(1 for emb in result_embeddings if emb is not None)
logger.info(
f"🚀 Batch embedding: {len(text_list) - len(uncached_texts)} cached, {len(uncached_texts)} new, {valid_count}/{len(text_list)} valid"
)
return result_embeddings
def _should_skip_memory_operations(self, user_message: str) -> Tuple[bool, str]:
if self._skip_detector is None:
raise RuntimeError("🤖 Skip detector not initialized")
skip_reason = self._skip_detector.detect_skip_reason(user_message, self.valves.max_message_chars)
if skip_reason:
status_key = SkipDetector.SkipReason(skip_reason)
return True, SkipDetector.STATUS_MESSAGES[status_key]
return False, ""
def _process_user_message(self, body: Dict[str, Any]) -> Tuple[Optional[str], bool, str]:
"""Extract user message and determine if memory operations should be skipped."""
if not body or "messages" not in body or not isinstance(body["messages"], list):
return None, True, SkipDetector.STATUS_MESSAGES[SkipDetector.SkipReason.SKIP_SIZE]
messages = body["messages"]
user_message = None
for message in reversed(messages):
if not isinstance(message, dict) or message.get("role") != "user":
continue
content = message.get("content", "")
user_message = self._extract_text_from_content(content)
if user_message:
break
if not user_message or not user_message.strip():
return None, True, SkipDetector.STATUS_MESSAGES[SkipDetector.SkipReason.SKIP_SIZE]
should_skip, skip_reason = self._should_skip_memory_operations(user_message)
return user_message, should_skip, skip_reason
async def _get_user_memories(self, user_id: str, timeout: Optional[float] = None) -> List:
"""Get user memories with timeout handling."""
if timeout is None:
timeout = Constants.DATABASE_OPERATION_TIMEOUT_SEC
try:
return await asyncio.wait_for(asyncio.to_thread(Memories.get_memories_by_user_id, user_id), timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"⏱️ Memory retrieval timed out after {timeout}s")
except Exception as e:
raise RuntimeError(f"💾 Memory retrieval failed: {str(e)}")
def _log_retrieved_memories(self, memories: List[Dict[str, Any]], context_type: str = "semantic") -> None:
"""Log retrieved memories with concise formatting showing key statistics and semantic values."""
if not memories:
return
scores = [memory["relevance"] for memory in memories]
if not scores:
return
top_score = max(scores)
lowest_score = min(scores)
median_score = sorted(scores)[len(scores) // 2]
context_label = "📊 Consolidation candidate memories" if context_type == "consolidation" else "📊 Retrieved memories"
max_scores_to_show = int(self.valves.max_memories_returned * Constants.EXTENDED_MAX_MEMORY_MULTIPLIER)
scores_str = ", ".join([f"{score:.3f}" for score in scores[:max_scores_to_show]])
suffix = "..." if len(scores) > max_scores_to_show else ""
logger.info(f"{context_label}: {len(memories)} memories | Top: {top_score:.3f} | Median: {median_score:.3f} | Lowest: {lowest_score:.3f}")
logger.info(f"Scores: [{scores_str}{suffix}]")
def _build_operation_details(self, created_count: int, updated_count: int, deleted_count: int) -> List[str]:
"""Build operation details list with consistent formatting."""
operation_details = []
operations = [(created_count, "📝 Created"), (updated_count, "✏️ Updated"), (deleted_count, "🗑️ Deleted")]
for count, label in operations:
if count > 0:
operation_details.append(f"{label} {count}")
return operation_details
def _cache_key(self, cache_type: str, user_id: str, content: Optional[str] = None) -> str:
"""Unified cache key generation for all cache types."""
if content:
content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()[: Constants.CACHE_KEY_HASH_PREFIX_LENGTH]
return f"{cache_type}_{user_id}:{content_hash}"
return f"{cache_type}_{user_id}"
def format_current_datetime(self) -> str:
try:
now = datetime.now(timezone.utc)
return now.strftime("%A %B %d %Y at %H:%M:%S UTC")
except Exception as e:
raise RuntimeError(f"📅 Failed to format datetime: {str(e)}")
def _format_memories_for_llm(self, memories: List[Dict[str, Any]]) -> List[str]:
"""Format memories for LLM consumption with hybrid format and human-readable timestamps."""
memory_lines = []
for memory in memories:
line = f"[{memory['id']}] {memory['content']}"
record_date = memory.get("updated_at") or memory.get("created_at")
if record_date:
try:
if isinstance(record_date, str):
parsed_date = datetime.fromisoformat(record_date.replace('Z', '+00:00'))
else:
parsed_date = record_date
formatted_date = parsed_date.strftime("%b %d %Y")
line += f" [noted at {formatted_date}]"
except Exception as e:
logger.warning(f"Failed to format date {record_date}: {str(e)}")
line += f" [noted at {record_date}]"
memory_lines.append(line)
return memory_lines
async def _emit_status(self, emitter: Optional[Callable], description: str, done: bool = True) -> None:
"""Emit status messages for memory operations."""
if not emitter:
return
payload = {"type": "status", "data": {"description": description, "done": done}}
try:
result = emitter(payload)
if asyncio.iscoroutine(result):
await result
except Exception:
pass
async def _retrieve_relevant_memories(
self,
user_message: str,
user_id: str,
user_memories: Optional[List] = None,
emitter: Optional[Callable] = None,
cached_similarities: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""Retrieve memories for injection using similarity computation with optional LLM reranking."""
if cached_similarities is not None:
memories = [m for m in cached_similarities if m.get("relevance", 0) >= self.valves.semantic_retrieval_threshold]
logger.info(f"🔍 Using cached similarities for {len(memories)} candidate memories")
final_memories, reranking_info = await self._llm_reranking_service.rerank_memories(user_message, memories, emitter)
self._log_retrieved_memories(final_memories, "semantic")
return {
"memories": final_memories,
"threshold": self.valves.semantic_retrieval_threshold,
"all_similarities": cached_similarities,
"reranking_info": reranking_info,
}
if user_memories is None:
user_memories = await self._get_user_memories(user_id)
if not user_memories:
logger.info("📭 No memories found for user")
await self._emit_status(emitter, "📭 No Memories Found", done=True)
return {"memories": [], "threshold": None}
memories, threshold, all_similarities = await self._compute_similarities(user_message, user_id, user_memories)
if memories:
final_memories, reranking_info = await self._llm_reranking_service.rerank_memories(user_message, memories, emitter)
else:
logger.info("📭 No relevant memories found above similarity threshold")
await self._emit_status(emitter, "📭 No Relevant Memories Found", done=True)
final_memories = memories
reranking_info = {"llm_decision": False, "decision_reason": "no_candidates"}
self._log_retrieved_memories(final_memories, "semantic")
return {"memories": final_memories, "threshold": threshold, "all_similarities": all_similarities, "reranking_info": reranking_info}
async def _add_memory_context(
self, body: Dict[str, Any], memories: Optional[List[Dict[str, Any]]] = None, user_id: Optional[str] = None, emitter: Optional[Callable] = None
) -> None:
"""Add memory context to request body with simplified logic."""
if not body or "messages" not in body or not body["messages"]:
logger.warning("⚠️ Invalid request body or no messages found")
return
content_parts = [f"Current Date/Time: {self.format_current_datetime()}"]
memory_count = 0
if memories and user_id:
memory_count = len(memories)
memory_header = f"CONTEXT: The following {'fact' if memory_count == 1 else 'facts'} about the user are provided for background only. Not all facts may be relevant to the current request."
formatted_memories = []
for idx, memory in enumerate(memories, 1):
formatted_memory = f"- {' '.join(memory['content'].split())}"
formatted_memories.append(formatted_memory)
content_preview = self._truncate_content(memory['content'])
await self._emit_status(emitter, f"💭 {idx}/{memory_count}: {content_preview}", done=False)
memory_footer = "IMPORTANT: Do not mention or imply you received this list. These facts are for background context only."
memory_context_block = f"{memory_header}\n{chr(10).join(formatted_memories)}\n\n{memory_footer}"
content_parts.append(memory_context_block)
memory_context = "\n\n".join(content_parts)
system_index = next((i for i, msg in enumerate(body["messages"]) if msg.get("role") == "system"), None)
if system_index is not None:
body["messages"][system_index]["content"] = f"{body['messages'][system_index].get('content', '')}\n\n{memory_context}"
else:
body["messages"].insert(0, {"role": "system", "content": memory_context})
if memories and user_id:
description = f"🧠 Injected {memory_count} {'Memory' if memory_count == 1 else 'Memories'} to Context"
await self._emit_status(emitter, description, done=True)
def _build_memory_dict(self, memory, similarity: float) -> Dict[str, Any]:
"""Build memory dictionary with standardized timestamp conversion."""
memory_dict = {"id": str(memory.id), "content": memory.content, "relevance": similarity}
if hasattr(memory, "created_at") and memory.created_at:
memory_dict["created_at"] = datetime.fromtimestamp(memory.created_at, tz=timezone.utc).isoformat()
if hasattr(memory, "updated_at") and memory.updated_at:
memory_dict["updated_at"] = datetime.fromtimestamp(memory.updated_at, tz=timezone.utc).isoformat()
return memory_dict
async def _compute_similarities(
self, user_message: str, user_id: str, user_memories: List
) -> Tuple[List[Dict], float, List[Dict]]:
"""Compute similarity scores between user message and memories."""
if not user_memories:
return [], self.valves.semantic_retrieval_threshold, []
query_embedding = await self._generate_embeddings(user_message, user_id)
memory_contents = [memory.content for memory in user_memories]
memory_embeddings = await self._generate_embeddings(memory_contents, user_id)
if len(memory_embeddings) != len(user_memories):
logger.error(f"🔢 Embedding generation failed: generated {len(memory_embeddings)} embeddings but expected {len(user_memories)} for user memories")
return [], self.valves.semantic_retrieval_threshold, []
similarity_scores = []
memory_data = []
for memory_index, memory in enumerate(user_memories):
memory_embedding = memory_embeddings[memory_index]
if memory_embedding is None:
continue
similarity = float(np.dot(query_embedding, memory_embedding))
similarity_scores.append(similarity)
memory_dict = self._build_memory_dict(memory, similarity)
memory_data.append(memory_dict)
if not similarity_scores:
return [], self.valves.semantic_retrieval_threshold, []
memory_data.sort(key=lambda x: x["relevance"], reverse=True)
threshold = self.valves.semantic_retrieval_threshold
filtered_memories = [m for m in memory_data if m["relevance"] >= threshold]
return filtered_memories, threshold, memory_data
async def inlet(
self,
body: Dict[str, Any],
__event_emitter__: Optional[Callable] = None,
__user__: Optional[Dict[str, Any]] = None,
__model__: Optional[str] = None,
__request__: Optional[Request] = None,
**kwargs,
) -> Dict[str, Any]:
"""Simplified inlet processing for memory retrieval and injection."""
self._set_pipeline_context(__event_emitter__, __user__, __model__, __request__)
user_id = __user__.get("id") if body and __user__ else None
if not user_id:
return body
user_message, should_skip, skip_reason = self._process_user_message(body)
if not user_message or should_skip:
if __event_emitter__ and skip_reason:
await self._emit_status(__event_emitter__, skip_reason, done=True)
await self._add_memory_context(body, [], user_id, __event_emitter__)
return body
try:
memory_cache_key = self._cache_key(self._cache_manager.MEMORY_CACHE, user_id)
user_memories = await self._cache_manager.get(user_id, self._cache_manager.MEMORY_CACHE, memory_cache_key)
if user_memories is None:
user_memories = await self._get_user_memories(user_id)
if user_memories:
await self._cache_manager.put(user_id, self._cache_manager.MEMORY_CACHE, memory_cache_key, user_memories)
retrieval_result = await self._retrieve_relevant_memories(user_message, user_id, user_memories, __event_emitter__)
memories = retrieval_result.get("memories", [])
threshold = retrieval_result.get("threshold")
all_similarities = retrieval_result.get("all_similarities", [])
if all_similarities:
cache_key = self._cache_key(self._cache_manager.RETRIEVAL_CACHE, user_id, user_message)
await self._cache_manager.put(user_id, self._cache_manager.RETRIEVAL_CACHE, cache_key, all_similarities)
await self._add_memory_context(body, memories, user_id, __event_emitter__)
except Exception as e:
raise RuntimeError(f"💾 Memory retrieval failed: {str(e)}")
return body
async def outlet(
self,
body: dict,
__event_emitter__: Optional[Callable] = None,
__user__: Optional[dict] = None,
__model__: Optional[str] = None,
__request__: Optional[Request] = None,
**kwargs,
) -> dict:
"""Simplified outlet processing for background memory consolidation."""
self._set_pipeline_context(__event_emitter__, __user__, __model__, __request__)
user_id = __user__.get("id") if body and __user__ else None
if not user_id:
return body
user_message, should_skip, skip_reason = self._process_user_message(body)
if not user_message or should_skip:
return body
cache_key = self._cache_key(self._cache_manager.RETRIEVAL_CACHE, user_id, user_message)
cached_similarities = await self._cache_manager.get(user_id, self._cache_manager.RETRIEVAL_CACHE, cache_key)
task = asyncio.create_task(
self._llm_consolidation_service.run_consolidation_pipeline(user_message, user_id, __event_emitter__, cached_similarities)
)
self._background_tasks.add(task)
def safe_cleanup(t: asyncio.Task) -> None:
try:
self._background_tasks.discard(t)
if t.exception() and not t.cancelled():
exception = t.exception()
logger.error(f"❌ Background memory consolidation task failed: {str(exception)}")
except Exception as e:
logger.error(f"❌ Failed to cleanup background memory task: {str(e)}")
task.add_done_callback(safe_cleanup)
return body
async def shutdown(self) -> None:
"""Cleanup method to properly shutdown background tasks."""
self._shutdown_event.set()
if self._background_tasks:
await asyncio.gather(*self._background_tasks, return_exceptions=True)
self._background_tasks.clear()
await self._cache_manager.clear_all_caches()
async def _manage_user_cache(self, user_id: str, clear_first: bool = False) -> None:
"""Manage user cache - clear, invalidate, and refresh as needed."""
start_time = time.time()
try:
if clear_first:
total_removed = await self._cache_manager.clear_user_cache(user_id)
logger.info(f"🧹 Cleared {total_removed} cache entries for user {user_id}")
else:
retrieval_cleared = await self._cache_manager.clear_user_cache(user_id, self._cache_manager.RETRIEVAL_CACHE)
logger.info(f"🔄 Cleared {retrieval_cleared} retrieval cache entries for user {user_id}")
user_memories = await self._get_user_memories(user_id)
memory_cache_key = self._cache_key(self._cache_manager.MEMORY_CACHE, user_id)
if not user_memories:
await self._cache_manager.put(user_id, self._cache_manager.MEMORY_CACHE, memory_cache_key, [])
logger.info("📭 No memories found for user")
return
await self._cache_manager.put(user_id, self._cache_manager.MEMORY_CACHE, memory_cache_key, user_memories)
memory_contents = [
memory.content
for memory in user_memories
if memory.content and len(memory.content.strip()) >= Constants.MIN_MESSAGE_CHARS
]
if memory_contents:
await self._generate_embeddings(memory_contents, user_id)
duration = time.time() - start_time
logger.info(f"<EFBFBD> Cache updated with {len(memory_contents)} embeddings for user {user_id} in {duration:.2f}s")
except Exception as e:
raise RuntimeError(f"🧹 Failed to manage cache for user {user_id} after {(time.time() - start_time):.2f}s: {str(e)}")
async def _execute_single_operation(self, operation: Models.MemoryOperation, user: Any) -> str:
"""Execute a single memory operation."""
try:
if operation.operation == Models.MemoryOperationType.CREATE:
if not operation.content.strip():
logger.warning(f"⚠️ Skipping CREATE operation: empty content")
return Models.OperationResult.SKIPPED_EMPTY_CONTENT.value
await asyncio.wait_for(
asyncio.to_thread(Memories.insert_new_memory, user.id, operation.content.strip()), timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC
)
return Models.MemoryOperationType.CREATE.value
elif operation.operation == Models.MemoryOperationType.UPDATE:
if not operation.id.strip():
logger.warning(f"⚠️ Skipping UPDATE operation: empty ID")
return Models.OperationResult.SKIPPED_EMPTY_ID.value
if not operation.content.strip():
logger.warning(f"⚠️ Skipping UPDATE operation for {operation.id}: empty content")
return Models.OperationResult.SKIPPED_EMPTY_CONTENT.value
await asyncio.wait_for(
asyncio.to_thread(Memories.update_memory_by_id_and_user_id, operation.id, user.id, operation.content.strip()),
timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC,
)
return Models.MemoryOperationType.UPDATE.value
elif operation.operation == Models.MemoryOperationType.DELETE:
if not operation.id.strip():
logger.warning(f"⚠️ Skipping DELETE operation: empty ID")
return Models.OperationResult.SKIPPED_EMPTY_ID.value
await asyncio.wait_for(
asyncio.to_thread(Memories.delete_memory_by_id_and_user_id, operation.id, user.id), timeout=Constants.DATABASE_OPERATION_TIMEOUT_SEC
)
return Models.MemoryOperationType.DELETE.value
else:
logger.error(f"❓ Unsupported operation: {operation}")
return Models.OperationResult.UNSUPPORTED.value
except Exception as e:
logger.error(f"💾 Database operation failed for {operation.operation.value}: {str(e)}")
return Models.OperationResult.FAILED.value
def _remove_refs_from_schema(self, schema: Dict[str, Any], schema_defs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Remove $ref references and ensure required fields for Azure OpenAI."""
if not isinstance(schema, dict):
return schema
if '$ref' in schema:
ref_path = schema['$ref']
if ref_path.startswith('#/$defs/'):
def_name = ref_path.split('/')[-1]
if schema_defs and def_name in schema_defs:
return self._remove_refs_from_schema(schema_defs[def_name].copy(), schema_defs)
return {'type': 'object'}
result = {}
for key, value in schema.items():
if key == '$defs':
continue
elif isinstance(value, dict):
result[key] = self._remove_refs_from_schema(value, schema_defs)
elif isinstance(value, list):
result[key] = [self._remove_refs_from_schema(item, schema_defs) if isinstance(item, dict) else item for item in value]
else:
result[key] = value
if result.get('type') == 'object' and 'properties' in result:
result['required'] = list(result['properties'].keys())
return result
async def _query_llm(self, system_prompt: str, user_prompt: str, response_model: Optional[BaseModel] = None) -> Union[str, BaseModel]:
"""Query OpenWebUI's internal model system with Pydantic model parsing."""
if not hasattr(self, "__request__") or not hasattr(self, "__user__"):
raise RuntimeError("🔧 Pipeline interface not properly initialized. __request__ and __user__ required.")
model_to_use = self.valves.model if self.valves.model else self.__model__
if not model_to_use:
raise ValueError("🤖 No model specified for LLM operations")
form_data = {
"model": model_to_use,
"messages": [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}],
"max_tokens": 4096,
"stream": False,
}
if response_model:
raw_schema = response_model.model_json_schema()
schema_defs = raw_schema.get('$defs', {})
schema = self._remove_refs_from_schema(raw_schema, schema_defs)
schema['type'] = 'object'
form_data["response_format"] = {"type": "json_schema", "json_schema": {"name": response_model.__name__, "strict": True, "schema": schema}}
try:
response = await asyncio.wait_for(
generate_chat_completion(self.__request__, form_data, user=await asyncio.to_thread(Users.get_user_by_id, self.__user__["id"])),
timeout=Constants.LLM_CONSOLIDATION_TIMEOUT_SEC,
)
except asyncio.TimeoutError:
raise TimeoutError(f"⏱️ LLM query timed out after {Constants.LLM_CONSOLIDATION_TIMEOUT_SEC}s")
except Exception as e:
raise RuntimeError(f"🤖 LLM query failed: {str(e)}")
try:
if hasattr(response, "body") and hasattr(getattr(response, "body", None), "decode"):
body = getattr(response, "body")
response_data = json.loads(body.decode("utf-8"))
else:
response_data = response
except (json.JSONDecodeError, AttributeError) as e:
raise RuntimeError(f"🔍 Failed to decode response body: {str(e)}")
if isinstance(response_data, dict) and "choices" in response_data and isinstance(response_data["choices"], list) and len(response_data["choices"]) > 0:
first_choice = response_data["choices"][0]
if isinstance(first_choice, dict) and "message" in first_choice and isinstance(first_choice["message"], dict) and "content" in first_choice["message"]:
content = first_choice["message"]["content"]
else:
raise ValueError("🤖 Invalid response structure: missing content in message")
else:
raise ValueError(f"🤖 Unexpected LLM response format: {response_data}")
if response_model:
try:
parsed_data = json.loads(content)
return response_model.model_validate(parsed_data)
except json.JSONDecodeError as e:
raise ValueError(f"🔍 Invalid JSON from LLM: {str(e)}\nContent: {content}")
except PydanticValidationError as e:
raise ValueError(f"🔍 LLM response validation failed: {str(e)}\nContent: {content}")
if not content or content.strip() == "":
raise ValueError("🤖 Empty response from LLM")
return content