Connic Composer SDK

Middleware

Learn how to use middleware to run custom logic before and after agent execution. Attach documents, enrich context, validate inputs, and transform responses.

What is Middleware?

Hooks that run before and after your agent processes a request

Middleware hooks are Python functions that intercept requests before they reach your agent and responses before they are returned. Use them to attach documents, enrich context, validate inputs, log interactions, or transform responses.

Auto-Discovery by Agent Name

Create a file in middleware/ with the same name as your agent. For example, middleware/assistant.py applies to the agent whose YAML has name: assistant, even if that YAML lives in a nested folder under agents/. No configuration needed!

Execution Flow

Request → before() → Agent → after() → Response
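The ordering can be modelled in a few lines. This is an illustrative sketch, not Connic's runtime. `run_with_middleware` and `fake_agent` are hypothetical names, and the sketch assumes async hooks for simplicity. It shows that before() can rewrite the request the agent sees, and after() can rewrite the response the caller gets.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Content = Dict[str, Any]
Context = Dict[str, Any]

async def run_with_middleware(
    content: Content,
    context: Context,
    agent: Callable[[Content, Context], Awaitable[str]],
    before: Optional[Callable[[Content, Context], Awaitable[Content]]] = None,
    after: Optional[Callable[[str, Context], Awaitable[str]]] = None,
) -> str:
    """Hypothetical model of Request -> before() -> Agent -> after() -> Response."""
    if before is not None:
        content = await before(content, context)   # may modify the request
    response = await agent(content, context)        # agent sees the modified content
    if after is not None:
        response = await after(response, context)   # may modify the response
    return response

async def fake_agent(content: Content, context: Context) -> str:
    # Stand-in agent: echoes the text parts it received
    return " | ".join(p["text"] for p in content["parts"] if "text" in p)

async def before(content: Content, context: Context) -> Content:
    content["parts"].insert(0, {"text": "[note]"})
    return content

async def after(response: str, context: Context) -> str:
    return response.upper()

result = asyncio.run(run_with_middleware(
    {"role": "user", "parts": [{"text": "hello"}]}, {}, fake_agent, before, after
))
print(result)  # [NOTE] | HELLO
```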

Basic Middleware

middleware/assistant.py
"""Middleware for the assistant agent."""
import json
from datetime import datetime
from typing import Any, Dict

async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """
    Called before the agent runs.

    Args:
        content: Dict with "role" and "parts" keys
        context: Shared run context dict (see Context docs)

    Returns:
        Modified content to pass to the agent
    """
    print(f"Before middleware running for run {context.get('run_id')}")

    # Prepend today's date so the agent knows it
    now = datetime.now()
    date_context = f"[Current date: {now.strftime('%A, %B %d, %Y')}]"
    content["parts"].insert(0, {"text": date_context})

    return content

async def after(response: str, context: Dict[str, Any]) -> str:
    """
    Called after the agent completes.

    Args:
        response: The agent's response text
        context: Shared run context dict (see Context docs)

    Returns:
        Modified response (or original)
    """
    print(f"After middleware — response length: {len(response)}")

    # Wrap response in structured JSON with metadata
    return json.dumps({
        "response": response,
        "run_id": context.get("run_id"),
        "duration_ms": context.get("duration_ms"),
    })

Both hooks are optional. You can define just before(), just after(), or both.
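Hooks are plain functions, so you can test a module that defines only one of them by calling it directly with hand-built dicts. The sketch below assumes only the signatures shown above. The `date_prefix` helper is hypothetical. It is split out so the date can be fixed in a test.

```python
import asyncio
from datetime import datetime
from typing import Any, Dict, Optional

def date_prefix(now: Optional[datetime] = None) -> str:
    # Hypothetical helper: accepts a fixed datetime so tests are deterministic
    now = now or datetime.now()
    return f"[Current date: {now.strftime('%A, %B %d, %Y')}]"

# This module defines only before(); after() is simply omitted
async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    content["parts"].insert(0, {"text": date_prefix()})
    return content

if __name__ == "__main__":
    # Local check without running an agent
    sample = {"role": "user", "parts": [{"text": "What day is it?"}]}
    out = asyncio.run(before(sample, {"run_id": "local-test"}))
    print(out["parts"][0]["text"])
```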

The Content Dict

The before() hook receives a content dict representing the user's message. It contains a role (always "user") and a list of parts. Each part is a dict holding either a text key or binary data under a data key together with its mime_type.

middleware/example.py
# The content dict structure
content = {
    "role": "user",         # Always 'user' for the before() hook
    "parts": [              # List of part dicts
        {"text": "Hello, analyze this document"},
        {"data": b"...", "mime_type": "application/pdf"},
    ]
}

# Accessing content parts
for part in content["parts"]:
    if "text" in part:
        # Text content
        text = part["text"]
    elif "data" in part:
        # Binary file content
        mime_type = part["mime_type"]
        data = part["data"]
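Several examples on this page loop over parts to find the first text part. A pair of small helpers can factor that out. `first_text` and `file_parts` are hypothetical names and are not part of the SDK.

```python
from typing import Any, Dict, List

def first_text(content: Dict[str, Any]) -> str:
    """Return the first text part, or "" if the message has none."""
    for part in content.get("parts", []):
        if "text" in part:
            return part["text"]
    return ""

def file_parts(content: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return all binary parts (those carrying "data" and "mime_type")."""
    return [p for p in content.get("parts", []) if "data" in p]

content = {
    "role": "user",
    "parts": [
        {"text": "Hello, analyze this document"},
        {"data": b"%PDF-1.7", "mime_type": "application/pdf"},
    ],
}
print(first_text(content))                            # Hello, analyze this document
print([p["mime_type"] for p in file_parts(content)])  # ['application/pdf']
```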

Creating Parts

Create parts as simple dicts with text for text content or data + mime_type for files.

middleware/example.py
# Creating parts as dicts

# Text part
text_part = {"text": "Analyze this document"}

# File from bytes (PDFs, images, audio, video)
with open("document.pdf", "rb") as f:
    pdf_part = {"data": f.read(), "mime_type": "application/pdf"}

# Image from bytes
with open("image.png", "rb") as f:
    image_part = {"data": f.read(), "mime_type": "image/png"}

# Adding parts to an existing content dict (e.g. inside before())
content["parts"].append(pdf_part)
content["parts"].insert(0, text_part)
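To avoid hard-coding MIME types, you could guess them from the file extension with the standard-library mimetypes module. `file_part` and `text_part` are hypothetical convenience helpers. This page does not say which MIME types a given agent's model accepts, so treat the guessed type as a starting point.

```python
import mimetypes
from pathlib import Path
from typing import Any, Dict

def file_part(path: str) -> Dict[str, Any]:
    """Build a binary part from a file, guessing the MIME type from its extension."""
    mime_type, _encoding = mimetypes.guess_type(path)
    if mime_type is None:
        # Fail early rather than sending a part without a mime_type
        raise ValueError(f"Cannot infer MIME type for {path!r}; set it explicitly")
    return {"data": Path(path).read_bytes(), "mime_type": mime_type}

def text_part(text: str) -> Dict[str, Any]:
    """Build a text part."""
    return {"text": text}
```

Inside a hook, usage would look like `content["parts"].append(file_part("document.pdf"))`.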

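Common Use Cases

The filenames in these examples are illustrative. Each file runs only for the agent whose name matches it. For example, middleware/analytics.py applies only to an agent named analytics.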

Attaching Documents Dynamically

Add PDFs, images, or other files to the agent's context based on the user's request.

middleware/document-agent.py
"""Attach documents dynamically based on request context."""
from typing import Any, Dict

async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Attach relevant documents to the agent context."""
    
    # Get the user's text message
    user_text = ""
    for part in content["parts"]:
        if "text" in part:
            user_text = part["text"]
            break
    
    # Attach a knowledge base document based on topic
    if "pricing" in user_text.lower():
        with open("docs/pricing.pdf", "rb") as f:
            content["parts"].append({
                "data": f.read(),
                "mime_type": "application/pdf"
            })
    
    elif "technical" in user_text.lower():
        with open("docs/technical-specs.pdf", "rb") as f:
            content["parts"].append({
                "data": f.read(),
                "mime_type": "application/pdf"
            })
    
    return content
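As the number of topics grows, a keyword-to-file table can replace the if/elif chain. This sketch reads the file in a worker thread with asyncio.to_thread (Python 3.9+) so a large file does not block the event loop. `DOCS_BY_KEYWORD` and `attach_matching_doc` are hypothetical names. The relative paths resolve against the process's working directory, which this page does not specify.

```python
import asyncio
from pathlib import Path
from typing import Any, Dict

# Hypothetical keyword -> document table; the first match wins, like the if/elif above
DOCS_BY_KEYWORD: Dict[str, str] = {
    "pricing": "docs/pricing.pdf",
    "technical": "docs/technical-specs.pdf",
}

async def attach_matching_doc(content: Dict[str, Any], docs: Dict[str, str]) -> Dict[str, Any]:
    """Append the first document whose keyword appears in the first text part."""
    user_text = next((p["text"] for p in content["parts"] if "text" in p), "").lower()
    for keyword, path in docs.items():
        if keyword in user_text:
            # Blocking read runs in a thread so other coroutines keep running
            data = await asyncio.to_thread(Path(path).read_bytes)
            content["parts"].append({"data": data, "mime_type": "application/pdf"})
            break
    return content

async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    return await attach_matching_doc(content, DOCS_BY_KEYWORD)
```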

Adding Context & Customer Data

Prepend system context or fetch data from external APIs to enrich the request.

middleware/support-agent.py
"""Add system context and customer data to requests."""
from typing import Any, Dict
import httpx

async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Prepend system context to the user message."""

    # Fetch customer data from your API
    customer_id = context.get("customer_id")
    customer_info = ""

    if customer_id:
        print(f"Fetching customer data for {customer_id}")
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"https://api.example.com/customers/{customer_id}")
            if resp.status_code == 200:
                data = resp.json()
                customer_info = f"""
Customer: {data['name']}
Plan: {data['plan']}
Account Status: {data['status']}
"""
                print(f"Loaded customer: {data['name']} ({data['plan']})")

    # Prepend context as a text part, but only if the lookup returned data
    if customer_info:
        content["parts"].insert(0, {
            "text": f"[CUSTOMER CONTEXT]\n{customer_info}\n[END CONTEXT]"
        })

    return content
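Moving the formatting out of the HTTP call makes it testable without network access. It also lets the hook skip the context block entirely when the lookup fails. The helper names and the "unknown" fallback in this sketch are assumptions. The field names (name, plan, status) come from the example API response above.

```python
from typing import Any, Dict, Optional

def format_customer_context(data: Optional[Dict[str, Any]]) -> Optional[str]:
    """Build the [CUSTOMER CONTEXT] block, or None when there is nothing to add."""
    if not data:
        return None
    lines = [
        f"Customer: {data.get('name', 'unknown')}",
        f"Plan: {data.get('plan', 'unknown')}",
        f"Account Status: {data.get('status', 'unknown')}",
    ]
    return "[CUSTOMER CONTEXT]\n" + "\n".join(lines) + "\n[END CONTEXT]"

def prepend_customer_context(content: Dict[str, Any], data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Insert the context block as the first part, if there is one."""
    block = format_customer_context(data)
    if block is not None:
        content["parts"].insert(0, {"text": block})
    return content
```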

Graceful Stop with StopProcessing

middleware/auth.py
"""Authentication middleware - stops processing if invalid."""
from typing import Any, Dict
from connic import StopProcessing

async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Validate request before agent runs."""
    
    # Find the first text part to look for an auth token
    user_text = ""
    for part in content["parts"]:
        if "text" in part:
            user_text = part["text"]
            break
    
    # Stop gracefully if the message carries no "AUTH:" marker
    if "AUTH:" not in user_text:
        raise StopProcessing("Authentication required")
    
    return content

StopProcessing vs. exceptions: raising StopProcessing returns a response gracefully and marks the run as completed. Any other exception marks the run as failed. The same mechanism works for tools in tools/; see Write Tools.
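The difference between the two outcomes can be modelled locally. In this sketch, StopProcessing is a local stand-in class so the code runs without the SDK; in real middleware, import it from connic as shown above. `run_outcome` is a hypothetical function that mimics how a runner might map a before() outcome to a run status. It is not Connic's implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

class StopProcessing(Exception):
    """Local stand-in for connic.StopProcessing, only to keep this sketch runnable."""

Hook = Callable[[Dict[str, Any], Dict[str, Any]], Awaitable[Dict[str, Any]]]

async def run_outcome(before: Hook, content: Dict[str, Any], context: Dict[str, Any]) -> str:
    """Hypothetical runner logic: map what before() did to a run status."""
    try:
        await before(content, context)
    except StopProcessing:
        return "completed (agent skipped)"  # graceful stop
    except Exception:
        return "failed"                     # any other exception fails the run
    return "agent runs"

async def auth_before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    text = next((p["text"] for p in content["parts"] if "text" in p), "")
    if "AUTH:" not in text:
        raise StopProcessing("Authentication required")
    return content

async def buggy_before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    return context["missing_key"]  # raises KeyError

def msg(text: str) -> Dict[str, Any]:
    return {"role": "user", "parts": [{"text": text}]}

print(asyncio.run(run_outcome(auth_before, msg("hi"), {})))            # completed (agent skipped)
print(asyncio.run(run_outcome(auth_before, msg("AUTH: abc hi"), {})))  # agent runs
print(asyncio.run(run_outcome(buggy_before, msg("hi"), {})))           # failed
```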

Analytics & Logging

middleware/analytics.py
"""Send analytics to external service after agent completes."""
import httpx
from typing import Any, Dict

async def after(response: str, context: Dict[str, Any]) -> str:
    """Send analytics after agent completes."""
    
    try:
        # Short timeout so a slow analytics endpoint can't hold up the response
        async with httpx.AsyncClient(timeout=2.0) as client:
            await client.post(
                "https://analytics.internal/events",
                json={
                    "event": "agent_run_completed",
                    "run_id": context.get("run_id"),
                    "agent": context.get("agent_name"),
                    "duration_ms": context.get("duration_ms"),
                    "tokens": context.get("token_usage", {}),
                }
            )
    except Exception:
        pass  # Don't fail the request if analytics fails
    
    return response
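For local development, you might keep the payload construction in a pure function and send the event to Python's standard logging module instead of an HTTP endpoint. The context keys are the ones used in the example above. `build_event` and the logger name are hypothetical.

```python
import json
import logging
from typing import Any, Dict

logger = logging.getLogger("middleware.analytics")  # hypothetical logger name

def build_event(context: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the analytics payload from the run context."""
    return {
        "event": "agent_run_completed",
        "run_id": context.get("run_id"),
        "agent": context.get("agent_name"),
        "duration_ms": context.get("duration_ms"),
        "tokens": context.get("token_usage", {}),
    }

async def after(response: str, context: Dict[str, Any]) -> str:
    try:
        # default=str keeps non-JSON-serializable values from raising
        logger.info(json.dumps(build_event(context), default=str))
    except Exception:
        logger.exception("analytics logging failed")  # never fail the run over analytics
    return response
```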

Response Transformation

middleware/formatter.py
"""Transform response format for specific use cases."""
import json
from typing import Any, Dict

def after(response: str, context: Dict[str, Any]) -> str:
    """Wrap response in a standard API format."""
    
    return json.dumps({
        "success": True,
        "data": response,
        "metadata": {
            "run_id": context.get("run_id"),
            "processed_at": context.get("timestamp"),
        }
    })
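If the agent sometimes answers with JSON itself, wrapping it as a string double-encodes it. One option is to embed the response as a parsed value when it looks like a JSON object or array, and fall back to the raw text otherwise. The "starts with { or [" check is an assumption made here so that plain answers such as "42" stay strings.

```python
import json
from typing import Any, Dict

def after(response: str, context: Dict[str, Any]) -> str:
    """Wrap the response, embedding JSON objects/arrays as values instead of strings."""
    data: Any = response
    stripped = response.strip()
    if stripped[:1] in ("{", "["):
        try:
            data = json.loads(stripped)
        except json.JSONDecodeError:
            pass  # looked like JSON but wasn't; keep the raw text
    return json.dumps({
        "success": True,
        "data": data,
        "metadata": {
            "run_id": context.get("run_id"),
            "processed_at": context.get("timestamp"),
        },
    })
```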

The context dict is shared and mutable. Values you set in before() are available in prompts via {var} syntax, in tools, and in the after() hook. See the Context documentation for full details.
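The sketch below shows a value set in before() being read in after(). `simulate` plays the role of the runtime and assumes, as the paragraph above describes, that the same dict object is passed to both hooks. The `customer_tier` key and the "c-1" ID are hypothetical; a prompt could reference the key as {customer_tier}.

```python
import asyncio
from typing import Any, Dict

async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical key; a prompt could use it as {customer_tier}
    context["customer_tier"] = "gold" if context.get("customer_id") == "c-1" else "standard"
    return content

async def after(response: str, context: Dict[str, Any]) -> str:
    # Same dict object as in before(), so the value is visible here
    return f"[{context['customer_tier']}] {response}"

async def simulate() -> str:
    context: Dict[str, Any] = {"customer_id": "c-1"}  # one dict shared by both hooks
    await before({"role": "user", "parts": [{"text": "hi"}]}, context)
    return await after("Hello!", context)

print(asyncio.run(simulate()))  # [gold] Hello!
```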

Project Structure

project-structure
my-agent-project/
├── agents/
│   ├── assistant.yaml
│   ├── customer-support.yaml
│   └── invoice-processor.yaml
├── middleware/
│   ├── assistant.py           # Applied to 'assistant' agent
│   ├── customer-support.py    # Applied to 'customer-support' agent
│   └── analytics.py           # ❌ No matching agent - ignored
└── tools/
    └── ...
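The matching rule can be expressed as a small function. This sketch assumes the agent names have already been read from each YAML's name field, which is what discovery keys on rather than the YAML filename. Here they are simply passed in as strings. `match_middleware` is a hypothetical name.

```python
from pathlib import Path
from typing import Dict, Iterable, List

def match_middleware(agent_names: Iterable[str], middleware_files: Iterable[str]) -> Dict[str, List[str]]:
    """Split middleware files into those matching an agent name and those ignored."""
    names = set(agent_names)
    matched: List[str] = []
    ignored: List[str] = []
    for f in middleware_files:
        # "customer-support.py" -> "customer-support"
        (matched if Path(f).stem in names else ignored).append(f)
    return {"matched": matched, "ignored": ignored}

print(match_middleware(
    ["assistant", "customer-support", "invoice-processor"],
    ["assistant.py", "customer-support.py", "analytics.py"],
))
```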

Sync and Async Support

middleware/simple.py
"""Sync middleware also works."""
from typing import Any, Dict

def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Sync functions are automatically handled."""
    # Add a simple prefix to the first text part
    for part in content["parts"]:
        if "text" in part:
            part["text"] = "[Processed] " + part["text"]
            break
    return content

def after(response: str, context: Dict[str, Any]) -> str:
    """Both sync and async are supported."""
    return response.strip()

Use async functions for I/O operations (API calls, database queries, file reads). Sync functions are fine for simple transformations.
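One common way a runner can accept both kinds of hook is to call the function and await the result only if it is awaitable. `call_hook` is a hypothetical helper illustrating that pattern; it is not necessarily how Connic dispatches hooks.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, Tuple

async def call_hook(hook: Callable[..., Any], *args: Any) -> Any:
    """Call a sync or async hook and return its result."""
    result = hook(*args)
    if inspect.isawaitable(result):
        result = await result
    return result

def sync_after(response: str, context: Dict[str, Any]) -> str:
    return response.strip()

async def async_after(response: str, context: Dict[str, Any]) -> str:
    return response.upper()

async def demo() -> Tuple[str, str]:
    return (
        await call_hook(sync_after, "  hi  ", {}),
        await call_hook(async_after, "hi", {}),
    )

print(asyncio.run(demo()))  # ('hi', 'HI')
```

In a runner like this, a sync hook executes directly on the event loop. A slow blocking call inside it (a network request, a large file read) would stall everything else, which is why async is preferred for I/O.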

Best Practices
  • Attach relevant documents: Use middleware to dynamically add context documents
  • Keep files small: Large files increase latency and token usage
  • Always return content/response: Forgetting to return will pass None to the next stage
  • Use async for I/O: File reads, HTTP calls, database queries
  • Use context for shared data: Set values in before() to share data with prompts and tools (see the Context documentation)
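To catch a missing return early, you could wrap a hook in a guard that raises when it returns None. This sketch assumes the SDK simply looks up and calls the module-level before/after functions. `require_return` is a hypothetical decorator. The wrapped hook is always async, which is supported, and a failed check raises an ordinary exception, so the run fails loudly instead of passing None along.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, Dict

def require_return(hook: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a sync or async hook so returning None raises a TypeError."""
    @functools.wraps(hook)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        result = hook(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        if result is None:
            raise TypeError(f"{hook.__name__}() returned None; did you forget 'return'?")
        return result
    return wrapper

@require_return
async def forgetful_before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    content["parts"].append({"text": "extra"})
    # Bug: missing "return content"

try:
    asyncio.run(forgetful_before({"role": "user", "parts": []}, {}))
except TypeError as err:
    print(err)  # forgetful_before() returned None; did you forget 'return'?
```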

Error Handling

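If a middleware function raises any exception other than StopProcessing, the entire request fails: the run is marked as failed, and if the exception came from before(), the agent never executes. This makes before() a natural gatekeeper for validation and authentication. Raise StopProcessing instead when you want to end the run without marking it as failed.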

Middleware & Retries

By default, before() runs once before agent execution, even when retries are configured. To re-run the before() middleware on each retry attempt, set retry_options.rerun_middleware: true in your agent YAML.

When enabled, each retry rebuilds the content from the original payload and calls before() again. This lets middleware refresh dynamic context (e.g., fetching updated external state after partial tool changes). The context dict accumulates state across retries; use context.get("retry_attempt", 0) to check which attempt is running. The after() middleware always runs once, after all retries complete.
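The sketch below shows a retry-aware before() plus a local loop that imitates the behaviour described above: content is rebuilt each attempt, and context persists across attempts. `fetch_state`, `refresh_count` and `simulate_retries` are hypothetical. The way the loop numbers `retry_attempt` (unset on the first attempt, then 1, 2, ...) is an assumption consistent with the `get("retry_attempt", 0)` default.

```python
import asyncio
from typing import Any, Dict, List, Tuple

async def fetch_state() -> str:
    # Hypothetical stand-in for an external lookup (API call, DB query, ...)
    return "fresh-state"

async def before(content: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    attempt = context.get("retry_attempt", 0)
    # context persists across retries, so this counter accumulates
    context["refresh_count"] = context.get("refresh_count", 0) + 1
    state = await fetch_state()
    label = "initial attempt" if attempt == 0 else f"retry {attempt}"
    content["parts"].insert(0, {"text": f"[{label}; external state: {state}]"})
    return content

async def simulate_retries(n: int) -> Tuple[List[str], int]:
    context: Dict[str, Any] = {}
    labels: List[str] = []
    for attempt in range(n):
        if attempt:
            context["retry_attempt"] = attempt  # assumption: how the runtime numbers retries
        payload = {"role": "user", "parts": [{"text": "do the task"}]}  # rebuilt each attempt
        content = await before(payload, context)
        labels.append(content["parts"][0]["text"])
    return labels, context["refresh_count"]

print(asyncio.run(simulate_retries(3)))
```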