Usage with LangChain
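memharness plugs into your LangChain agent as middleware that handles all memory operations — BEFORE and AFTER every model call. The Quick Start below uses three middleware classes: one injects context, one persists the conversation, and one runs post-response processing.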
Install
pip install memharness langchain langchain-anthropic
Quick Start — Three Middleware
import asyncio
from memharness import MemoryHarness
from memharness.tools import get_read_tools
from memharness.agents import ContextAssemblyAgent
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
# ── BEFORE middleware: load context ──────────────────────────────────
class ContextMiddleware(AgentMiddleware):
    """BEFORE hook that turns assembled memory context into a SystemMessage.

    The most recent human message is used as the query for
    ``ContextAssemblyAgent.assemble``; any non-empty persona, knowledge,
    entity, and workflow sections are joined into one SystemMessage that is
    placed in front of the current messages.

    Args:
        harness: Memory harness handed to the context assembly agent.
        thread_id: Thread identifier forwarded to every ``assemble`` call.
        max_tokens: Token budget handed to the context assembly agent.
    """

    def __init__(self, harness: MemoryHarness, thread_id: str, max_tokens: int = 4000):
        super().__init__()
        self.thread_id = thread_id
        self._ctx = ContextAssemblyAgent(harness, max_tokens=max_tokens)

    async def abefore_model(self, state, runtime):
        """Return a state update with the context message, or None if there is nothing to add."""
        history = state.get("messages", [])

        # Walk backwards so the latest human turn drives retrieval.
        query = ""
        for candidate in reversed(history):
            if isinstance(candidate, HumanMessage):
                query = candidate.content
                break
        if not query:
            return None

        ctx = await self._ctx.assemble(query=query, thread_id=self.thread_id, include_tools=False)

        # Only sections that actually have content are rendered.
        sections = []
        if ctx.persona:
            sections.append(f"## Agent Persona\n{ctx.persona}")
        if ctx.knowledge:
            sections.append(f"## Relevant Knowledge\n{ctx.knowledge}")
        if ctx.entities:
            sections.append(f"## Known Entities\n{ctx.entities}")
        if ctx.workflows:
            sections.append(f"## Relevant Workflows\n{ctx.workflows}")
        if not sections:
            return None

        # NOTE(review): if the agent state merges "messages" through a reducer
        # instead of replacing the list, this message may not end up first —
        # confirm against the LangChain middleware documentation.
        context_message = SystemMessage(content="\n\n".join(sections))
        return {"messages": [context_message, *history]}

    async def aafter_model(self, state, runtime):
        """No post-model work; context is injected before the call only."""
        return None
# ── BEFORE+AFTER middleware: conversation persistence ────────────────
class ConversationMiddleware(AgentMiddleware):
    """BEFORE: load past messages. AFTER: save new messages.

    Messages rebuilt from memory are tagged in ``additional_kwargs`` so they
    are never written back, and every saved message is remembered by identity
    so it is persisted at most once. This replaces the previous index-based
    bookkeeping, which re-saved restored history on later turns because the
    stored offset did not account for the injected messages.

    Args:
        harness: Connected ``MemoryHarness`` used to read and write the
            conversation.
        thread_id: Conversation thread whose messages are loaded and saved.
    """

    # Key set in ``additional_kwargs`` on messages rebuilt from stored memory.
    _RESTORED_FLAG = "memharness_restored"

    def __init__(self, harness: MemoryHarness, thread_id: str):
        super().__init__()
        self.harness = harness
        self.thread_id = thread_id
        # Number of history messages injected by the most recent load.
        self._loaded = 0
        # Identities of messages this instance has already written to memory.
        self._saved_keys = set()

    @classmethod
    def _is_restored(cls, msg) -> bool:
        """Return True when ``msg`` was rebuilt from memory by this middleware."""
        extras = getattr(msg, "additional_kwargs", None) or {}
        return bool(extras.get(cls._RESTORED_FLAG))

    @staticmethod
    def _message_key(msg) -> str:
        """Return the message ``id`` when set, else a type-plus-content fallback."""
        # NOTE(review): assumes messages in agent state normally carry an id;
        # the fallback treats identical type+content as the same message.
        return getattr(msg, "id", None) or f"{type(msg).__name__}:{msg.content!r}"

    async def abefore_model(self, state, runtime):
        """Inject stored history once per run; return None when nothing is added."""
        current = list(state.get("messages", []))

        # If the state already holds restored or already-saved messages, this
        # is a later model call of the same run (e.g. after a tool call) and
        # the history must not be injected a second time.
        if any(
            self._is_restored(m) or self._message_key(m) in self._saved_keys
            for m in current
        ):
            return None

        memories = await self.harness.get_conversational(self.thread_id, limit=50)
        if not memories:
            return None

        marker = {self._RESTORED_FLAG: True}
        past = []
        for m in memories:
            role = m.metadata.get("role", "user")
            if role in ("user", "human"):
                past.append(HumanMessage(content=m.content, additional_kwargs=dict(marker)))
            elif role in ("assistant", "ai"):
                past.append(AIMessage(content=m.content, additional_kwargs=dict(marker)))
        if not past:
            return None

        self._loaded = len(past)
        return {"messages": past + current}

    async def aafter_model(self, state, runtime):
        """Persist human and non-empty AI messages that have not been saved yet."""
        for msg in state.get("messages", []):
            if self._is_restored(msg):
                continue
            key = self._message_key(msg)
            if key in self._saved_keys:
                continue
            if isinstance(msg, HumanMessage):
                await self.harness.add_conversational(self.thread_id, "user", msg.content)
            elif isinstance(msg, AIMessage) and msg.content:
                await self.harness.add_conversational(self.thread_id, "assistant", msg.content)
            else:
                # Tool messages and empty AI messages are not persisted.
                continue
            self._saved_keys.add(key)
        return None
# ── AFTER middleware: invokes the LangGraph workflow from package ────
from memharness.agents.agent_workflow import create_after_workflow
class AfterMiddleware(AgentMiddleware):
    """Run the packaged LangGraph workflow as a LangChain AFTER middleware.

    Tool-call names are collected across model calls; once the model returns
    an AI message with content and no tool calls, the workflow is invoked.
    The workflow runs: save_response → extract_entities → save_workflow → check_summarization
    """

    def __init__(self, harness, thread_id):
        super().__init__()
        self.harness = harness
        self.thread_id = thread_id
        self._graph = create_after_workflow(harness)
        # Tool-call labels gathered since the last final answer.
        self._steps = []

    async def abefore_model(self, state, runtime):
        """Nothing to do before the model call."""
        return None

    async def aafter_model(self, state, runtime):
        """Record tool calls, or invoke the workflow when a final answer arrives."""
        history = state.get("messages", [])
        newest = history[-1] if history else None

        # A message that requests tools is an intermediate step: remember the
        # tool names and wait for the final answer.
        requested = getattr(newest, "tool_calls", None)
        if requested:
            self._steps.extend(f"{tc.get('name', '?')}()" for tc in requested)
            return None

        # Only an AI message with content counts as a final answer.
        is_final_answer = isinstance(newest, AIMessage) and bool(newest.content)
        if not is_final_answer:
            return None

        workflow_input = {
            "messages": history,
            "thread_id": self.thread_id,
            "response_text": newest.content,
            "steps": self._steps,
            "entities_extracted": 0,
            "workflow_saved": False,
            "summarized": False,
        }
        await self._graph.ainvoke(workflow_input)

        # Start a fresh step list for the next answer.
        self._steps = []
        return None
async def main():
    """Run a two-turn demo conversation against a SQLite-backed harness.

    The harness is always disconnected, even when seeding knowledge, building
    the agent, or one of the turns raises.
    """
    harness = MemoryHarness("sqlite:///agent_memory.db")
    await harness.connect()
    try:
        # Pre-load knowledge (one-time)
        await harness.add_knowledge("Deployments require platform team approval", source="runbook")

        thread_id = "user-alice"
        agent = create_agent(
            model="anthropic:claude-sonnet-4-6",
            tools=get_read_tools(harness),  # 5 read-only tools
            middleware=[
                ContextMiddleware(harness, thread_id),       # BEFORE: inject context
                ConversationMiddleware(harness, thread_id),  # BEFORE+AFTER: messages
                AfterMiddleware(harness, thread_id),         # AFTER: extract entities
            ],
        )

        # Turn 1
        r1 = await agent.ainvoke({
            "messages": [{"role": "user", "content": "How do I deploy to production?"}]
        })
        print("Turn 1:", r1["messages"][-1].content)

        # Turn 2 — memory persists!
        r2 = await agent.ainvoke({
            "messages": [{"role": "user", "content": "What did I just ask about?"}]
        })
        print("Turn 2:", r2["messages"][-1].content)
    finally:
        # Release the connection on both the success and the failure path.
        await harness.disconnect()
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
How It Works
Your agent only gets read tools — it can search and read memory. The middleware handles all writes — saves messages, extracts entities, manages summaries.
What the Agent Sees
On every turn, the middleware injects context so the LLM sees:
[SystemMessage]
## Agent Persona
You are a helpful DevOps assistant...
## Relevant Knowledge
- Deployments require approval from the platform team
- Use kubectl apply -f deployment.yaml
## Known Entities
- Alice (PERSON): Engineer at SAP
## Relevant Workflows
- Deploy app: Build → Test → Push → Apply
[HumanMessage] "previous question"
[AIMessage] "previous answer"
[HumanMessage] "How do I deploy?" ← current query
The agent doesn't need to call any tools to get this context — the middleware provides it automatically.
The 5 Read Tools
Your agent also gets 5 tools for deeper memory exploration:
from memharness.tools import get_read_tools
# Build the read-only memory tools for this harness; pass them as the agent's `tools`.
tools = get_read_tools(harness)
| Tool | What the agent can do |
|---|---|
| `memory_search` | Search across all memory types by query |
| `memory_read` | Read a specific memory by ID |
| `expand_summary` | Expand a compacted summary to full content |
| `assemble_context` | Manually trigger full context assembly |
| `toolbox_search` | Discover available tools (VFS tree + grep) |
These are for when the agent needs to dig deeper — the middleware provides the basics automatically.
Summarization
After summarization, the middleware loads summary + recent messages (not all messages):
Before summarization: [msg1, msg2, ... msg50] ← 50 messages
After summarization: [Summary of msg1-40] + [msg41-50] ← compact!
If the agent needs detail from the summary → it calls the expand_summary tool.
Configurable:
- max_tokens=4000 — context budget (default 4000, estimated as chars/4)
- summarize_threshold=0.8 — trigger summarization at 80%
Advanced: Granular Middleware
If you want fine-grained control, split into separate middleware:
# Instead of a single combined middleware, register one middleware per concern.
# This list is the `middleware=` argument passed to create_agent(...).
middleware=[
MemharnessSummarizationMiddleware(harness, thread_id),
MemharnessContextMiddleware(harness, thread_id),  # injects KB, entities, workflows, persona
MemharnessConversationMiddleware(harness, thread_id),  # persists conversation history
MemharnessEntityMiddleware(harness),  # extracts entities from AI responses
MemharnessWorkflowMiddleware(harness, thread_id),
]
See the full individual middleware implementations below.
Individual Middleware Implementations (click to expand)
MemharnessConversationMiddleware
class MemharnessConversationMiddleware(AgentMiddleware):
    """Persist conversation history to/from memharness.

    Messages rebuilt from memory are tagged in ``additional_kwargs`` so they
    are never written back, and every saved message is remembered by identity
    so it is persisted at most once. This replaces the previous index-based
    bookkeeping, which re-saved restored history on later turns because the
    stored offset did not account for the injected messages.

    Args:
        harness: Connected memory harness used to read and write the
            conversation.
        thread_id: Conversation thread whose messages are loaded and saved.
    """

    # Key set in ``additional_kwargs`` on messages rebuilt from stored memory.
    _RESTORED_FLAG = "memharness_restored"

    def __init__(self, harness, thread_id):
        super().__init__()
        self.harness = harness
        self.thread_id = thread_id
        # Number of history messages injected by the most recent load.
        self._loaded_count = 0
        # Identities of messages this instance has already written to memory.
        self._saved_keys = set()

    @classmethod
    def _is_restored(cls, msg) -> bool:
        """Return True when ``msg`` was rebuilt from memory by this middleware."""
        extras = getattr(msg, "additional_kwargs", None) or {}
        return bool(extras.get(cls._RESTORED_FLAG))

    @staticmethod
    def _message_key(msg) -> str:
        """Return the message ``id`` when set, else a type-plus-content fallback."""
        # NOTE(review): assumes messages in agent state normally carry an id;
        # the fallback treats identical type+content as the same message.
        return getattr(msg, "id", None) or f"{type(msg).__name__}:{msg.content!r}"

    async def abefore_model(self, state, runtime):
        """Inject stored history once per run; return None when nothing is added."""
        current = list(state.get("messages", []))

        # Restored or already-saved messages in the state mean this is a later
        # model call of the same run, so the history is already present.
        if any(
            self._is_restored(m) or self._message_key(m) in self._saved_keys
            for m in current
        ):
            return None

        memories = await self.harness.get_conversational(self.thread_id, limit=50)
        if not memories:
            return None

        marker = {self._RESTORED_FLAG: True}
        past = []
        for m in memories:
            role = m.metadata.get("role", "user")
            if role in ("user", "human"):
                past.append(HumanMessage(content=m.content, additional_kwargs=dict(marker)))
            elif role in ("assistant", "ai"):
                past.append(AIMessage(content=m.content, additional_kwargs=dict(marker)))
        if not past:
            return None

        self._loaded_count = len(past)
        return {"messages": past + current}

    async def aafter_model(self, state, runtime):
        """Persist human and non-empty AI messages that have not been saved yet."""
        for msg in state.get("messages", []):
            if self._is_restored(msg):
                continue
            key = self._message_key(msg)
            if key in self._saved_keys:
                continue
            if isinstance(msg, HumanMessage):
                await self.harness.add_conversational(self.thread_id, "user", msg.content)
            elif isinstance(msg, AIMessage) and msg.content:
                await self.harness.add_conversational(self.thread_id, "assistant", msg.content)
            else:
                # Tool messages and empty AI messages are not persisted.
                continue
            self._saved_keys.add(key)
        return None
MemharnessContextMiddleware
class MemharnessContextMiddleware(AgentMiddleware):
    """Place assembled persona, knowledge, entities, and workflows in a SystemMessage.

    Args:
        harness: Memory harness handed to the context assembly agent.
        thread_id: Thread identifier forwarded to every ``assemble`` call.
        max_tokens: Token budget handed to the context assembly agent.
    """

    def __init__(self, harness, thread_id, max_tokens=4000):
        super().__init__()
        self.harness = harness
        self.thread_id = thread_id
        self._ctx_agent = ContextAssemblyAgent(harness, max_tokens=max_tokens)

    async def abefore_model(self, state, runtime):
        """Return a state update carrying the context message, or None when there is none."""
        history = state.get("messages", [])

        # The newest human message is the retrieval query.
        human_contents = (
            item.content for item in reversed(history) if isinstance(item, HumanMessage)
        )
        query = next(human_contents, "")
        if not query:
            return None

        ctx = await self._ctx_agent.assemble(
            query=query,
            thread_id=self.thread_id,
            include_tools=False,
        )

        # Render only the sections that came back non-empty.
        sections = []
        if ctx.persona:
            sections.append(f"## Agent Persona\n{ctx.persona}")
        if ctx.knowledge:
            sections.append(f"## Relevant Knowledge\n{ctx.knowledge}")
        if ctx.entities:
            sections.append(f"## Known Entities\n{ctx.entities}")
        if ctx.workflows:
            sections.append(f"## Relevant Workflows\n{ctx.workflows}")
        if not sections:
            return None

        system_context = SystemMessage(content="\n\n".join(sections))
        return {"messages": [system_context, *history]}
MemharnessEntityMiddleware
class MemharnessEntityMiddleware(AgentMiddleware):
    """Extract entities from AI responses and store them in the harness.

    Extraction is best-effort: a failure is logged and never interrupts the
    agent run.

    Args:
        harness: Memory harness that receives the extracted entities.
    """

    def __init__(self, harness):
        super().__init__()
        self.harness = harness
        # NOTE(review): EntityExtractorAgent is not imported in the snippets
        # shown on this page — confirm its import path.
        self._extractor = EntityExtractorAgent(harness)

    async def aafter_model(self, state, runtime):
        """Extract entities from the last AI message, if it has content."""
        import logging  # local import keeps this snippet self-contained

        messages = state.get("messages", [])
        last = messages[-1] if messages else None
        if not isinstance(last, AIMessage) or not last.content:
            return None
        try:
            entities = await self._extractor.extract_entities(last.content)
            for cat, names in entities.items():
                for name in names:
                    await self.harness.add_entity(name, cat, f"{cat}: {name}")
        except Exception:
            # Stay best-effort, but log the failure instead of discarding it
            # so extraction problems remain diagnosable.
            logging.getLogger(__name__).warning(
                "memharness entity extraction failed", exc_info=True
            )
        return None