The LangChain handler is in preview. Pin exactly (calado-langchain==0.0.1) until 1.0 — see Upgrading.

Overview

calado-langchain is a BaseCallbackHandler subclass. You attach it to a chain and LangChain itself dispatches the events calado needs — chain inputs and outputs, LLM messages, tool calls, retrievals, errors, latency, and the parent-child run tree. There is nothing to call from your own code. Compared to wrapping the LLM client directly, the callback handler captures the layer above:
| Captured | Wrap-SDK | Callback handler |
| --- | --- | --- |
| LLM messages | yes | yes |
| Tool boundaries | no | yes |
| Retrievals | no | yes |
| Chain hierarchy | no | yes |
| Retries and intermediate reasoning | no | yes |
Use this page if your agent runs on LangChain or LangGraph and you do not already run a tracer. If you already trace your agent with LangSmith, use the LangSmith adapter instead: it forwards the runs LangSmith already collects and needs no package in your runtime. For raw anthropic / openai Python clients, send events with the Direct API.

LangGraph

LangGraph reuses LangChain’s callback system. The same handler works — node transitions arrive as nested chain runs and are rendered as a tree in the dashboard. No extra configuration.
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from calado_langchain import CaladoCallbackHandler

handler = CaladoCallbackHandler()  # reads CALADO_API_KEY

llm = ChatAnthropic(model="claude-sonnet-4-5")

def plan(state: dict) -> dict:
    msg = llm.invoke([{"role": "user", "content": state["question"]}])
    return {"plan": msg.content}

def act(state: dict) -> dict:
    msg = llm.invoke([{"role": "user", "content": f"Execute: {state['plan']}"}])
    return {"answer": msg.content}

builder = StateGraph(dict)
builder.add_node("plan", plan)
builder.add_node("act", act)
builder.set_entry_point("plan")
builder.add_edge("plan", "act")
builder.add_edge("act", END)
graph = builder.compile()

result = graph.invoke(
    {"question": "Summarize today's release notes."},
    config={"callbacks": [handler], "configurable": {"session_id": "demo-1"}},
)

Installation

pip install calado-langchain
Requirements: Python 3.9-3.13, langchain-core>=0.2.10,<0.5. Your API key is generated in your agent’s Settings page on app.calado.ai. See Quickstart for the step-by-step.

Quick start

Construct the handler with no arguments (it reads CALADO_API_KEY from the environment) and pass it on the chain’s config.
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from calado_langchain import CaladoCallbackHandler

handler = CaladoCallbackHandler()  # reads CALADO_API_KEY

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])
chain = prompt | ChatAnthropic(model="claude-sonnet-4-5")

user_session_id = "abc123"  # whatever stable id identifies this user/session in your app

answer = chain.invoke(
    {"question": "hello"},
    config={
        "callbacks": [handler],
        "configurable": {"session_id": f"session_{user_session_id}"},
    },
)
The session_id is optional — calado still captures runs without it — but strongly recommended. Without it, every top-level chain invocation is treated as a one-off and patterns are harder to surface.

Verifying it works

On the first successful POST, calado prints a one-line message so you know the wire is up:
[calado] Connected. First batch sent. View at https://app.calado.ai/agents/<agent_id>
Output is routed through Python’s logging module (logging.getLogger("calado")) and mirrored to stderr when no handler is attached, so the line is visible by default and easy to silence or redirect. For programmatic checks, call status():
print(handler.status())
# {
#   "events_sent": 12,
#   "events_dropped": 0,
#   "events_masked": 0,
#   "mask_failures": 0,
#   "batch_backlog": 0,
#   "consecutive_401s": 0,
#   "last_flush_at": "2026-05-14T10:32:11Z",
#   "last_flush_status_code": 200,
#   "last_error": None,
#   "enabled": True,
# }
Pass debug=True at construction for per-batch stderr lines:
[calado] flush batch_size=10 url=https://app.calado.ai/api/ingest status=200 duration=124ms
If you saw the Connected line and events_sent > 0, the integration is working. Open your agent page on app.calado.ai to see the conversation.

Zero-touch mode

Set environment variables and skip the per-chain wiring entirely:
export CALADO_API_KEY=cl_...
export CALADO_ENABLED=true
On import, the handler registers itself as a global LangChain callback and attaches to every chain in the process.
Because global mode reports every chain, processes that mix regulated and non-regulated agents should prefer per-chain mode and attach the handler only to the chains you want reported.

Scoping to one or more chains

Pass the handler on the chains you want reported. Group turns into a session by passing configurable.session_id:
session_id = "abc123"  # any stable string: session token, DB row id, UUID

config = {
    "callbacks": [handler],
    "configurable": {"session_id": f"session_{session_id}"},
}

chain.invoke({"question": "turn 1"}, config=config)
chain.invoke({"question": "turn 2"}, config=config)
chain.invoke({"question": "turn 3"}, config=config)
Use any stable string: a session token, a database row id, a UUID. Without it, each top-level chain invocation is recorded as a one-off conversation. You can run multiple handler instances side-by-side with different API keys — for example, an orchestrator and a sub-agent reported as separate calado agents.
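If your application has no ready-made session token, one option is to derive a deterministic id from values you already store. The sketch below uses the standard library's uuid.uuid5. The namespace string and the helper names session_id_for and run_config are hypothetical and not part of calado-langchain.

```python
import uuid

# Hypothetical namespace for this application; any fixed UUID works.
APP_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "example.com")


def session_id_for(user_id: str, conversation_key: str) -> str:
    """Return the same id every time for the same user and conversation."""
    stable = uuid.uuid5(APP_NAMESPACE, f"{user_id}:{conversation_key}")
    return f"session_{stable}"


def run_config(handler: object, session_id: str) -> dict:
    """Build the config dict passed to chain.invoke(..., config=...)."""
    return {
        "callbacks": [handler],
        "configurable": {"session_id": session_id},
    }
```

Because uuid5 is deterministic, every turn of the same conversation maps to the same session_id without storing an extra column.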

What gets captured

| calado field | LangChain source |
| --- | --- |
| conversation.messages | Root chain inputs and outputs |
| Step name | Run.name |
| Step hierarchy | run_id / parent_run_id |
| System prompt | Serialized prompt template (serialized.kwargs) |
| Tool schemas | Serialized tool definitions (serialized.kwargs.tools) |
| Per-step inline definition | A child run’s system-role input message, attached as that step’s inlineDefinition so the sub-agent is analyzed against its own prompt |
| Tool calls | on_tool_start / on_tool_end |
| Retrievals | on_retriever_end |
| Errors | on_*_error |
| Latency | Computed from start/end timestamps |
| Session id | RunnableConfig.configurable.session_id |
| Tags and metadata | Pass-through from RunnableConfig |
Sub-chains, LLM calls, tool runs, retrievers, embeddings, parsers, and prompts all become steps. The dashboard renders them as a tree under the conversation.
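To make the step hierarchy concrete, here is a minimal sketch of how flat run records linked by run_id and parent_run_id can be nested into a tree. It illustrates the idea only and is not the package's or the dashboard's implementation. The record keys and the build_tree helper are assumptions chosen to mirror LangChain's callback arguments.

```python
from collections import defaultdict


def build_tree(runs: list[dict]) -> list[dict]:
    """Nest flat run records under their parents; roots have parent_run_id None."""
    children = defaultdict(list)
    for run in runs:
        children[run.get("parent_run_id")].append(run)

    def attach(run: dict) -> dict:
        # Each node keeps its name and the (possibly empty) list of child steps.
        return {
            "name": run["name"],
            "steps": [attach(child) for child in children[run["run_id"]]],
        }

    return [attach(root) for root in children[None]]


example_runs = [
    {"run_id": "a", "parent_run_id": None, "name": "RunnableSequence"},
    {"run_id": "b", "parent_run_id": "a", "name": "ChatPromptTemplate"},
    {"run_id": "c", "parent_run_id": "a", "name": "ChatAnthropic"},
]
tree = build_tree(example_runs)
```

Here tree holds one root, RunnableSequence, with the prompt and the LLM call as its two child steps in arrival order.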

Redacting sensitive data

Pass a mask callable at construction. It runs in-process, per child run, before transport. Return a modified dict, or return None to drop just that step (the rest of the tree is captured).
from typing import Optional

from calado_langchain import CaladoCallbackHandler

def mask(run: dict) -> Optional[dict]:
    # return None to drop this step, or a modified dict
    return run

handler = CaladoCallbackHandler(mask=mask)
Both sync and async functions are accepted. The handler probes once at construction (via a sentinel call) and caches whether to await the mask. If the mask raises 100 times in a row, the transport disables itself fail-closed and prints:
[calado] mask hook failed 100 times in a row. Transport disabled to prevent burning your logs. Check your mask function for exceptions; once fixed, restart the process. See https://docs.calado.ai/ingestion/redaction#debugging
Mask patterns (emails, phone numbers, Presidio integration, stable placeholders) live on Redacting sensitive data. The package does not ship recipe helpers.
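As a starting point for your own mask, the sketch below scrubs email-like strings from every string value in the run dict and drops steps carrying a particular tag. The shape of the run dict is not specified on this page, so the walker is deliberately generic. The "tags" key, the "secret" tag, and the [EMAIL] placeholder are assumptions for illustration.

```python
import re
from typing import Any, Optional

# Simple email pattern; good enough for a sketch, not a full RFC 5322 matcher.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def _scrub(value: Any) -> Any:
    """Recursively replace email-like substrings in strings, dicts, and lists."""
    if isinstance(value, str):
        return EMAIL_RE.sub("[EMAIL]", value)
    if isinstance(value, dict):
        return {key: _scrub(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_scrub(item) for item in value]
    return value


def mask(run: dict) -> Optional[dict]:
    # Assumed rule: drop any step tagged "secret" (the "tags" key is hypothetical).
    if "secret" in run.get("tags", []):
        return None
    # Otherwise return a scrubbed copy; the input dict is left unmodified.
    return _scrub(run)
```

Keeping the mask free of exceptions matters here, since repeated failures disable the transport; isinstance checks rather than key lookups help the walker tolerate unexpected shapes.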

Runtime behavior

  • Sandboxed. LangChain catches and logs handler exceptions. calado cannot break your chain.
  • Streaming. Per-token events buffer in the accumulator and materialize on on_llm_end. One row per LLM call, not per token.
  • Batching. Events accumulate until batch_size (default 10) or flush_interval_s (default 30s) is reached. The first event of a process force-flushes immediately so you don’t wait 30 seconds for the Connected line.
  • Root-end flush. Each conversation is materialized on the root on_chain_end and queued atomically.
  • Crash safety. atexit drains the queue synchronously on the foreground thread with a 5s bounded timeout. Hard crashes and Ctrl-C may lose in-flight trees.
  • Retry. 5xx and network errors retry with backoff. 4xx responses drop immediately.
  • 401 auto-disable. After three consecutive 401 responses, the transport disables and prints one line:
    [calado] API key rejected (401). Calado is disabled for this process. Check CALADO_API_KEY and restart.
    
  • Child-run cap. A single root run may have at most 950 child runs. Above the cap the server returns a structured 400 and the handler prints:
    [calado] Server returned 400 too_many_child_runs (cap=950, got=N) for root_run_id=<id>. This chain is too large for one ingest payload. Split via configurable.session_id, or reduce trace verbosity. See https://docs.calado.ai/ingestion/langchain#caps
    
    In practice the 5 MB request body cap is often the binding constraint and is reached before 950 runs when payloads are large. Whichever limit you hit first, the fix is the same: split via configurable.session_id or reduce trace verbosity.
  • Hung runs. Accumulator entries older than max_run_age_s (default 1h) are evicted, counted in events_dropped, and logged at WARNING.
  • Logging. All output routes through logging.getLogger("calado"). Attach your own handler to route to Datadog, Sentry, structured logs, or silence it entirely. Stderr is the default mirror.
  • Threading. Transport runs on a dedicated daemon thread; it does not block your asyncio event loop.
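The logging point above can be sketched with the standard library alone. The example attaches a handler to the "calado" logger and writes into an in-memory buffer; in practice you would swap the stream handler for a file handler or your vendor's handler. The format string is an arbitrary choice.

```python
import io
import logging

calado_log = logging.getLogger("calado")

# Option 1: route calado output to your own handler. A StringIO stream is
# used here so the example is self-contained.
buffer = io.StringIO()
stream_handler = logging.StreamHandler(buffer)
stream_handler.setFormatter(
    logging.Formatter("%(levelname)s %(name)s %(message)s")
)
calado_log.addHandler(stream_handler)

# Option 2: silence the logger entirely (left commented out on purpose).
# calado_log.addHandler(logging.NullHandler())
# calado_log.propagate = False
```

Since the stderr mirror is described as applying only when no handler is attached, attaching any handler, including a NullHandler, should be enough to take over the output.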

API reference

Constructor

CaladoCallbackHandler is keyword-only. Positional construction is forbidden.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str | env CALADO_API_KEY | Agent API key. Generated in Settings on the agent page. |
| base_url | str | https://app.calado.ai | Override for self-hosted or staging environments. |
| batch_size | int | 10 | Events accumulated before an HTTP flush. |
| flush_interval_s | float | 30.0 | Seconds before the background thread flushes a partial batch. |
| max_run_age_s | int | 3600 | Evict in-flight accumulator entries older than this many seconds. |
| max_queue_bytes | int | 10485760 | Hard ceiling on queued bytes. On overflow the oldest event is dropped. |
| debug | bool | False | Print one stderr line per batch flush. |
| mask | Mask | None | Sync or async callable. See Redacting sensitive data. |

Methods

| Method | Purpose |
| --- | --- |
| flush() | Drain the queue synchronously. |
| shutdown() | Flush and stop the background thread. |
| status() | Return the status dict. |
| astatus() | Async wrapper over status() (uses asyncio.to_thread). Use this from a FastAPI async endpoint. |

Type aliases

For mypy --strict users (the package ships py.typed):
from typing import Awaitable, Callable, Optional, Union

MaskFn      = Callable[[dict], Optional[dict]]
AsyncMaskFn = Callable[[dict], Awaitable[Optional[dict]]]
Mask        = Union[MaskFn, AsyncMaskFn]
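When unit-testing your own masks it can help to call sync and async variants through one code path. The sketch below repeats the aliases and adds a small apply_mask helper that awaits only when the result is awaitable. The handler does its own probing at construction, so this helper is a hypothetical test utility, and the "name" and "outputs" keys are assumptions.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Optional, Union

MaskFn = Callable[[dict], Optional[dict]]
AsyncMaskFn = Callable[[dict], Awaitable[Optional[dict]]]
Mask = Union[MaskFn, AsyncMaskFn]


def drop_debug_steps(run: dict) -> Optional[dict]:
    """Sync mask: drop steps whose (assumed) name starts with 'debug'."""
    return None if run.get("name", "").startswith("debug") else run


async def strip_outputs(run: dict) -> Optional[dict]:
    """Async mask: return a copy without the (assumed) 'outputs' key."""
    return {key: value for key, value in run.items() if key != "outputs"}


async def apply_mask(mask: Mask, run: dict) -> Optional[dict]:
    """Call either kind of mask, awaiting only when the result is awaitable."""
    result = mask(run)
    if inspect.isawaitable(result):
        result = await result
    return result


dropped = asyncio.run(apply_mask(drop_debug_steps, {"name": "debug_probe"}))
stripped = asyncio.run(apply_mask(strip_outputs, {"name": "llm", "outputs": "hi"}))
```

In this run dropped is None and stripped no longer contains the outputs key.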

Ingest response shape

The POST /api/ingest endpoint returns:
{
  "ok": true,
  "agent_id": "agt_...",
  "ingest_id": "ing_...",
  "min_sdk_version": "0.0.1"
}
The handler reads agent_id to render the deep link in the Connected log line and warns when the installed package version is below min_sdk_version. Older servers may omit these fields; the handler falls back to the generic View at https://app.calado.ai/agents URL.
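If you call the ingest endpoint yourself or want to assert on responses in tests, the fallback behaviour described above can be approximated as follows. This is a sketch under assumptions, not the handler's code: the helper names are hypothetical and the version parser only handles plain dotted integers such as 0.0.1.

```python
def parse_version(text: str) -> tuple:
    """Turn '0.0.1' into (0, 0, 1); assumes plain dotted integers."""
    return tuple(int(part) for part in text.split("."))


def dashboard_url(response: dict, base_url: str = "https://app.calado.ai") -> str:
    """Deep link when agent_id is present, generic agents page otherwise."""
    agent_id = response.get("agent_id")
    return f"{base_url}/agents/{agent_id}" if agent_id else f"{base_url}/agents"


def is_outdated(installed: str, response: dict) -> bool:
    """True when the server asks for a newer SDK than the one installed."""
    minimum = response.get("min_sdk_version")
    if minimum is None:
        # Older servers may omit the field; treat that as "no requirement".
        return False
    return parse_version(installed) < parse_version(minimum)
```

Both helpers use dict.get so that responses from older servers, which may omit agent_id and min_sdk_version, do not raise.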

Testing

Avoid constructing the handler in tests. If a test does construct one without CALADO_API_KEY set, construction logs a warning and the transport stays disabled. To tear down a constructed handler explicitly:
handler.shutdown()

Troubleshooting

[calado] Connected. never prints

# Drain the queue and inspect:
handler.flush()
print(handler.status())
Read in order:
  • enabled: False → see the 401 section below.
  • mask_failures > 0 → your mask is throwing. Fix and restart.
  • events_sent == 0 and last_error set → inspect last_error. Often a 4xx from a malformed payload or an unreachable base_url.
  • events_sent == 0 and no error → the chain hasn’t completed a root run yet, or CALADO_API_KEY was empty at construction.
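The checklist above can be turned into a small function for health checks or on-call scripts. This is a minimal sketch: diagnose is a hypothetical helper, and it reads only keys that appear in the status dict shown earlier on this page.

```python
def diagnose(status: dict) -> str:
    """Apply the checks in the documented order and return the first match."""
    if not status.get("enabled", False):
        return "disabled: see the 401 section and check CALADO_API_KEY"
    if status.get("mask_failures", 0) > 0:
        return "mask is raising: fix it and restart"
    if status.get("events_sent", 0) == 0:
        if status.get("last_error"):
            return f"send failed: {status['last_error']}"
        return "nothing sent yet: no root run completed, or the API key was empty"
    return "ok"
```

Call it as diagnose(handler.status()) after handler.flush() so the counters reflect the drained queue.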

401 from the server

calado auto-disables after three consecutive 401 responses. Check that CALADO_API_KEY starts with cl_ and matches an agent you own, then restart the process.

Inspect from a FastAPI endpoint

Mount the async status dict on a hidden route so on-call can read it without a deploy:
from fastapi import FastAPI
from calado_langchain import CaladoCallbackHandler

app = FastAPI()
handler = CaladoCallbackHandler()

@app.get("/_calado/status")
async def calado_status():
    return await handler.astatus()
Use astatus() from async code — it wraps status() via asyncio.to_thread and does not block the event loop.

Run the doctor

python -m calado_langchain doctor
Prints the installed calado-langchain version, the installed langchain-core version, the supported range, the Mask import status, and the fix command if pip resolution fails.

Upgrading

calado-langchain follows a pre-1.0 semver policy: any 0.x.y minor bump may introduce breaking changes. Pin exactly until 1.0:
# pyproject.toml
calado-langchain = "==0.0.1"
# requirements.txt
calado-langchain==0.0.1
After 1.0, normal semver resumes. Until then, read the CHANGELOG before each bump. The server returns a min_sdk_version on every ingest response (see Ingest response shape). When your installed version is below it, the handler logs a WARNING via the calado logger.

Migrating after a langchain-core major bump

langchain-core is pinned to >=0.2.10,<0.5. When the next major lands, follow this template:
  1. Pin langchain-core and calado-langchain exactly in a feature branch.
  2. Run python -m calado_langchain doctor and confirm the supported range covers your target.
  3. Update both pins, then run the doctor again.
  4. Run your chain locally. Confirm the Connected line and one ingested conversation in the dashboard.
  5. Roll out behind the same flag as the langchain-core bump.
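Alongside the doctor command in steps 2 and 3, you may want a check in CI that the resolved langchain-core falls inside the documented range >=0.2.10,<0.5. The sketch below is a rough stand-in that compares only the numeric release components and ignores pre-release ordering. The function names are hypothetical.

```python
import re
from importlib import metadata
from typing import Optional

SUPPORTED_MIN = (0, 2, 10)  # inclusive, from >=0.2.10
SUPPORTED_MAX = (0, 5)      # exclusive, from <0.5


def release_tuple(version: str) -> tuple:
    """Leading numeric components, e.g. '0.3.1rc1' -> (0, 3, 1)."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def in_supported_range(version: str) -> bool:
    """True when SUPPORTED_MIN <= version < SUPPORTED_MAX."""
    release = release_tuple(version)
    return SUPPORTED_MIN <= release < SUPPORTED_MAX


def installed_core_version() -> Optional[str]:
    """Installed langchain-core version, or None when it is not installed."""
    try:
        return metadata.version("langchain-core")
    except metadata.PackageNotFoundError:
        return None
```

A CI step could then fail the build when installed_core_version() is not None and in_supported_range returns False for it.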

Production deployment

Uvicorn and Gunicorn

preload_app=False is the supported mode. The handler lazy-registers its atexit hook on the first event in each worker, so each worker drains its own queue cleanly on shutdown.
gunicorn app:app -k uvicorn.workers.UvicornWorker --workers 4
# preload_app=False is the default — leave it.
preload_app=True is a silent-loss landmine: the master process’s atexit races with each child’s atexit on fork. Use preload_app=False. If you must preload, set CALADO_DISABLE_ATEXIT=true and call handler.flush() from your shutdown hook explicitly.

AWS Lambda

Lambda freezes the execution environment between invocations and can later reclaim it without running atexit hooks. Disable the hook and flush from your handler:
import os
os.environ["CALADO_DISABLE_ATEXIT"] = "true"

from calado_langchain import CaladoCallbackHandler

handler = CaladoCallbackHandler()

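def lambda_handler(event, context):
    try:
        # run_chain is your own function that invokes the chain with these callbacks.
        return run_chain(event, callbacks=[handler])
    finally:
        # Send queued events before the environment is frozen.
        handler.flush()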

Google Cloud Run

Cloud Run sends SIGTERM (10s grace) then SIGKILL. Trap SIGTERM and call flush() before exit, or set CALADO_DISABLE_ATEXIT=true and rely on an explicit per-request flush.
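import signal
import sys

from calado_langchain import CaladoCallbackHandler

handler = CaladoCallbackHandler()

def _drain(*_):
    # Flush queued events within the SIGTERM grace period, then exit cleanly.
    handler.flush()
    sys.exit(0)

signal.signal(signal.SIGTERM, _drain)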

Forked workers

The handler registers an os.register_at_fork(after_in_child=...) hook that clears the parent accumulator in the child. You don’t need to do anything — child workers start with a clean state.

Caps

| Limit | Value |
| --- | --- |
| Request body size | 5 MB |
| Child runs per root | 950 |
| Conversations + definitions per request | 1,000 |
| Per-message content size | 1 MB |
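If you build payloads yourself or want an early warning in tests, the caps can be checked before anything is sent. The sketch below encodes the table as constants. Whether MB means 1,000,000 or 1,048,576 bytes is not stated on this page, so the binary interpretation is an assumption, as is the cap_violations helper.

```python
MAX_BODY_BYTES = 5 * 1024 * 1024     # "5 MB"; binary megabytes are an assumption
MAX_CHILD_RUNS = 950
MAX_ITEMS_PER_REQUEST = 1000         # conversations + definitions
MAX_MESSAGE_BYTES = 1 * 1024 * 1024  # "1 MB"; same assumption


def cap_violations(
    body_bytes: int,
    child_runs: int,
    items: int,
    largest_message_bytes: int,
) -> list[str]:
    """Return the names of every cap the given measurements exceed."""
    checks = [
        ("request_body_size", body_bytes, MAX_BODY_BYTES),
        ("child_runs_per_root", child_runs, MAX_CHILD_RUNS),
        ("items_per_request", items, MAX_ITEMS_PER_REQUEST),
        ("message_content_size", largest_message_bytes, MAX_MESSAGE_BYTES),
    ]
    return [name for name, value, limit in checks if value > limit]
```

An empty list means the payload is within all four limits; values exactly at a limit are treated as allowed, matching the "at most 950" wording.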

Next: redacting sensitive data

Strip PII before events leave your process.