Consolidator Agent
Merge duplicate and highly similar memories to reduce redundancy and improve memory quality.
Overview
The Consolidator Agent identifies and merges duplicate or highly similar memories across all memory types. It uses embedding similarity to find candidates and intelligently merges their content, preventing memory bloat and improving retrieval quality.
When to Use
- When memory grows beyond 1000+ entries
- To deduplicate entity mentions (e.g., "John", "John Smith", "@john")
- To merge similar knowledge base facts
- As a scheduled background task (e.g., daily at 3 AM)
- Before archival (consolidate then compress)
- When retrieval quality degrades due to duplicates
Dual-Mode Operation
Mode 1: Embedding Similarity Only (No LLM)
- Finds similar memories using cosine similarity
- Merges by keeping the longer content
- Fast and deterministic
- Zero LLM costs
- Output: Primary memory with merged metadata
Mode 2: LLM-Powered Intelligent Merging
- Uses embedding similarity to find candidates
- Uses LLM to intelligently merge content
- Removes redundancy while preserving unique information
- Higher quality merged memories
- Output: Comprehensive merged memory with all important details
API Methods
consolidate_memories
Find and merge duplicate memories.
async def consolidate_memories(
    min_memories: int = 5,
    memory_type: str | None = None
) -> dict[str, int]
Parameters:
- min_memories: Minimum number of memories required before consolidation runs (default: 5)
- memory_type: Optional memory type to consolidate (None = all types)
Returns: Dictionary with merged and deleted counts
Example:
from memharness import MemoryHarness
from memharness.agents import ConsolidatorAgent
from langchain.chat_models import init_chat_model
async with MemoryHarness("sqlite:///memory.db") as harness:
    # Heuristic mode
    agent_basic = ConsolidatorAgent(harness, threshold=0.85)
    result = await agent_basic.consolidate_memories(min_memories=5)
    # Output: {"merged": 0, "deleted": 0}  # Simplified implementation

    # LLM mode (intelligent merging)
    llm = init_chat_model("gpt-4o")
    agent_smart = ConsolidatorAgent(harness, llm=llm, threshold=0.90)
    result = await agent_smart.consolidate_memories(min_memories=10)
    # Output: {"merged": 3, "deleted": 3}
run
Execute the consolidator agent (standard agent interface).
async def run(
    min_memories: int = 5,
    memory_type: str | None = None,
    **kwargs
) -> dict[str, Any]
Parameters:
- min_memories: Minimum memories before consolidation
- memory_type: Optional memory type to consolidate
- **kwargs: Additional arguments (ignored)
Returns: Dictionary with merged, deleted, and threshold keys
Example:
result = await agent.run(min_memories=10, memory_type="entity")
# Returns: {"merged": 3, "deleted": 3, "threshold": 0.85}
Implementation Details
Similarity Detection
The agent uses cosine similarity to find duplicate candidates:
def _cosine_similarity(
    self,
    embedding1: list[float],
    embedding2: list[float]
) -> float:
    """Calculate cosine similarity between two embeddings."""
    if len(embedding1) != len(embedding2):
        return 0.0
    dot_product = sum(a * b for a, b in zip(embedding1, embedding2, strict=False))
    norm1 = sum(a * a for a in embedding1) ** 0.5
    norm2 = sum(b * b for b in embedding2) ** 0.5
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)
Similarity Threshold:
- 0.95+: Nearly identical (definitely merge)
- 0.85-0.94: Very similar (likely merge)
- 0.70-0.84: Similar but distinct (review manually)
- < 0.70: Different (don't merge)
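These tiers can be exercised with a standalone version of the similarity function (the `self` parameter dropped for illustration) plus a small helper that maps a score to the recommended action. The `merge_action` helper and its tier labels are illustrative only, not part of the memharness API:

```python
def cosine_similarity(embedding1: list[float], embedding2: list[float]) -> float:
    """Cosine similarity between two equal-length embeddings."""
    if len(embedding1) != len(embedding2):
        return 0.0
    dot_product = sum(a * b for a, b in zip(embedding1, embedding2))
    norm1 = sum(a * a for a in embedding1) ** 0.5
    norm2 = sum(b * b for b in embedding2) ** 0.5
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)

def merge_action(similarity: float) -> str:
    """Map a similarity score to the guideline tiers above (illustrative)."""
    if similarity >= 0.95:
        return "definitely merge"
    if similarity >= 0.85:
        return "likely merge"
    if similarity >= 0.70:
        return "review manually"
    return "don't merge"

# Parallel vectors score 1.0; orthogonal vectors score 0.0
print(merge_action(cosine_similarity([1.0, 0.0], [1.0, 0.0])))  # definitely merge
print(merge_action(cosine_similarity([1.0, 0.0], [0.0, 1.0])))  # don't merge
```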
Heuristic Mode
The heuristic mode keeps the longer content:
async def _merge_memories_heuristic(
    self,
    memory1: MemoryUnit,
    memory2: MemoryUnit
) -> MemoryUnit:
    """Merge two memories using heuristic approach (keep longer content)."""
    # Keep the one with longer content
    if len(memory1.content) >= len(memory2.content):
        primary, secondary = memory1, memory2
    else:
        primary, secondary = memory2, memory1

    # Merge metadata
    merged_metadata = {**secondary.metadata, **primary.metadata}
    merged_metadata["merged_from"] = [memory1.id, memory2.id]

    # Return primary with merged metadata
    return MemoryUnit(
        id=primary.id,
        memory_type=primary.memory_type,
        content=primary.content,  # Keep primary content
        embedding=primary.embedding,
        metadata=merged_metadata,
        namespace=primary.namespace,
    )
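To see the metadata precedence in isolation (the primary's keys win over the secondary's on conflicts), here is the same merge logic against a minimal stand-in for `MemoryUnit`; the stand-in class is simplified for illustration and omits the real fields:

```python
from dataclasses import dataclass, field

@dataclass
class MemoryUnit:
    """Minimal stand-in for illustration; the real class has more fields."""
    id: str
    content: str
    metadata: dict = field(default_factory=dict)

def merge_heuristic(memory1: MemoryUnit, memory2: MemoryUnit) -> MemoryUnit:
    """Keep the longer content; primary metadata wins on key conflicts."""
    if len(memory1.content) >= len(memory2.content):
        primary, secondary = memory1, memory2
    else:
        primary, secondary = memory2, memory1
    merged_metadata = {**secondary.metadata, **primary.metadata}
    merged_metadata["merged_from"] = [memory1.id, memory2.id]
    return MemoryUnit(id=primary.id, content=primary.content, metadata=merged_metadata)

a = MemoryUnit("a", "John", {"source": "chat"})
b = MemoryUnit("b", "John Smith", {"source": "profile"})
merged = merge_heuristic(a, b)
print(merged.id, merged.content, merged.metadata["source"])  # b John Smith profile
```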
Advantages:
- Simple and fast
- No LLM costs
- Preserves more complete information
Limitations:
- Doesn't remove redundancy
- Doesn't synthesize information
- May keep duplicate facts
LLM Mode
The LLM mode intelligently merges content:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Create prompt
prompt = ChatPromptTemplate.from_messages([
    ("system",
     "You are a memory consolidation system. Merge the two similar "
     "memories below into a single, comprehensive memory. Keep all "
     "important information and remove redundancy."),
    ("user", "Memory 1: {content1}\n\nMemory 2: {content2}")
])

# Build chain
chain = prompt | self.llm | StrOutputParser()

# Generate merged content
merged_content = await chain.ainvoke({
    "content1": memory1.content,
    "content2": memory2.content
})

# Use memory1 as base
merged_metadata = {**memory2.metadata, **memory1.metadata}
merged_metadata["merged_from"] = [memory1.id, memory2.id]

return MemoryUnit(
    id=memory1.id,
    memory_type=memory1.memory_type,
    content=merged_content,  # LLM-generated merged content
    embedding=memory1.embedding,
    metadata=merged_metadata,
    namespace=memory1.namespace,
)
Advantages:
- Removes redundancy
- Synthesizes information
- Natural language output
- Preserves all important details
Limitations:
- Requires LLM API access
- Incurs API costs (use GPT-4o for quality)
- Slower than heuristic mode
- May lose subtle details
Fallback Strategy
LLM mode automatically falls back to heuristic on errors:
try:
    merged_content = await chain.ainvoke({...})
    return MemoryUnit(...)
except Exception:
    # Fall back to heuristic on error
    return await self._merge_memories_heuristic(memory1, memory2)
Configuration
Initialization Parameters
from memharness.agents import ConsolidatorAgent
from langchain.chat_models import init_chat_model
# Basic initialization (heuristic mode)
agent = ConsolidatorAgent(harness, threshold=0.85)
# With LLM for intelligent merging
llm = init_chat_model("gpt-4o") # Use GPT-4o for quality
agent = ConsolidatorAgent(harness, llm=llm, threshold=0.90)
Threshold Guidelines:
- 0.95: Very conservative (only merge near-duplicates)
- 0.90: Recommended for entity consolidation
- 0.85: Standard threshold for general consolidation
- 0.80: Aggressive (may merge distinct memories)
YAML Configuration
agents:
  consolidator:
    enabled: true
    llm: gpt-4o
    threshold: 0.90
    # Run daily at 3 AM
    schedule: "0 3 * * *"
    # Minimum memories before running
    min_memories: 10
    # Target memory types
    memory_types:
      - entity
      - knowledge_base
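Assuming the YAML above is parsed into a plain dict (e.g. with PyYAML's `safe_load`), the consolidator settings might be mapped to keyword arguments like this; `build_kwargs` is a hypothetical helper, not part of memharness:

```python
# Parsed form of the YAML config above (as PyYAML would produce it)
config = {
    "agents": {
        "consolidator": {
            "enabled": True,
            "llm": "gpt-4o",
            "threshold": 0.90,
            "schedule": "0 3 * * *",
            "min_memories": 10,
            "memory_types": ["entity", "knowledge_base"],
        }
    }
}

def build_kwargs(config: dict) -> dict:
    """Extract consolidator settings, falling back to documented defaults."""
    section = config["agents"]["consolidator"]
    return {
        "threshold": section.get("threshold", 0.85),
        "min_memories": section.get("min_memories", 5),
        "memory_types": section.get("memory_types", []),
    }

print(build_kwargs(config))
# {'threshold': 0.9, 'min_memories': 10, 'memory_types': ['entity', 'knowledge_base']}
```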
Integration Patterns
1. Scheduled Consolidation (Background)
Run consolidation as a background task:
import asyncio
from datetime import datetime
async def nightly_consolidation():
    """Consolidate memories every night."""
    agent = ConsolidatorAgent(harness, llm=llm, threshold=0.90)

    # Consolidate entities (most likely to have duplicates)
    entity_result = await agent.consolidate_memories(
        min_memories=10,
        memory_type="entity"
    )
    print(f"Entities: merged {entity_result['merged']}, deleted {entity_result['deleted']}")

    # Consolidate knowledge base
    kb_result = await agent.consolidate_memories(
        min_memories=20,
        memory_type="knowledge_base"
    )
    print(f"Knowledge: merged {kb_result['merged']}, deleted {kb_result['deleted']}")
# Schedule daily at 3 AM (use APScheduler, Celery, etc.)
2. On-Demand Consolidation
Manually trigger consolidation:
async def consolidate_on_demand():
    """User-triggered consolidation."""
    agent = ConsolidatorAgent(harness, llm=llm, threshold=0.85)

    # Consolidate all memory types
    result = await agent.consolidate_memories(min_memories=5)
    print("Consolidation complete:")
    print(f"  Merged: {result['merged']}")
    print(f"  Deleted: {result['deleted']}")
3. Policy-Triggered Consolidation
Trigger based on memory size:
async def consolidate_if_needed():
    """Consolidate when memory exceeds threshold."""
    # Check memory size (implementation-specific)
    memory_count = await harness.count_memories()
    if memory_count > 1000:
        agent = ConsolidatorAgent(harness, llm=llm, threshold=0.90)
        result = await agent.consolidate_memories(min_memories=10)
        print(f"Memory exceeded 1000 entries. Consolidated: {result}")
4. Post-Import Consolidation
After bulk imports, consolidate duplicates:
async def import_and_consolidate(documents: list[str]):
    """Import documents and consolidate duplicates."""
    # Import documents
    for doc in documents:
        await harness.add_knowledge(content=doc, source="import")

    # Consolidate duplicates
    agent = ConsolidatorAgent(harness, llm=llm, threshold=0.90)
    result = await agent.consolidate_memories(
        min_memories=len(documents),
        memory_type="knowledge_base"
    )
    print(f"Imported {len(documents)}, consolidated {result['merged']}")
Best Practices
1. Use High Thresholds for Safety
# Conservative (safe for production)
agent = ConsolidatorAgent(harness, llm=llm, threshold=0.90)
# Aggressive (risky — may merge distinct memories)
# agent = ConsolidatorAgent(harness, llm=llm, threshold=0.75) # ❌ Too low
2. Use LLM Mode for Entity Consolidation
# Entities have many duplicates — use LLM for quality
llm = init_chat_model("gpt-4o")
agent = ConsolidatorAgent(harness, llm=llm, threshold=0.90)
# Example: Merge "John", "John Smith", "@john"
result = await agent.consolidate_memories(
    min_memories=5,
    memory_type="entity"
)
3. Run on a Schedule
# Use APScheduler for production
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Daily at 3 AM
scheduler.add_job(
    nightly_consolidation,
    trigger="cron",
    hour=3,
    minute=0
)
scheduler.start()
4. Monitor Consolidation Results
async def consolidate_with_logging():
    """Consolidate with detailed logging."""
    agent = ConsolidatorAgent(harness, llm=llm, threshold=0.90)
    before_count = await harness.count_memories()
    result = await agent.consolidate_memories(min_memories=10)
    after_count = await harness.count_memories()
    reduction = before_count - after_count

    print("Consolidation Report:")
    print(f"  Before: {before_count} memories")
    print(f"  After: {after_count} memories")
    print(f"  Merged: {result['merged']}")
    print(f"  Deleted: {result['deleted']}")
    if before_count:  # Guard against division by zero on an empty store
        print(f"  Reduction: {reduction} memories ({reduction / before_count * 100:.1f}%)")
5. Combine with Summarization
async def cleanup_pipeline():
    """Full memory cleanup pipeline."""
    # Step 1: Consolidate duplicates
    consolidator = ConsolidatorAgent(harness, llm=llm, threshold=0.90)
    consolidate_result = await consolidator.consolidate_memories(min_memories=10)

    # Step 2: Summarize old conversations
    summarizer = SummarizerAgent(harness, llm=llm)
    # (summarization logic)

    print("Cleanup complete:")
    print(f"  Consolidated: {consolidate_result['merged']}")
Entity Consolidation Example
The most common use case is consolidating duplicate entities:
from memharness import MemoryHarness
from memharness.agents import ConsolidatorAgent
from langchain.chat_models import init_chat_model
async def consolidate_entities():
    """Consolidate duplicate entity mentions."""
    async with MemoryHarness("sqlite:///memory.db") as harness:
        llm = init_chat_model("gpt-4o")

        # Add duplicate entities (from different sources)
        await harness.add_entity("John", "person", "User mentioned in chat")
        await harness.add_entity("John Smith", "person", "Full name from profile")
        await harness.add_entity("@john", "person", "Twitter handle")
        await harness.add_entity("john@example.com", "person", "Email address")

        # Consolidate (threshold 0.85 will merge similar names)
        agent = ConsolidatorAgent(harness, llm=llm, threshold=0.85)
        result = await agent.consolidate_memories(
            min_memories=3,
            memory_type="entity"
        )

        # Result: 4 entities → 1 merged entity
        # Content: "John Smith: User mentioned in chat, known by @john and john@example.com"
        print(f"Merged {result['merged']} entities")
Current Implementation Status
Note: The current implementation in memharness v0.5.x returns empty results ({"merged": 0, "deleted": 0}). This is a simplified placeholder implementation. A full production implementation would:
- Query the backend for all memories of a given type
- Compare embeddings pairwise to find similar memories (above threshold)
- Group similar memories into clusters
- Merge each cluster using heuristic or LLM mode
- Update the backend with merged memories
- Delete the original duplicates
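The steps above can be sketched as a greedy single-pass clustering over in-memory records, merging each cluster with the heuristic rule (keep the longer content). The `Memory` dataclass and `consolidate` function below are simplified stand-ins for the real backend types, not the memharness implementation:

```python
from dataclasses import dataclass, field

@dataclass
class Memory:
    """Simplified stand-in for a stored memory record."""
    id: str
    content: str
    embedding: list[float]
    metadata: dict = field(default_factory=dict)

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = sum(x * x for x in a) ** 0.5
    nb = sum(y * y for y in b) ** 0.5
    return dot / (na * nb) if na and nb else 0.0

def consolidate(memories: list[Memory], threshold: float = 0.85) -> tuple[list[Memory], int]:
    """Greedily cluster similar memories; merge each duplicate into its cluster head."""
    merged: list[Memory] = []
    deleted = 0
    for mem in memories:
        for kept in merged:
            if cosine(mem.embedding, kept.embedding) >= threshold:
                # Heuristic merge: keep the longer content, record provenance
                if len(mem.content) > len(kept.content):
                    kept.content = mem.content
                kept.metadata.setdefault("merged_from", []).append(mem.id)
                deleted += 1
                break
        else:
            merged.append(mem)  # No similar cluster head found; keep as-is
    return merged, deleted

mems = [
    Memory("1", "John", [1.0, 0.0]),
    Memory("2", "John Smith", [0.99, 0.05]),
    Memory("3", "Project Apollo deadline", [0.0, 1.0]),
]
survivors, deleted = consolidate(mems, threshold=0.9)
print(len(survivors), deleted)  # 2 1
```

A production version would additionally persist the merged records and delete the originals through the backend, as the steps above describe.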
Related Components
- Entity Extractor — Extracts entities (may create duplicates)
- Entity Memory — Stores entities
- Garbage Collector — Cleans old memories
- Context Assembly Agent — Benefits from consolidated memory
Next Steps
- Garbage Collector — Memory cleanup and archival
- Entity Memory — Entity storage details
- Context Assembler — Optimal context assembly