Connic
Connectors

Apache Kafka

Connect your agents to Apache Kafka for real-time event streaming. Consume messages to trigger agents (inbound) or produce agent results to topics (outbound).

Inbound (Consumer)

Consume messages from Kafka topics and trigger agent runs. Each message is processed by all linked agents, with full Kafka metadata preserved.

How Inbound Works

Inbound Kafka connectors act as consumers that subscribe to a topic. When a message arrives, it's parsed and dispatched to all linked agents. Best for event-driven processing, real-time pipelines, streaming analytics, and microservices communication.

Configuration

  • Bootstrap Servers: Kafka broker addresses (e.g., kafka:9092)
  • Topic: The topic to consume from
  • Group ID (optional): Consumer group identifier
  • Auto Offset Reset (optional): Latest (new only) or Earliest (from beginning)

Security Settings

  • Security Protocol: PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL (recommended)
  • SASL Mechanism: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512
  • Username & Password: Required for SASL

Tip: For managed Kafka (Confluent, MSK, Aiven), use SASL_SSL with SCRAM-SHA-256.

Message Payload

json
{
  "order_id": "12345",
  "customer": "john@example.com",
  "items": ["widget-a", "widget-b"],
  "_kafka": {
    "topic": "orders",
    "partition": 0,
    "offset": 1542,
    "timestamp": 1705312800000,
    "key": "order-12345"
  }
}

The _kafka metadata includes: topic, partition, offset, timestamp, and key.

End-to-End Example

Create an inbound connector for your source topic, link it to an agent, and link an outbound connector to publish the agent output to a destination topic.

yaml
# agents/order-processor.yaml
version: "1.0"

name: order-processor
type: llm
model: gemini/gemini-2.5-flash
description: "Validate orders and compute routing"
system_prompt: |
  You receive an order event in JSON (from Kafka).
  1) Call orders.validate_order
  2) Call orders.score_risk
  3) Return JSON with order_id, status, risk_score, route

tools:
  - orders.validate_order
  - orders.score_risk
output_schema: order-result.json
python
# tools/orders.py
from typing import Dict, Any

def validate_order(order_id: str, items: list[str]) -> Dict[str, Any]:
    """Basic order validation."""
    if not order_id or not items:
        return {"ok": False, "reason": "missing_fields"}
    return {"ok": True}

async def score_risk(customer: str, total: float) -> Dict[str, Any]:
    """Return a simple risk score and routing hint."""
    score = 0.02 if total < 100 else 0.12
    route = "standard" if score < 0.1 else "manual_review"
    return {"risk_score": score, "route": route}

The outbound Kafka connector publishes the agent's JSON output as the message body:

json
{
  "order_id": "12345",
  "status": "approved",
  "risk_score": 0.02,
  "route": "standard"
}

Consumer Groups

Each connector uses a consumer group to track processed messages. Different group IDs = same messages to all. Same group ID = load sharing.

Connection Resilience

  • Auto reconnection, exponential backoff, config change detection, offset tracking

Outbound (Producer)

Produce agent results to Kafka topics when runs complete. Results are automatically sent to your configured topic with preserved message keys for correlation.

How Outbound Works

Outbound Kafka connectors act as producers. When linked agents complete runs, results are automatically published to the configured topic. Best for event-driven architectures and processing pipelines.

Configuration

  • Bootstrap Servers: Kafka broker addresses
  • Topic: Topic to publish results to

Security

Same as inbound: SASL_SSL with SCRAM-SHA-256 recommended for managed Kafka services.

Output Payload

json
{
  "run_id": "550e8400-e29b-41d4-a716-446655440000",
  "agent_name": "order-processor",
  "status": "completed",
  "output": "Order processed successfully. Total: $234.56",
  "error": null,
  "started_at": "2024-01-15T10:30:00Z",
  "ended_at": "2024-01-15T10:30:05Z",
  "token_usage": {
    "prompt_tokens": 150,
    "candidates_tokens": 50,
    "total_tokens": 200
  }
}

Includes run_id, agent_name, status, output, error, timestamps, and token_usage.

Message Key Correlation

When triggered by an inbound message with a key, the outbound producer uses the same key, preserving ordering within partitions.

bash
# Inbound message with key "order-123"
Message received Agent triggered Run completes

# Outbound message uses same key "order-123"
Result published with key: "order-123" (source: original)

# If no inbound key, uses run_id
Result published with key: "550e8400-e29b..." (source: run_id)

Delivery Guarantees

  • Full replication acknowledgment, automatic retries, connection pooling, run tracking

Advanced Patterns

For multi-topic streams, create one inbound connector per topic and link them to the same correlator agent. Use _kafka.key (or a shared order_id) to correlate events and store partial state in an external DB/Redis via tools. Retries are configured on the agent:

yaml
# agents/event-correlator.yaml
version: "1.0"

name: event-correlator
type: llm
model: gemini/gemini-2.5-pro
description: "Correlate order + shipment events across topics"
system_prompt: |
  Use the correlation tools to store incoming events by _kafka.key.
  Only respond when both "orders" and "shipments" are present.

tools:
  - correlation.upsert_event
  - correlation.build_snapshot

retry_options:
  attempts: 5
  max_delay: 30