System architecture
Scouter is a dual-protocol Rust server — Axum HTTP on :8000, Tonic gRPC on :50051 — backed by PostgreSQL for recent drift and evaluation data, and Delta Lake for long-term trace storage. Model monitoring, distributed tracing, and agent evaluation run in the same server process, but ingestion is split into typed channels after the request handler: server records, trace records, and tag records each have their own queue and worker pool. Background workers consume from storage on independent schedules.
System overview
```mermaid
flowchart LR
subgraph PC ["Python client"]
SQ[ScouterQueue]
SCT[ScouterTracer]
SCC[ScouterClient]
end
subgraph SI ["Server ingress"]
GRPC["gRPC :50051\nMessageGrpcService"]
HTTP["HTTP :8000\nPOST /scouter/message"]
OTLP["HTTP :8000\nPOST /scouter/v1/traces"]
ROUTER["record router"]
SR["server_record_tx\nbounded 1000"]
TR["trace_record_tx\nbounded 500"]
TG["tag_record_tx\nbounded 200"]
SW["server-record workers"]
TW["trace workers"]
GW["tag workers"]
end
subgraph BW ["Background workers"]
DE["DriftExecutor\nN workers"]
AP["AgentPoller\nN workers"]
INBOX["Trace inbox workers\ncommit events + sweeps"]
end
AD["Alert dispatch\nSlack / OpsGenie / Console"]
subgraph ST ["Storage"]
PG[("PostgreSQL\ndrift + eval records")]
DL[("Delta Lake\ntrace_spans + trace_summaries")]
end
SQ -->|"gRPC InsertMessageRequest"| GRPC
SQ -->|"HTTP POST /scouter/message"| HTTP
SCT -->|"TraceServerRecord\ngRPC MessageService"| GRPC
SCT -->|"OTLP protobuf"| OTLP
SCC -->|"register profiles\nquery results"| HTTP
GRPC --> ROUTER
HTTP --> ROUTER
OTLP --> TR
ROUTER --> SR & TR & TG
SR --> SW -->|"drift + eval rows"| PG
TR --> TW -->|"TraceSpanService\nTraceSummaryService"| DL
TG --> GW --> PG
DE -->|"SELECT skip lock\ncompute drift\nalert if triggered"| PG
AP -->|"SELECT pending eval\nexecute tasks\nstore results"| PG
AP --> DL
INBOX -->|"flip awaiting_trace\nrecover missed anchors\ntimeout stale rows"| PG
INBOX -->|"reconcile anchors"| DL
DE --> AD
AP --> AD
```
The important thing this diagram shows: traces have dedicated routes and workers, but not a separate trace ingest service. The HTTP OTLP endpoint (POST /scouter/v1/traces) accepts protobuf ExportTraceServiceRequest batches directly. gRPC uses the generic MessageService.InsertMessage; there is no separate gRPC TraceService in the protobuf. Once a trace record reaches AppState, it goes to trace_record_tx, not the server-record queue used for drift and eval rows.
AppState: shared server root
Every HTTP handler, gRPC handler, and background worker holds an Arc<AppState>. This is the complete field list:
| Field | Type | Purpose |
|---|---|---|
| db_pool | Pool<Postgres> | Shared across all handlers, polling workers, and message handlers |
| auth_manager | AuthManager | JWT verification on every authenticated request |
| task_manager | TaskManager | Coordinates all background workers; watch::channel broadcasts shutdown |
| config | Arc<ScouterServerConfig> | Typed env-var config, frozen at startup |
| server_record_tx | Sender<ServerRecords> | Bounded channel for drift records, eval records, and profile updates |
| trace_record_tx | Sender<TraceServerRecord> | Bounded channel for OTLP trace batches |
| tag_record_tx | Sender<TagRecord> | Bounded channel for indexed tag writes |
| trace_service | Arc<TraceSpanService> | Owns both Delta Lake actors; singleton for the lifetime of the server |
| trace_summary_service | Arc<TraceSummaryService> | Hour-bucketed summary table; feeds the paginated trace list |
| trace_dispatch_service | Arc<TraceDispatchService> | Runs trace retention and dispatch queries |
| genai_service | Arc<GenAiSpanService> | Owns GenAI span extraction and query support |
| dataset_manager | Arc<DatasetEngineManager> | Bifrost offline dataset engine |
| eval_scenario_service | Arc<EvalScenarioService> | Offline eval scenario orchestration |
Shutdown order matters. task_manager.shutdown() must complete before trace_service.signal_shutdown(), which must complete before db_pool.close(). The implementation sequences these correctly, but anything that restructures shutdown must preserve this ordering or you lose in-flight span data.
ScouterQueue: client-side ingestion
```mermaid
sequenceDiagram
participant PY as Python app
participant QB as QueueBus
participant CH as mpsc channel
participant EH as event handler
participant QN as QueueNum
participant TR as Transport
participant WK as ConsumerWorkers
participant MH as MessageHandler
participant PG as PostgreSQL
PY->>QB: queue.insert(record)
Note over QB: sub-microsecond enqueue
QB->>CH: Event Task(QueueItem)
CH->>EH: recv_async()
EH->>QN: route by record type
QN->>QN: batch records
Note over QN: PSI flushes every 30s
QN->>TR: MessageRecord
TR->>WK: deliver record
WK->>MH: process_message_record()
MH->>PG: INSERT into drift / eval tables
Note over QB,EH: On shutdown CancellationToken fires
```
A few things worth knowing about this path:
The GIL is released during shutdown() via py.detach(). This is intentional, not a leak: holding the GIL through the waits inside the shutdown sequence would deadlock other Python threads.
Transport is selected once at ScouterQueue::from_path() or from_profile() and is fixed for the queue’s lifetime. You cannot switch from HTTP to Kafka on a live queue.
PSI has a 30-second background flush ticker independent of batch capacity. SPC does not — it flushes only when the batch fills. Low-traffic SPC services can accumulate records in memory longer than you’d expect.
Queue insert errors are logged but not returned to the caller. The ScouterQueue is deliberately lossy from the application’s perspective — monitoring infrastructure should not affect application latency or error paths.
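The two flush policies and the log-and-drop insert can be sketched in a few lines of std-only Rust. This is a simplified model, not Scouter's QueueNum or QueueBus: `Batcher`, `FlushPolicy`, and `lossy_insert` are hypothetical names, the interval variant stands in for the PSI 30-second ticker, and a bounded std channel stands in for the real async queue.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};
use std::time::{Duration, Instant};

/// Flush policy per queue type (hypothetical names).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushPolicy {
    /// Flush only when the batch is full (SPC-style).
    CapacityOnly,
    /// Flush when full, or when `interval` has passed since the last flush (PSI-style).
    CapacityOrInterval(Duration),
}

pub struct Batcher<T> {
    capacity: usize,
    policy: FlushPolicy,
    buf: Vec<T>,
    last_flush: Instant,
}

impl<T> Batcher<T> {
    pub fn new(capacity: usize, policy: FlushPolicy, now: Instant) -> Self {
        Self { capacity, policy, buf: Vec::with_capacity(capacity), last_flush: now }
    }

    /// Add a record; returns the batch once capacity is reached.
    pub fn push(&mut self, item: T, now: Instant) -> Option<Vec<T>> {
        self.buf.push(item);
        if self.buf.len() >= self.capacity {
            Some(self.take(now))
        } else {
            None
        }
    }

    /// Called from a background ticker. Only interval policies flush here,
    /// so a CapacityOnly batch stays in memory until it fills.
    pub fn tick(&mut self, now: Instant) -> Option<Vec<T>> {
        let policy = self.policy;
        match policy {
            FlushPolicy::CapacityOrInterval(every)
                if !self.buf.is_empty() && now.duration_since(self.last_flush) >= every =>
            {
                Some(self.take(now))
            }
            _ => None,
        }
    }

    fn take(&mut self, now: Instant) -> Vec<T> {
        self.last_flush = now;
        std::mem::take(&mut self.buf)
    }
}

/// Log-and-drop insert: the caller never sees an error.
pub fn lossy_insert<T>(tx: &SyncSender<T>, item: T) {
    match tx.try_send(item) {
        Ok(()) => {}
        Err(TrySendError::Full(_)) => eprintln!("Error inserting entity into queue: full"),
        Err(TrySendError::Disconnected(_)) => eprintln!("Error inserting entity into queue: closed"),
    }
}
```

Calling `tick` on a `CapacityOnly` batcher never flushes, which is why a low-traffic SPC queue can hold records until the batch fills.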
Server-side message consumption
```mermaid
graph TD
GRPC["gRPC MessageService\nInsertMessage"]
MSG["HTTP /scouter/message"]
OTLP["HTTP /scouter/v1/traces\nOTLP protobuf"]
ROUTER["MessageRecord router"]
SR["server_record_tx\nbounded 1000"]
TR["trace_record_tx\nbounded 500"]
TG["tag_record_tx\nbounded 200"]
SW["server-record workers"]
TW["trace workers"]
GW["tag workers"]
R1["SPC features → spc_drift"]
R2["PSI features → psi_drift"]
R3["Custom metrics → custom_drift"]
R4["EvalRecord → agent_eval_record"]
R5["TraceServerRecord → Delta Lake\nTraceSpanService + TraceSummaryService"]
R6["TagRecord → PostgreSQL tag index"]
GRPC --> ROUTER
MSG --> ROUTER
ROUTER --> SR & TR & TG
OTLP --> TR
SR --> SW --> R1 & R2 & R3 & R4
TR --> TW --> R5
TG --> GW --> R6
```
The generic message routes deserialize MessageRecord and send each variant to the matching channel. The OTLP route skips the enum wrapper: it decodes protobuf bytes into ExportTraceServiceRequest and sends a TraceServerRecord directly to trace_record_tx.
There is no separate trace ingest service or gRPC TraceService. There is a dedicated HTTP OTLP route and a dedicated trace worker pool. Size TRACE_CONSUMER_WORKERS for span volume separately from SERVER_RECORD_CONSUMER_WORKERS, which handles drift and eval records.
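A minimal sketch of the routing step, assuming simplified record types and std `SyncSender` channels sized like the diagram (1000 / 500 / 200). `IngestChannels`, `route`, and `route_otlp` are hypothetical names; the real handlers are async and use richer types. The sketch uses `try_send` so a full channel surfaces as an error; whether the real handlers reject or wait on a full channel is not covered here.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// Hypothetical, simplified payload types.
#[derive(Debug, PartialEq)]
pub struct ServerRecords(pub Vec<String>);
#[derive(Debug, PartialEq)]
pub struct TraceServerRecord(pub Vec<u8>); // raw OTLP protobuf bytes in this sketch
#[derive(Debug, PartialEq)]
pub struct TagRecord(pub String);

/// The enum wrapper used by the generic message routes.
pub enum MessageRecord {
    Server(ServerRecords),
    Trace(TraceServerRecord),
    Tag(TagRecord),
}

pub struct IngestChannels {
    pub server_record_tx: SyncSender<ServerRecords>,
    pub trace_record_tx: SyncSender<TraceServerRecord>,
    pub tag_record_tx: SyncSender<TagRecord>,
}

pub struct IngestReceivers {
    pub server: Receiver<ServerRecords>,
    pub trace: Receiver<TraceServerRecord>,
    pub tag: Receiver<TagRecord>,
}

/// Bounded channels sized like the diagram: 1000 / 500 / 200.
pub fn ingest_channels() -> (IngestChannels, IngestReceivers) {
    let (server_record_tx, server) = sync_channel(1000);
    let (trace_record_tx, trace) = sync_channel(500);
    let (tag_record_tx, tag) = sync_channel(200);
    (
        IngestChannels { server_record_tx, trace_record_tx, tag_record_tx },
        IngestReceivers { server, trace, tag },
    )
}

fn describe<T>(e: TrySendError<T>) -> String {
    match e {
        TrySendError::Full(_) => "channel full".to_string(),
        TrySendError::Disconnected(_) => "channel closed".to_string(),
    }
}

/// Generic path (/scouter/message, gRPC InsertMessage): unwrap the enum, pick the channel.
pub fn route(ch: &IngestChannels, msg: MessageRecord) -> Result<(), String> {
    match msg {
        MessageRecord::Server(r) => ch.server_record_tx.try_send(r).map_err(describe),
        MessageRecord::Trace(r) => ch.trace_record_tx.try_send(r).map_err(describe),
        MessageRecord::Tag(r) => ch.tag_record_tx.try_send(r).map_err(describe),
    }
}

/// OTLP path (/scouter/v1/traces): no enum wrapper, straight to the trace channel.
pub fn route_otlp(ch: &IngestChannels, body: Vec<u8>) -> Result<(), String> {
    ch.trace_record_tx.try_send(TraceServerRecord(body)).map_err(describe)
}
```

Keeping three separate bounded channels means a burst of spans can fill the trace queue without stalling drift and eval writes.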
Drift detection: DriftExecutor
N workers start at server boot, staggered 200ms apart to avoid a thundering herd on a cold start. Each worker runs the same poll loop independently:
```mermaid
flowchart TD
A([poll_for_tasks]) --> B["get_drift_profile_task()\nSELECT ... FOR UPDATE SKIP LOCKED"]
B -- None --> C["sleep 10s"]
C --> A
B -- "Some(task)" --> D["DriftProfile::from_str()\nparse JSON → profile type"]
D --> E{drift type}
E -->|PSI| F["fetch psi_drift rows since previous_run\ncompute PSI per bin"]
E -->|SPC| G["fetch spc_drift rows\ngrand mean ± σ, apply WECO rules"]
E -->|Custom| H["aggregate custom_drift rows\ncompare vs AlertThreshold"]
E -->|Agent| I["aggregate agent_eval_workflow results\ncheck alert conditions"]
F & G & H & I --> J{alerts?}
J -- Yes --> K["insert_drift_alert()\ndispatch: Slack / OpsGenie / Console"]
J -- No --> L["update_drift_profile_run_dates()\nreschedule per cron"]
K --> L
L --> A
```
| Drift type | Algorithm | Default thresholds |
|---|---|---|
| PSI | Σ_i (c_i − b_i) × ln(c_i / b_i), summed over bins i, where c_i and b_i are the current and baseline proportions of bin i | <0.1 stable / 0.1–0.25 moderate / >0.25 significant |
| SPC | Grand mean ± σ from baseline; WECO rules (8 16 4 8 2 4 1 1) | 3σ zone violations |
| Custom | Aggregated metric vs. AlertThreshold | Below / Above / Outside (user-defined) |
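The PSI formula and the simplest SPC check can be written directly. This is a sketch under stated assumptions: inputs are pre-binned proportions, empty bins are floored at a small `eps` (how Scouter handles empty bins is not described here), the baseline mean stands in for the grand mean, population σ is used, and only the "beyond 3σ" rule is modelled, not the full WECO rule set.

```rust
/// Population Stability Index over pre-binned proportions.
/// `baseline[i]` and `current[i]` are the shares of bin i (each slice sums to ~1.0).
/// Empty bins are floored at `eps` so ln() stays finite (an assumption of this sketch).
pub fn psi(baseline: &[f64], current: &[f64], eps: f64) -> f64 {
    assert_eq!(baseline.len(), current.len(), "bin counts must match");
    baseline
        .iter()
        .zip(current)
        .map(|(&b, &c)| {
            let (b, c) = (b.max(eps), c.max(eps));
            (c - b) * (c / b).ln()
        })
        .sum()
}

#[derive(Debug, PartialEq)]
pub enum PsiBand {
    Stable,
    Moderate,
    Significant,
}

/// Default bands from the table: <0.1 stable, 0.1 to 0.25 moderate, >0.25 significant.
pub fn classify_psi(value: f64) -> PsiBand {
    if value < 0.1 {
        PsiBand::Stable
    } else if value <= 0.25 {
        PsiBand::Moderate
    } else {
        PsiBand::Significant
    }
}

/// Control limits mean ± k·σ from a baseline sample (population σ).
pub fn control_limits(baseline: &[f64], k: f64) -> (f64, f64) {
    let n = baseline.len() as f64;
    let mean = baseline.iter().sum::<f64>() / n;
    let var = baseline.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
    let sd = var.sqrt();
    (mean - k * sd, mean + k * sd)
}

/// Indices of observations outside ±3σ: only the simplest WECO-style rule.
pub fn beyond_three_sigma(baseline: &[f64], observed: &[f64]) -> Vec<usize> {
    let (lo, hi) = control_limits(baseline, 3.0);
    observed
        .iter()
        .enumerate()
        .filter(|&(_, &x)| x < lo || x > hi)
        .map(|(i, _)| i)
        .collect()
}
```

For example, a uniform four-bin baseline against current shares of 0.10 / 0.20 / 0.30 / 0.40 gives a PSI of about 0.228, which lands in the moderate band.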
SELECT ... FOR UPDATE SKIP LOCKED is what makes N concurrent workers safe without a separate job queue. A task locked by one worker is invisible to others until the lock is released.
update_drift_profile_run_dates() is called unconditionally — even if drift computation fails. A stuck task won’t monopolize the lock forever. The tradeoff is that a failed run still advances the schedule, so you might miss one drift window on a transient error.
Drift computation reads from PostgreSQL only. Delta Lake is trace-only.
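The claim, compute, reschedule loop can be simulated in memory to show why SKIP LOCKED makes concurrent workers safe and why the schedule advances even on failure. `TaskTable`, `claim_due`, `poll_once`, and `stagger_delay` are hypothetical; a mutex-protected lock set stands in for Postgres row locks, and holding the claim until the run dates are written is an idealization, since the real claim and update are not one transaction.

```rust
use std::collections::HashSet;
use std::sync::Mutex;
use std::time::Duration;

/// Hypothetical stand-in for a drift profile row.
#[derive(Clone, Debug, PartialEq)]
pub struct DriftTask {
    pub id: u32,
    pub next_run: u64, // seconds since an arbitrary epoch
}

/// In-memory simulation of the drift profile table plus row locks.
pub struct TaskTable {
    inner: Mutex<(Vec<DriftTask>, HashSet<u32>)>,
}

impl TaskTable {
    pub fn new(tasks: Vec<DriftTask>) -> Self {
        Self { inner: Mutex::new((tasks, HashSet::new())) }
    }

    /// Like `SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1`: return the first due task
    /// that no other worker holds, and lock it. Locked rows are skipped, not waited on.
    pub fn claim_due(&self, now: u64) -> Option<DriftTask> {
        let mut guard = self.inner.lock().unwrap();
        let (tasks, locked) = &mut *guard;
        let task = tasks
            .iter()
            .find(|t| t.next_run <= now && !locked.contains(&t.id))?
            .clone();
        locked.insert(task.id);
        Some(task)
    }

    /// Advance the schedule and release the lock.
    pub fn update_run_dates(&self, id: u32, now: u64, interval: u64) {
        let mut guard = self.inner.lock().unwrap();
        let (tasks, locked) = &mut *guard;
        if let Some(t) = tasks.iter_mut().find(|t| t.id == id) {
            t.next_run = now + interval;
        }
        locked.remove(&id);
    }
}

/// One poll iteration. The schedule advances whether or not `compute` succeeds.
pub fn poll_once<F>(table: &TaskTable, now: u64, interval: u64, compute: F) -> Option<Result<(), String>>
where
    F: FnOnce(&DriftTask) -> Result<(), String>,
{
    let task = table.claim_due(now)?;
    let result = compute(&task);
    table.update_run_dates(task.id, now, interval); // unconditional
    Some(result)
}

/// Worker start offset: workers boot 200 ms apart.
pub fn stagger_delay(worker_index: u32) -> Duration {
    Duration::from_millis(200 * worker_index as u64)
}
```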
Agent evaluation: AgentPoller
Agent evaluation is split across three background paths:
- AgentPoller claims pending eval records and executes the task DAG.
- The trace commit consumer writes anchor events emitted after Delta commits.
- The trace inbox worker flips awaiting_trace records to pending, reconciles dropped anchor events from Delta, recovers stale queue leases, and times out records whose traces never arrive.
AgentPoller no longer waits and reschedules records while polling for trace spans. Trace readiness is handled before a record becomes pollable.
```mermaid
flowchart TD
A([poll_for_tasks]) --> B["get_pending_agent_eval_record()\nstatus = pending\nready_at <= now()"]
B -- None --> C["sleep 1s"]
C --> A
B -- "Some(record)" --> D["fetch AgentEvalProfile\nfrom Postgres"]
D --> E{has TraceAssertionTasks?}
E -- No --> H
E -- Yes --> F{trace_id + span_id present?}
F -- No --> X["mark failed\nmissing trace anchor"]
F -- Yes --> G["fetch spans by trace_id\nfrom TraceSpanService"]
G -- "no spans or anchor invalid" --> X
G -- Found --> H["execute task DAG\nrespecting depends_on order"]
H --> I["AssertionTask (in-process)\nLLMJudgeTask (external LLM)\nAgentAssertionTask (vendor-agnostic)\nTraceAssertionTask (span-based)"]
I --> J["insert_eval_task_results_batch()\ninsert_agent_eval_workflow_record()"]
J --> K["mark record processed"]
X --> A
K --> A
```
The SQL poller only sees rows with status = 'pending', scheduled_at <= now(), and ready_at <= now(). Records that need trace assertions are inserted as awaiting_trace and stay there until the inbox path confirms that the exact (record_uid, trace_id, span_id) anchor has been committed to Delta Lake. When the inbox worker completes the matching trace_commit_event, it flips that eval row to pending and sets ready_at = now() + SCOUTER_TRACE_VISIBILITY_BUFFER_SECS.
This changes the failure mode. A record with TraceAssertionTask and no trace_id fails as EvalRequiresTrace. A record with trace_id but no span_id fails as EvalRequiresAnchorSpan. A record with a complete anchor that never commits stays awaiting_trace until the timeout sweep marks it TraceArrivalTimeout.
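The record lifecycle described above fits in a small state machine. This sketch assumes simplified fields and integer timestamps in seconds; `anchor_check`, `is_pollable`, `on_trace_commit`, and `timeout_sweep` are hypothetical helpers, and measuring the arrival timeout from `created_at` is an assumption.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FailureReason {
    EvalRequiresTrace,
    EvalRequiresAnchorSpan,
    TraceArrivalTimeout,
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Status {
    AwaitingTrace,
    Pending,
    Processed,
    Failed(FailureReason),
}

pub struct EvalRecord {
    pub has_trace_assertions: bool,
    pub trace_id: Option<String>,
    pub span_id: Option<String>,
    pub status: Status,
    pub created_at: u64,
    pub scheduled_at: u64,
    pub ready_at: u64,
}

impl EvalRecord {
    /// Records with trace assertions start as AwaitingTrace; all others go straight to Pending.
    pub fn new(has_trace_assertions: bool, trace_id: Option<&str>, span_id: Option<&str>, now: u64) -> Self {
        Self {
            has_trace_assertions,
            trace_id: trace_id.map(str::to_string),
            span_id: span_id.map(str::to_string),
            status: if has_trace_assertions { Status::AwaitingTrace } else { Status::Pending },
            created_at: now,
            scheduled_at: now,
            ready_at: now,
        }
    }
}

/// Anchor validation for trace-backed records.
pub fn anchor_check(r: &EvalRecord) -> Result<(), FailureReason> {
    if !r.has_trace_assertions {
        return Ok(());
    }
    match (&r.trace_id, &r.span_id) {
        (None, _) => Err(FailureReason::EvalRequiresTrace),
        (Some(_), None) => Err(FailureReason::EvalRequiresAnchorSpan),
        (Some(_), Some(_)) => Ok(()),
    }
}

/// The poller's predicate: status = pending, scheduled_at <= now, ready_at <= now.
pub fn is_pollable(r: &EvalRecord, now: u64) -> bool {
    r.status == Status::Pending && r.scheduled_at <= now && r.ready_at <= now
}

/// Inbox worker: the anchor committed, so flip to pending behind a visibility buffer.
pub fn on_trace_commit(r: &mut EvalRecord, now: u64, visibility_buffer_secs: u64) {
    if r.status == Status::AwaitingTrace {
        r.status = Status::Pending;
        r.ready_at = now + visibility_buffer_secs;
    }
}

/// Timeout sweep: a record still awaiting its trace after `timeout_secs` fails.
pub fn timeout_sweep(r: &mut EvalRecord, now: u64, timeout_secs: u64) {
    if r.status == Status::AwaitingTrace && now.saturating_sub(r.created_at) >= timeout_secs {
        r.status = Status::Failed(FailureReason::TraceArrivalTimeout);
    }
}
```

The visibility buffer means a record becomes pollable a few seconds after its anchor commits, not at the instant the inbox flips it.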
Task execution follows the declared depends_on DAG. Each task sees base context plus explicitly declared dependency outputs — not the full context of every upstream task. This is intentional scoping.
AgentContextBuilder detects the vendor format automatically: a choices key means OpenAI, stop_reason plus content means Anthropic, and candidates means Gemini/Vertex. A payload wrapped under a response sub-key is also recognized, for every vendor.
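A sketch of key-based detection, assuming a tiny `Json` enum in place of a real JSON library. `Json`, `Vendor`, `object`, and `detect_vendor` are hypothetical; the precedence order when a payload carries keys from several vendors, and the recursive handling of the `response` wrapper, are assumptions of this sketch rather than AgentContextBuilder's documented behavior.

```rust
use std::collections::BTreeMap;

/// Tiny stand-in for a parsed JSON value; only object keys matter here.
#[derive(Debug, Clone, PartialEq)]
pub enum Json {
    Object(BTreeMap<String, Json>),
    Other,
}

#[derive(Debug, PartialEq)]
pub enum Vendor {
    OpenAI,
    Anthropic,
    GeminiVertex,
    Unknown,
}

/// Convenience constructor for an object with the given entries.
pub fn object(entries: Vec<(&str, Json)>) -> Json {
    Json::Object(entries.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}

pub fn detect_vendor(payload: &Json) -> Vendor {
    let map = match payload {
        Json::Object(m) => m,
        Json::Other => return Vendor::Unknown,
    };
    // Look inside a `response` wrapper first.
    if let Some(inner) = map.get("response") {
        let v = detect_vendor(inner);
        if v != Vendor::Unknown {
            return v;
        }
    }
    if map.contains_key("choices") {
        Vendor::OpenAI
    } else if map.contains_key("stop_reason") && map.contains_key("content") {
        Vendor::Anthropic
    } else if map.contains_key("candidates") {
        Vendor::GeminiVertex
    } else {
        Vendor::Unknown
    }
}
```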
condition=True on an AssertionTask makes it a gate: failure skips all downstream tasks that declare a depends_on dependency on it. Use this to avoid LLM judge calls when cheap structural preconditions fail.
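Dependency ordering, scoped context, and gating can be sketched together using Kahn's topological sort. `Task`, `Outcome`, `topo_order`, and `execute` are hypothetical. Two behaviors are assumptions of this sketch: a task whose dependency was itself skipped is also skipped, and a failed non-gate dependency simply contributes no output to its dependents' context.

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};

pub struct Task {
    pub id: &'static str,
    pub depends_on: Vec<&'static str>,
    /// `condition = true` makes the task a gate.
    pub condition: bool,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Outcome {
    Passed(String),
    Failed,
    Skipped,
}

/// Kahn's algorithm: every task appears after all of its depends_on entries.
pub fn topo_order(tasks: &[Task]) -> Result<Vec<&'static str>, String> {
    let mut indegree: HashMap<&'static str, usize> =
        tasks.iter().map(|t| (t.id, t.depends_on.len())).collect();
    let mut dependents: HashMap<&'static str, Vec<&'static str>> = HashMap::new();
    for t in tasks {
        for &d in &t.depends_on {
            dependents.entry(d).or_default().push(t.id);
        }
    }
    let mut queue: VecDeque<&'static str> =
        tasks.iter().filter(|t| t.depends_on.is_empty()).map(|t| t.id).collect();
    let mut order = Vec::with_capacity(tasks.len());
    while let Some(id) = queue.pop_front() {
        order.push(id);
        for &next in dependents.get(id).map(Vec::as_slice).unwrap_or(&[]) {
            let deg = indegree.get_mut(next).expect("known task");
            *deg -= 1;
            if *deg == 0 {
                queue.push_back(next);
            }
        }
    }
    if order.len() == tasks.len() {
        Ok(order)
    } else {
        Err("cycle or unknown id in depends_on".to_string())
    }
}

/// Runs tasks in DAG order. `run` sees only the outputs of the task's declared
/// dependencies, and returns Some(output) on pass or None on failure.
pub fn execute<F>(tasks: &[Task], mut run: F) -> Result<BTreeMap<&'static str, Outcome>, String>
where
    F: FnMut(&Task, &BTreeMap<&'static str, String>) -> Option<String>,
{
    let by_id: HashMap<&'static str, &Task> = tasks.iter().map(|t| (t.id, t)).collect();
    let mut results: BTreeMap<&'static str, Outcome> = BTreeMap::new();
    for id in topo_order(tasks)? {
        let task = by_id[id];
        // Skip if a dependency is a failed gate, or was itself skipped.
        let blocked = task.depends_on.iter().any(|d| match &results[d] {
            Outcome::Skipped => true,
            Outcome::Failed => by_id[d].condition,
            Outcome::Passed(_) => false,
        });
        if blocked {
            results.insert(id, Outcome::Skipped);
            continue;
        }
        // Scoped context: declared dependencies only, not every upstream task.
        let ctx: BTreeMap<&'static str, String> = task
            .depends_on
            .iter()
            .filter_map(|d| match &results[d] {
                Outcome::Passed(out) => Some((*d, out.clone())),
                _ => None,
            })
            .collect();
        let outcome = match run(task, &ctx) {
            Some(out) => Outcome::Passed(out),
            None => Outcome::Failed,
        };
        results.insert(id, outcome);
    }
    Ok(results)
}
```

With a cheap structural gate in front of an LLM judge, a failed gate means the judge closure is never called, which is where the cost saving comes from.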
A naming note: AgentDrifter inside DriftExecutor and AgentPoller are separate workers with separate concerns. AgentDrifter aggregates stored eval workflow results and checks alert conditions on a cron schedule. AgentPoller runs the actual task evaluation. They don’t share code or state. The name overlap is confusing — keep it in mind when reading the codebase.
Distributed tracing: ingest to query
Write path
```mermaid
graph TB
subgraph "Client"
OT["OTEL-instrumented code\n@tracer decorator / with tracer.start_span()"]
EX["ScouterSpanExporter\nbatch: 512 spans / 5s flush"]
end
subgraph "Server trace ingest"
MSG["TraceServerRecord\ntrace_record_tx"]
TW["trace workers"]
TSS["TraceSpanService"]
SUM["TraceSummaryService\nhour-bucketed summaries"]
end
subgraph "Delta Lake write pipeline"
BUF["Buffer actor\nflush at 10K spans or 5s"]
ENG["TraceSpanDBEngine actor\nsingle writer — one per pod"]
BB["TraceSpanBatchBuilder\nArrow RecordBatch"]
DL[("Delta Lake\ntrace_spans + trace_summaries")]
end
OT --> EX
EX -->|"TraceServerRecord\ngRPC or HTTP"| MSG
MSG --> TW
TW --> TSS & SUM
TSS --> BUF
BUF --> ENG
ENG --> BB
BB -->|"Delta commit"| DL
SUM --> DL
```
For full schema, compaction schedule, bloom filter configuration, and multi-pod deployment details, see Trace Storage Architecture.
Two callouts that aren’t in the storage doc:
Span hierarchy fields — depth, span_order, path, root_span_id — are not stored. They’re computed at query time via a Rust DFS traversal in build_span_tree(). This matches how Jaeger and Zipkin handle it, and avoids ingest ordering dependencies. The implication is that tree assembly is work at read time, not write time.
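A sketch of query-time tree assembly with an iterative DFS. The output field names follow the text, but the `FlatSpan`/`TreeSpan` shapes are hypothetical, `path` is modelled as the list of span IDs from the root, siblings are ordered by start time, and spans whose parent is missing from the batch are treated as roots. None of this is taken from build_span_tree() itself.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Debug)]
pub struct FlatSpan {
    pub span_id: String,
    pub parent_span_id: Option<String>,
    pub start_time: u64,
}

#[derive(Debug)]
pub struct TreeSpan {
    pub span_id: String,
    pub depth: usize,
    pub span_order: usize,
    pub path: Vec<String>, // span IDs from the root down to this span
    pub root_span_id: String,
}

/// Spans caught in a parent cycle are never reached from a root and are dropped.
pub fn build_span_tree(spans: &[FlatSpan]) -> Vec<TreeSpan> {
    let ids: HashSet<&str> = spans.iter().map(|s| s.span_id.as_str()).collect();
    let mut children: HashMap<&str, Vec<&FlatSpan>> = HashMap::new();
    let mut roots: Vec<&FlatSpan> = Vec::new();
    for s in spans {
        match s.parent_span_id.as_deref() {
            Some(p) if ids.contains(p) => children.entry(p).or_default().push(s),
            _ => roots.push(s), // no parent, or parent not in this batch
        }
    }
    roots.sort_by_key(|s| s.start_time);
    for kids in children.values_mut() {
        kids.sort_by_key(|s| s.start_time);
    }

    let mut out = Vec::with_capacity(spans.len());
    for root in roots {
        // Explicit stack of (span, depth, path to its parent).
        let mut stack: Vec<(&FlatSpan, usize, Vec<String>)> = vec![(root, 0, Vec::new())];
        while let Some((span, depth, mut path)) = stack.pop() {
            path.push(span.span_id.clone());
            if let Some(kids) = children.get(span.span_id.as_str()) {
                // Push in reverse so the earliest child is visited first.
                for &kid in kids.iter().rev() {
                    stack.push((kid, depth + 1, path.clone()));
                }
            }
            let span_order = out.len();
            out.push(TreeSpan {
                span_id: span.span_id.clone(),
                depth,
                span_order,
                path,
                root_span_id: root.span_id.clone(),
            });
        }
    }
    out
}
```

Each call is linear in span count plus the sibling sorts, and copies a path per span, which is the read-time cost the paragraph refers to.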
search_blob is pre-computed at ingest by concatenating service name, span name, and flattened attributes into a single string. Attribute filter queries use search_blob LIKE '%key:value%'. No JSON parsing at query time.
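A sketch of how a pre-computed blob turns attribute filters into substring tests. The space-separated `key:value` layout and the helper names `build_search_blob` and `matches_attr` are assumptions; the text only says the blob concatenates service name, span name, and flattened attributes. Attributes are assumed to be flattened to dotted keys already.

```rust
use std::collections::BTreeMap;

/// Hypothetical blob layout: "service span key:value key:value ...".
pub fn build_search_blob(service: &str, span_name: &str, attrs: &BTreeMap<String, String>) -> String {
    let mut blob = format!("{service} {span_name}");
    for (k, v) in attrs {
        blob.push(' ');
        blob.push_str(k);
        blob.push(':');
        blob.push_str(v);
    }
    blob
}

/// Same semantics as `search_blob LIKE '%key:value%'`: a plain substring test.
pub fn matches_attr(blob: &str, key: &str, value: &str) -> bool {
    blob.contains(&format!("{key}:{value}"))
}
```

Because the match is a substring test, a filter on `env:prod` also matches `env:production`, so exact-value filtering needs a delimiter convention or a post-filter.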
Query path
```mermaid
sequenceDiagram
participant CL as Client
participant RT as HTTP route handler
participant TQ as TraceQueries
participant CS as CachingStore
participant DL as Delta Lake (Parquet)
participant DF as DataFusion SessionContext
CL->>RT: GET /scouter/v1/traces/{id}/spans
RT->>TQ: get_trace_spans_by_id(trace_id)
TQ->>DF: SQL with time filter first
Note over DF: time filter prunes Delta log files\nbloom filter (FPP 0.01) skips row groups
DF->>CS: range reads on Parquet files
CS-->>DF: cached file data (1hr TTL for ≤2MB ranges)
DF-->>TQ: RecordBatch
TQ->>TQ: RecordBatch → FlatSpan list
TQ->>TQ: build_span_tree() — Rust DFS\nassigns depth, span_order, path, root_span_id
TQ-->>RT: Vec with hierarchy
RT-->>CL: JSON response
```
The query planner evaluates the time-range filter before the bloom filter on trace_id, and the DataFusion session config sets reorder_filters=true. For single-trace lookups, the bloom filter's false-positive probability of 0.01 means roughly 99% of the row groups that do not contain the requested trace_id are skipped. Time-bounded range queries benefit more from partition pruning on partition_date.
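A quick worked estimate of what a 0.01 false-positive probability buys, assuming the time filter has already narrowed the candidate row groups. `expected_row_groups_read` is a hypothetical helper: row groups that contain the trace are always read, and each non-matching one survives the bloom check with probability `fpp`.

```rust
/// Expected row groups read for a single-trace lookup after bloom-filter pruning.
pub fn expected_row_groups_read(candidates: u64, matching: u64, fpp: f64) -> f64 {
    let non_matching = candidates.saturating_sub(matching) as f64;
    // Matching groups always pass; non-matching ones pass only as false positives.
    matching as f64 + fpp * non_matching
}
```

For 1,000 candidate row groups with one holding the trace, that is 1 + 0.01 × 999 = 10.99 row groups read on average instead of 1,000.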
Offline vs. online evaluation
```mermaid
graph LR
subgraph "Offline — local process only"
EO["EvalOrchestrator\nscenarios + agent_fn"]
ER["EvalRunner\n3-level evaluation"]
EM["EvalMetrics\noverall_pass_rate\nworkflow_pass_rate\nscenario_pass_rate"]
EO --> ER --> EM
end
subgraph "Shared task definitions"
AT["AssertionTask\ndeterministic"]
LJ["LLMJudgeTask\nsemantic via LLM"]
AA["AgentAssertionTask\nvendor-agnostic"]
TA["TraceAssertionTask\nspan-based"]
end
subgraph "Online — production traffic"
SQ["ScouterQueue.insert(EvalRecord)\nnon-blocking, samples at sample_ratio"]
EVALPG[("PostgreSQL\nagent_eval_record")]
INBOX["Trace inbox\nonly for TraceAssertionTask"]
AGP["AgentPoller\nN workers"]
PG[("PostgreSQL\neval results + alert records")]
AL["Alert dispatch"]
SQ --> EVALPG
EVALPG -->|"no trace gate"| AGP
EVALPG -->|"awaiting_trace"| INBOX -->|"pending + ready_at"| AGP
AGP --> PG --> AL
end
AT & LJ & AA & TA -.->|same task types| EO
AT & LJ & AA & TA -.->|same task types| AGP
```
Task definitions — AssertionTask, LLMJudgeTask, AgentAssertionTask, TraceAssertionTask — are identical between offline and online. Write them once, validate in CI, run them against production traffic.
Offline evaluation uses MockConfig() transport. Records never leave the process. EvalRunner automatically pulls spans from its run-scoped trace capture buffer — no manual wiring needed between EvalRecord insertion and TraceAssertionTask evaluation.
Online sampling happens at QueueNum::insert_agent_record() before the transport layer. Records that don’t pass the sample_ratio check are silently dropped, not transmitted. There’s no counter or log entry for dropped records by design — sampling is a rate control mechanism, not an error path.
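A sketch of sample-then-drop before the transport layer. `XorShift64` and `insert_sampled` are hypothetical stand-ins (a xorshift64 generator keeps the example free of external crates); the text does not say which random source QueueNum::insert_agent_record() uses.

```rust
/// Minimal xorshift64 PRNG so the sketch needs no external crates.
pub struct XorShift64(u64);

impl XorShift64 {
    pub fn new(seed: u64) -> Self {
        Self(seed.max(1)) // state must be non-zero
    }

    /// Uniform f64 in [0, 1) from the top 53 bits.
    pub fn next_f64(&mut self) -> f64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        (x >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Keep the record with probability `sample_ratio`.
pub fn sample(rng: &mut XorShift64, sample_ratio: f64) -> bool {
    rng.next_f64() < sample_ratio
}

/// Rejected records are dropped silently: no log line, no counter, no error.
pub fn insert_sampled<T>(queue: &mut Vec<T>, record: T, rng: &mut XorShift64, sample_ratio: f64) {
    if sample(rng, sample_ratio) {
        queue.push(record);
    }
}
```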
EvalRunner.evaluate() calls block_on internally and cannot be called from an async context. This is a hard constraint, not a known issue.
The offline evaluation engine runs 3-level aggregation:
- Sub-agent / workflow: per alias, flatten all records across scenarios into a single EvalDataset, evaluate against profile tasks, and compute the pass rate.
- Scenario: per scenario, evaluate scenario-level tasks and compute whether all tasks passed.
- Aggregate: EvalMetrics with overall_pass_rate, workflow_pass_rate, dataset_pass_rates (per-alias dict), scenario_pass_rate, and total_scenarios.
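The first two levels reduce to plain counting. This sketch covers only `dataset_pass_rates`, `scenario_pass_rate`, and `total_scenarios`; the text does not define `overall_pass_rate` or `workflow_pass_rate`, so they are left out rather than guessed. `RecordResult`, `ScenarioResult`, `PartialMetrics`, and `aggregate` are hypothetical, and reporting a 0.0 scenario pass rate when there are no scenarios is an assumption.

```rust
use std::collections::BTreeMap;

/// One evaluated record: its alias and whether it passed that alias's profile tasks.
pub struct RecordResult {
    pub alias: &'static str,
    pub passed: bool,
}

/// Outcomes of one scenario's scenario-level tasks.
pub struct ScenarioResult {
    pub task_results: Vec<bool>,
}

pub struct PartialMetrics {
    pub dataset_pass_rates: BTreeMap<&'static str, f64>,
    pub scenario_pass_rate: f64,
    pub total_scenarios: usize,
}

pub fn aggregate(records: &[RecordResult], scenarios: &[ScenarioResult]) -> PartialMetrics {
    // Level 1: per alias, pool records from every scenario into one pass rate.
    let mut counts: BTreeMap<&'static str, (usize, usize)> = BTreeMap::new();
    for r in records {
        let (passed, total) = counts.entry(r.alias).or_insert((0, 0));
        *total += 1;
        if r.passed {
            *passed += 1;
        }
    }
    let dataset_pass_rates: BTreeMap<&'static str, f64> = counts
        .into_iter()
        .map(|(alias, (p, n))| (alias, p as f64 / n as f64))
        .collect();

    // Level 2: a scenario passes only if every scenario-level task passed.
    let total_scenarios = scenarios.len();
    let passed = scenarios.iter().filter(|s| s.task_results.iter().all(|&ok| ok)).count();

    // Level 3: roll up the fields defined in the text.
    let scenario_pass_rate = if total_scenarios == 0 {
        0.0
    } else {
        passed as f64 / total_scenarios as f64
    };
    PartialMetrics { dataset_pass_rates, scenario_pass_rate, total_scenarios }
}
```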
Shutdown sequence
When the server receives SIGTERM, AppState::shutdown() runs these steps in order:
1. task_manager.shutdown(): fires the watch::channel; all drift workers and agent pollers exit their tokio::select! loop
2. shutdown_trace_cache(&db_pool): flushes any pending trace cache entries to Postgres
3. trace_service.signal_shutdown(): the buffer actor drains remaining spans; the engine actor processes the final Delta commit
4. trace_summary_service.signal_shutdown(): same pattern for the summary table
5. dataset_manager.shutdown(): Bifrost engine shutdown
6. eval_scenario_service.signal_shutdown(): offline eval engine
7. db_pool.close(): all connections drained
Step 7 must come last. The trace cache flush at step 2 still needs a live pool. If you restructure this sequence, test under load before deploying.
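A reduced, std-only model of why the order matters. It collapses the seven steps into four phases: stop and join workers, flush through the still-open pool, drain buffered spans, then close the pool. `Pool`, `spawn_worker`, and `shutdown` are hypothetical, and an `AtomicBool` stands in for the watch::channel.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Stand-in for the Postgres pool: any use after close() fails.
pub struct Pool {
    open: AtomicBool,
    pub log: Mutex<Vec<&'static str>>,
}

impl Pool {
    pub fn new() -> Self {
        Self { open: AtomicBool::new(true), log: Mutex::new(Vec::new()) }
    }

    pub fn execute(&self, what: &'static str) -> Result<(), String> {
        if !self.open.load(Ordering::SeqCst) {
            return Err(format!("pool closed: {what}"));
        }
        self.log.lock().unwrap().push(what);
        Ok(())
    }

    pub fn close(&self) {
        self.open.store(false, Ordering::SeqCst);
    }
}

/// A background worker that polls until the stop flag flips.
pub fn spawn_worker(stop: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::spawn(move || {
        while !stop.load(Ordering::SeqCst) {
            thread::sleep(Duration::from_millis(1)); // a real poll iteration would run here
        }
    })
}

/// Reduced shutdown: stop and join workers, flush through the live pool,
/// drain buffered spans, and only then close the pool.
pub fn shutdown(
    stop: &AtomicBool,
    workers: Vec<JoinHandle<()>>,
    pool: &Pool,
    buffered_spans: &Mutex<Vec<u32>>,
) -> Result<usize, String> {
    stop.store(true, Ordering::SeqCst); // like firing the watch::channel
    for w in workers {
        w.join().map_err(|_| "worker panicked".to_string())?;
    }
    pool.execute("flush trace cache")?; // needs the pool to still be open
    let drained = std::mem::take(&mut *buffered_spans.lock().unwrap()).len(); // stands in for the final Delta commit
    pool.close(); // last
    Ok(drained)
}
```

Moving `pool.close()` above the flush makes `execute` return an error, which is the failure mode the ordering rule guards against.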
Trade-offs and known constraints
Single Delta Lake writer per pod. TraceSpanDBEngine is a single-writer actor. Multi-pod deployments designate one pod as the writer; reader pods call update_incremental() and never write. Running two writer pods against the same bucket path causes Delta log conflicts. The compaction control table prevents double-compaction but does not prevent double-writing of spans.
Trace hierarchy is query-time computed. build_span_tree() runs an in-memory Rust DFS on every trace fetch. It is fast and bounded, but not free: traces with hundreds of spans take noticeably longer to assemble than small ones. There is no stored index on parent-child relationships.
SKIP LOCKED is not a durable job queue. Drift profile tasks live in Postgres. The window between SELECT ... SKIP LOCKED and update_drift_profile_run_dates() is not transactional. If a drift worker crashes mid-computation, the task will be picked up again on the next tick, but double computation is possible if the crash lands after drift results or alerts are written and before update_drift_profile_run_dates() completes.
Trace-backed eval waits in the inbox, not inside AgentPoller. Records without TraceAssertionTask go straight to pending. Records with trace assertions stay awaiting_trace until the matching anchor span commits to Delta and the inbox worker flips the row to pending with a visibility buffer. Tune SCOUTER_TRACE_VISIBILITY_BUFFER_SECS, SCOUTER_INBOX_RECONCILE_INTERVAL_SECS, and TRACE_ARRIVAL_TIMEOUT_SECS around your Delta refresh and span arrival behavior.
Queue errors are logged, not returned. The QueueBus insert path intentionally swallows errors. In production, monitor server logs for Error inserting entity into queue — there’s no counter exported by default.
PSI flushes every 30s; SPC does not. Low-traffic SPC services may accumulate records in memory for longer than expected. If you need tighter write latency for SPC, reduce the batch size or add a custom flush trigger.