Middleware
Learn how to use middleware to run custom logic before and after agent execution. Attach documents, enrich context, validate inputs, and transform responses.
What is Middleware?
Hooks that run before and after your agent processes a request
Middleware are Python functions that intercept requests before they reach your agent and responses before they're returned. Use them to attach documents, enrich context, validate inputs, log interactions, or transform responses.
Create a file in middleware/ with the same name as your agent. For example, middleware/assistant.py applies to agents/assistant.yaml. No configuration needed!
Execution Flow
Basic Middleware
# middleware/assistant.py
"""Middleware for the assistant agent."""
import json
from datetime import datetime
from typing import Any, Dict
async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""
Called before the agent runs.
Args:
content: Dict with "role" and "parts" keys
context: Shared run context dict (see Context docs)
Returns:
Modified content to pass to the agent
"""
# Add current timestamp so the agent knows today's date
now = datetime.now()
time_context = f"[Current time: {now.strftime('%A, %B %d, %Y')}]"
content["parts"].insert(0, {"text": time_context})
return content
async def after(response: str, context: Dict[str, Any]) -> str:
"""
Called after the agent completes.
Args:
response: The agent's response text
context: Shared run context dict (see Context docs)
Returns:
Modified response (or original)
"""
# Wrap response in structured JSON with metadata
return json.dumps({
"response": response,
"run_id": context.get("run_id"),
"duration_ms": context.get("duration_ms"),
})Both hooks are optional. You can define just before(), just after(), or both.
The Content Dict
The before() hook receives a content dict that represents the user's message. It contains a role (always 'user') and a list of parts. Each part is a dict with either text or binary data.
# The content dict structure
content = {
"role": "user", # The role: 'user' or 'model'
"parts": [ # List of part dicts
{"text": "Hello, analyze this document"},
{"data": b"...", "mime_type": "application/pdf"},
]
}
# Accessing content parts
for part in content["parts"]:
if "text" in part:
# Text content
text = part["text"]
elif "data" in part:
# Binary file content
mime_type = part["mime_type"]
data = part["data"]Creating Parts
Create parts as simple dicts with text for text content or data + mime_type for files.
# Creating parts as dicts
# Text part
text_part = {"text": "Analyze this document"}
# File from bytes (PDFs, images, audio, video)
with open("document.pdf", "rb") as f:
pdf_part = {"data": f.read(), "mime_type": "application/pdf"}
# Image from bytes
with open("image.png", "rb") as f:
image_part = {"data": f.read(), "mime_type": "image/png"}
# Adding parts to content
content["parts"].append(pdf_part)
content["parts"].insert(0, text_part)Common Use Cases
Attaching Documents Dynamically
Add PDFs, images, or other files to the agent's context based on the user's request.
# middleware/document-agent.py
"""Attach documents dynamically based on request context."""
from typing import Any, Dict
async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Attach relevant documents to the agent context."""
# Get the user's text message
user_text = ""
for part in content["parts"]:
if "text" in part:
user_text = part["text"]
break
# Attach a knowledge base document based on topic
if "pricing" in user_text.lower():
with open("docs/pricing.pdf", "rb") as f:
content["parts"].append({
"data": f.read(),
"mime_type": "application/pdf"
})
elif "technical" in user_text.lower():
with open("docs/technical-specs.pdf", "rb") as f:
content["parts"].append({
"data": f.read(),
"mime_type": "application/pdf"
})
return contentAdding Context & Customer Data
Prepend system context or fetch data from external APIs to enrich the request.
# middleware/support-agent.py
"""Add system context and customer data to requests."""
from typing import Any, Dict
import httpx
async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Prepend system context to the user message."""
# Fetch customer data from your API
customer_id = context.get("customer_id")
customer_info = ""
if customer_id:
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://api.example.com/customers/{customer_id}")
if resp.status_code == 200:
data = resp.json()
customer_info = f"""
Customer: {data['name']}
Plan: {data['plan']}
Account Status: {data['status']}
"""
# Prepend context as text part
content["parts"].insert(0, {
"text": f"[CUSTOMER CONTEXT]\n{customer_info}\n[END CONTEXT]"
})
return contentGraceful Stop with StopProcessing
# middleware/auth.py
"""Authentication middleware - stops processing if invalid."""
from typing import Any, Dict
from connic.core import StopProcessing
async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Validate request before agent runs."""
# Extract text from first part to check for API key
user_text = ""
for part in content["parts"]:
if "text" in part:
user_text = part["text"]
break
# Check if request contains valid auth token
if "AUTH:" not in user_text:
raise StopProcessing("Authentication required")
return contentStopProcessing vs Exceptions: StopProcessing returns a response gracefully (run marked as completed). Regular exceptions mark the run as failed.
Analytics & Logging
# middleware/analytics.py
"""Send analytics to external service after agent completes."""
import httpx
from typing import Any, Dict
async def after(response: str, context: Dict[str, Any]) -> str:
"""Send analytics after agent completes."""
try:
async with httpx.AsyncClient() as client:
await client.post(
"https://analytics.internal/events",
json={
"event": "agent_run_completed",
"run_id": context.get("run_id"),
"agent": context.get("agent_name"),
"duration_ms": context.get("duration_ms"),
"tokens": context.get("token_usage", {}),
}
)
except Exception:
pass # Don't fail the request if analytics fails
return responseResponse Transformation
# middleware/formatter.py
"""Transform response format for specific use cases."""
import json
from typing import Any, Dict
def after(response: str, context: Dict[str, Any]) -> str:
"""Wrap response in a standard API format."""
return json.dumps({
"success": True,
"data": response,
"metadata": {
"run_id": context.get("run_id"),
"processed_at": context.get("timestamp"),
}
})The context dict is shared and mutable. Values you set in before() are available in prompts via {var} syntax, in tools, and in the after() hook. See the Context documentation for full details.
Project Structure
my-agent-project/
├── agents/
│ ├── assistant.yaml
│ ├── customer-support.yaml
│ └── invoice-processor.yaml
├── middleware/
│ ├── assistant.py # Applied to 'assistant' agent
│ ├── customer-support.py # Applied to 'customer-support' agent
│ └── analytics.py # ❌ No matching agent - ignored
└── tools/
└── ...Sync and Async Support
# middleware/simple.py
"""Sync middleware also works."""
from typing import Any, Dict
def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Sync functions are automatically handled."""
# Add a simple prefix to the first text part
for part in content["parts"]:
if "text" in part:
part["text"] = "[Processed] " + part["text"]
break
return content
def after(response: str, context: Dict[str, Any]) -> str:
"""Both sync and async are supported."""
return response.strip()Use async functions for I/O operations (API calls, database queries, file reads). Sync functions are fine for simple transformations.
- Attach relevant documents: Use middleware to dynamically add context documents
- Keep files small: Large files increase latency and token usage
- Always return content/response: Forgetting to return will pass None to the next stage
- Use async for I/O: File reads, HTTP calls, database queries
- Use context for shared data: Set values in
before()to share data with prompts and tools (learn more)
If a middleware function raises an exception, the entire request fails. The run is marked as failed, and for before() the agent never executes. Use middleware as a gatekeeper for validation and authentication.