Connic
Connic Composer SDK

Middleware

Learn how to use middleware to run custom logic before and after agent execution. Attach documents, enrich context, validate inputs, and transform responses.

What is Middleware?

Hooks that run before and after your agent processes a request

Middleware are Python functions that intercept requests before they reach your agent and responses before they're returned. Use them to attach documents, enrich context, validate inputs, log interactions, or transform responses.

Auto-Discovery by Agent Name

Create a file in middleware/ with the same name as your agent. For example, middleware/assistant.py applies to agents/assistant.yaml. No configuration needed!

Execution Flow

Request
before()
Agent
after()
Response

Basic Middleware

python
# middleware/assistant.py
"""Middleware for the assistant agent."""
import json
from datetime import datetime
from typing import Any, Dict


async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """
    Called before the agent runs.
    
    Args:
        content: Dict with "role" and "parts" keys
        context: Shared run context dict (see Context docs)
    
    Returns:
        Modified content to pass to the agent
    """
    # Add current timestamp so the agent knows today's date
    now = datetime.now()
    time_context = f"[Current time: {now.strftime('%A, %B %d, %Y')}]"
    content["parts"].insert(0, {"text": time_context})
    
    return content


async def after(response: str, context: Dict[str, Any]) -> str:
    """
    Called after the agent completes.
    
    Args:
        response: The agent's response text
        context: Shared run context dict (see Context docs)
    
    Returns:
        Modified response (or original)
    """
    # Wrap response in structured JSON with metadata
    return json.dumps({
        "response": response,
        "run_id": context.get("run_id"),
        "duration_ms": context.get("duration_ms"),
    })

Both hooks are optional. You can define just before(), just after(), or both.

The Content Dict

The before() hook receives a content dict that represents the user's message. It contains a role (always 'user') and a list of parts. Each part is a dict with either text or binary data.

python
# The content dict structure
content = {
    "role": "user",         # The role: 'user' or 'model'
    "parts": [              # List of part dicts
        {"text": "Hello, analyze this document"},
        {"data": b"...", "mime_type": "application/pdf"},
    ]
}

# Accessing content parts
for part in content["parts"]:
    if "text" in part:
        # Text content
        text = part["text"]
    elif "data" in part:
        # Binary file content
        mime_type = part["mime_type"]
        data = part["data"]

Creating Parts

Create parts as simple dicts with text for text content or data + mime_type for files.

python
# Creating parts as dicts

# Text part
text_part = {"text": "Analyze this document"}

# File from bytes (PDFs, images, audio, video)
with open("document.pdf", "rb") as f:
    pdf_part = {"data": f.read(), "mime_type": "application/pdf"}

# Image from bytes
with open("image.png", "rb") as f:
    image_part = {"data": f.read(), "mime_type": "image/png"}

# Adding parts to content
content["parts"].append(pdf_part)
content["parts"].insert(0, text_part)

Common Use Cases

Attaching Documents Dynamically

Add PDFs, images, or other files to the agent's context based on the user's request.

python
# middleware/document-agent.py
"""Attach documents dynamically based on request context."""
from typing import Any, Dict


async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Attach relevant documents to the agent context."""
    
    # Get the user's text message
    user_text = ""
    for part in content["parts"]:
        if "text" in part:
            user_text = part["text"]
            break
    
    # Attach a knowledge base document based on topic
    if "pricing" in user_text.lower():
        with open("docs/pricing.pdf", "rb") as f:
            content["parts"].append({
                "data": f.read(),
                "mime_type": "application/pdf"
            })
    
    elif "technical" in user_text.lower():
        with open("docs/technical-specs.pdf", "rb") as f:
            content["parts"].append({
                "data": f.read(),
                "mime_type": "application/pdf"
            })
    
    return content

Adding Context & Customer Data

Prepend system context or fetch data from external APIs to enrich the request.

python
# middleware/support-agent.py
"""Add system context and customer data to requests."""
from typing import Any, Dict
import httpx


async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Prepend system context to the user message."""
    
    # Fetch customer data from your API
    customer_id = context.get("customer_id")
    customer_info = ""
    
    if customer_id:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"https://api.example.com/customers/{customer_id}")
            if resp.status_code == 200:
                data = resp.json()
                customer_info = f"""
Customer: {data['name']}
Plan: {data['plan']}
Account Status: {data['status']}
"""
    
    # Prepend context as text part
    content["parts"].insert(0, {
        "text": f"[CUSTOMER CONTEXT]\n{customer_info}\n[END CONTEXT]"
    })
    
    return content

Graceful Stop with StopProcessing

python
# middleware/auth.py
"""Authentication middleware - stops processing if invalid."""
from typing import Any, Dict
from connic.core import StopProcessing


async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Validate request before agent runs."""
    
    # Extract text from first part to check for API key
    user_text = ""
    for part in content["parts"]:
        if "text" in part:
            user_text = part["text"]
            break
    
    # Check if request contains valid auth token
    if "AUTH:" not in user_text:
        raise StopProcessing("Authentication required")
    
    return content

StopProcessing vs Exceptions: StopProcessing returns a response gracefully (run marked as completed). Regular exceptions mark the run as failed.

Analytics & Logging

python
# middleware/analytics.py
"""Send analytics to external service after agent completes."""
import httpx
from typing import Any, Dict


async def after(response: str, context: Dict[str, Any]) -> str:
    """Send analytics after agent completes."""
    
    try:
        async with httpx.AsyncClient() as client:
            await client.post(
                "https://analytics.internal/events",
                json={
                    "event": "agent_run_completed",
                    "run_id": context.get("run_id"),
                    "agent": context.get("agent_name"),
                    "duration_ms": context.get("duration_ms"),
                    "tokens": context.get("token_usage", {}),
                }
            )
    except Exception:
        pass  # Don't fail the request if analytics fails
    
    return response

Response Transformation

python
# middleware/formatter.py
"""Transform response format for specific use cases."""
import json
from typing import Any, Dict


def after(response: str, context: Dict[str, Any]) -> str:
    """Wrap response in a standard API format."""
    
    return json.dumps({
        "success": True,
        "data": response,
        "metadata": {
            "run_id": context.get("run_id"),
            "processed_at": context.get("timestamp"),
        }
    })

The context dict is shared and mutable. Values you set in before() are available in prompts via {var} syntax, in tools, and in the after() hook. See the Context documentation for full details.

Project Structure

plaintext
my-agent-project/
├── agents/
│   ├── assistant.yaml
│   ├── customer-support.yaml
│   └── invoice-processor.yaml
├── middleware/
│   ├── assistant.py           # Applied to 'assistant' agent
│   ├── customer-support.py    # Applied to 'customer-support' agent
│   └── analytics.py           # ❌ No matching agent - ignored
└── tools/
    └── ...

Sync and Async Support

python
# middleware/simple.py
"""Sync middleware also works."""
from typing import Any, Dict


def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Sync functions are automatically handled."""
    # Add a simple prefix to the first text part
    for part in content["parts"]:
        if "text" in part:
            part["text"] = "[Processed] " + part["text"]
            break
    return content


def after(response: str, context: Dict[str, Any]) -> str:
    """Both sync and async are supported."""
    return response.strip()

Use async functions for I/O operations (API calls, database queries, file reads). Sync functions are fine for simple transformations.

Best Practices
  • Attach relevant documents: Use middleware to dynamically add context documents
  • Keep files small: Large files increase latency and token usage
  • Always return content/response: Forgetting to return will pass None to the next stage
  • Use async for I/O: File reads, HTTP calls, database queries
  • Use context for shared data: Set values in before() to share data with prompts and tools (learn more)
Error Handling

If a middleware function raises an exception, the entire request fails. The run is marked as failed, and for before() the agent never executes. Use middleware as a gatekeeper for validation and authentication.