Apache Kafka
Connect your agents to Apache Kafka for real-time event streaming. Consume messages to trigger agents (inbound) or produce agent results to topics (outbound).
Inbound (Consumer)
Consume messages from Kafka topics and trigger agent runs. Each message is processed by all linked agents, with full Kafka metadata preserved.
How Inbound Works
Inbound Kafka connectors act as consumers that subscribe to a topic. When a message arrives, it's parsed and dispatched to all linked agents. Best for event-driven processing, real-time pipelines, streaming analytics, and microservices communication.
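Conceptually, the consume-and-dispatch loop looks something like the sketch below, written with the kafka-python client. The connector implements all of this internally; `dispatch_to_agents` is a hypothetical stand-in for the platform's agent dispatch.

```python
# Conceptual sketch only: the connector implements this loop for you.
# Assumes the kafka-python client; dispatch_to_agents() is hypothetical.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer("orders", bootstrap_servers="kafka:9092")

for msg in consumer:
    payload = json.loads(msg.value)
    # Kafka metadata is attached under the _kafka key before dispatch.
    payload["_kafka"] = {
        "topic": msg.topic,
        "partition": msg.partition,
        "offset": msg.offset,
        "timestamp": msg.timestamp,
        "key": msg.key.decode() if msg.key else None,
    }
    dispatch_to_agents(payload)  # hypothetical: triggers every linked agent
```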
Configuration
- Bootstrap Servers: Kafka broker addresses (e.g., kafka:9092)
- Topic: The topic to consume from
- Group ID (optional): Consumer group identifier
- Auto Offset Reset (optional): Latest (new only) or Earliest (from beginning)
Security Settings
- Security Protocol: PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL (recommended)
- SASL Mechanism: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512
- Username & Password: Required for SASL
Tip: For managed Kafka (Confluent, MSK, Aiven), use SASL_SSL with SCRAM-SHA-256.
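As a rough reference, here is how those settings map onto a kafka-python consumer; the broker address, group ID, and credentials below are placeholders:

```python
# Sketch: inbound connector settings expressed as kafka-python kwargs.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="broker.example.com:9092",  # placeholder host
    group_id="order-agents",                      # placeholder group
    auto_offset_reset="earliest",   # or "latest" for new messages only
    security_protocol="SASL_SSL",   # recommended for managed Kafka
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="agent-consumer",
    sasl_plain_password="********",
)
```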
Message Payload
```json
{
  "order_id": "12345",
  "customer": "john@example.com",
  "items": ["widget-a", "widget-b"],
  "_kafka": {
    "topic": "orders",
    "partition": 0,
    "offset": 1542,
    "timestamp": 1705312800000,
    "key": "order-12345"
  }
}
```

The _kafka metadata includes: topic, partition, offset, timestamp, and key.
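Because this metadata rides along with the payload, a tool can use it, for example, to skip duplicate deliveries. A hypothetical sketch (`process_event` and the in-memory set are illustrative only):

```python
# Hypothetical tool sketch: use _kafka metadata for idempotent processing.
from typing import Any, Dict

_seen: set[tuple] = set()  # in-memory for illustration; use a real store

def process_event(event: Dict[str, Any]) -> Dict[str, Any]:
    meta = event.get("_kafka", {})
    # (topic, partition, offset) uniquely identifies a Kafka message.
    msg_id = (meta.get("topic"), meta.get("partition"), meta.get("offset"))
    if msg_id in _seen:
        return {"skipped": True, "reason": "duplicate"}
    _seen.add(msg_id)
    return {"skipped": False, "order_id": event.get("order_id")}
```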
End-to-End Example
Create an inbound connector for your source topic, link it to an agent, and link an outbound connector to publish the agent output to a destination topic.
```yaml
# agents/order-processor.yaml
version: "1.0"
name: order-processor
type: llm
model: gemini/gemini-2.5-flash
description: "Validate orders and compute routing"
system_prompt: |
  You receive an order event in JSON (from Kafka).
  1) Call orders.validate_order
  2) Call orders.score_risk
  3) Return JSON with order_id, status, risk_score, route
tools:
  - orders.validate_order
  - orders.score_risk
output_schema: order-result.json
```

```python
# tools/orders.py
from typing import Dict, Any

def validate_order(order_id: str, items: list[str]) -> Dict[str, Any]:
    """Basic order validation."""
    if not order_id or not items:
        return {"ok": False, "reason": "missing_fields"}
    return {"ok": True}

async def score_risk(customer: str, total: float) -> Dict[str, Any]:
    """Return a simple risk score and routing hint."""
    score = 0.02 if total < 100 else 0.12
    route = "standard" if score < 0.1 else "manual_review"
    return {"risk_score": score, "route": route}
```

The outbound Kafka connector publishes the agent's JSON output as the message body:
```json
{
  "order_id": "12345",
  "status": "approved",
  "risk_score": 0.02,
  "route": "standard"
}
```

Consumer Groups
Each connector uses a consumer group to track which messages have been processed. Connectors with different group IDs each receive every message; connectors that share a group ID split the topic's partitions and share the load.
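For example, expressed with kafka-python (broker address and group names are placeholders):

```python
# Sketch: consumer group semantics.
from kafka import KafkaConsumer

# These two share group "billing": each message goes to only one of them.
billing_a = KafkaConsumer("orders", bootstrap_servers="kafka:9092", group_id="billing")
billing_b = KafkaConsumer("orders", bootstrap_servers="kafka:9092", group_id="billing")

# A different group "audit" independently receives every message.
audit = KafkaConsumer("orders", bootstrap_servers="kafka:9092", group_id="audit")
```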
Connection Resilience
- Auto reconnection, exponential backoff, config change detection, offset tracking
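The reconnect behavior is roughly what an exponential-backoff loop would do; an illustrative sketch (the connector handles this internally):

```python
# Illustrative sketch of reconnect with exponential backoff.
import time
from kafka import KafkaConsumer
from kafka.errors import KafkaError

def connect_with_backoff(max_delay: float = 60.0) -> KafkaConsumer:
    delay = 1.0
    while True:
        try:
            return KafkaConsumer("orders", bootstrap_servers="kafka:9092")
        except KafkaError:
            time.sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, capped
```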
Outbound (Producer)
Produce agent results to Kafka topics when runs complete. Results are automatically sent to your configured topic with preserved message keys for correlation.
How Outbound Works
Outbound Kafka connectors act as producers. When linked agents complete runs, results are automatically published to the configured topic. Best for event-driven architectures and processing pipelines.
Configuration
- Bootstrap Servers: Kafka broker addresses
- Topic: Topic to publish results to
Security
Same as inbound: SASL_SSL with SCRAM-SHA-256 recommended for managed Kafka services.
Output Payload
```json
{
  "run_id": "550e8400-e29b-41d4-a716-446655440000",
  "agent_name": "order-processor",
  "status": "completed",
  "output": "Order processed successfully. Total: $234.56",
  "error": null,
  "started_at": "2024-01-15T10:30:00Z",
  "ended_at": "2024-01-15T10:30:05Z",
  "token_usage": {
    "prompt_tokens": 150,
    "candidates_tokens": 50,
    "total_tokens": 200
  }
}
```

The payload includes run_id, agent_name, status, output, error, timestamps, and token_usage.
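Any downstream service can consume these results like ordinary Kafka messages. A minimal sketch, assuming a results topic named agent-results and hypothetical handler functions:

```python
# Sketch: a downstream consumer reading agent results from the output topic.
import json
from kafka import KafkaConsumer

results = KafkaConsumer("agent-results", bootstrap_servers="kafka:9092",
                        group_id="downstream")
for msg in results:
    result = json.loads(msg.value)
    if result["status"] == "completed":
        handle_output(result["output"])                      # hypothetical
    else:
        alert_on_failure(result["run_id"], result["error"])  # hypothetical
```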
Message Key Correlation
When triggered by an inbound message with a key, the outbound producer uses the same key, preserving ordering within partitions.
```text
# Inbound message with key "order-123"
Message received → Agent triggered → Run completes

# Outbound message uses same key "order-123"
Result published with key: "order-123" (source: original)

# If no inbound key, uses run_id
Result published with key: "550e8400-e29b..." (source: run_id)
```

Delivery Guarantees
- Full replication acknowledgment, automatic retries, connection pooling, run tracking
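Expressed with kafka-python, the producer side corresponds roughly to this sketch (the topic name agent-results and the retry count are placeholders):

```python
# Sketch: outbound publish with key correlation and full acknowledgment.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    acks="all",   # wait for full replication acknowledgment
    retries=5,    # automatic retries on transient failures
)

def publish_result(result: dict, inbound_key: bytes | None) -> None:
    # Reuse the inbound key when present so results land in the same
    # partition as their source events; otherwise fall back to run_id.
    key = inbound_key or result["run_id"].encode()
    producer.send("agent-results", key=key, value=json.dumps(result).encode())
    producer.flush()
```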
Advanced Patterns
For multi-topic streams, create one inbound connector per topic and link them all to the same correlator agent. Use _kafka.key (or a shared order_id) to correlate events, and store partial state in an external DB/Redis via tools (a sketch of such tools follows the agent config below). Retries are configured on the agent:
```yaml
# agents/event-correlator.yaml
version: "1.0"
name: event-correlator
type: llm
model: gemini/gemini-2.5-pro
description: "Correlate order + shipment events across topics"
system_prompt: |
  Use the correlation tools to store incoming events by _kafka.key.
  Only respond when both "orders" and "shipments" are present.
tools:
  - correlation.upsert_event
  - correlation.build_snapshot
retry_options:
  attempts: 5
  max_delay: 30
```
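The correlation tools referenced in the YAML could be backed by Redis, for instance. A minimal sketch, assuming redis-py; the tool names match the YAML, and the completeness check ("orders" plus "shipments") is just an example:

```python
# tools/correlation.py — hypothetical sketch backed by Redis (redis-py).
import json
from typing import Any, Dict
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def upsert_event(key: str, topic: str, event: Dict[str, Any]) -> Dict[str, Any]:
    """Store one event under the correlation key, indexed by source topic."""
    r.hset(f"corr:{key}", topic, json.dumps(event))
    r.expire(f"corr:{key}", 3600)  # drop stale partial state after an hour
    present = set(r.hkeys(f"corr:{key}"))
    return {"complete": {"orders", "shipments"} <= present}

def build_snapshot(key: str) -> Dict[str, Any]:
    """Return all correlated events stored for a key."""
    raw = r.hgetall(f"corr:{key}")
    return {topic: json.loads(body) for topic, body in raw.items()}
```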