
System architecture

Scouter is a dual-protocol Rust server — Axum HTTP on :8000, Tonic gRPC on :50051 — backed by PostgreSQL for recent drift and evaluation data, and Delta Lake for long-term trace storage. Model monitoring, distributed tracing, and agent evaluation run in the same server process, but ingestion is split into typed channels after the request handler: server records, trace records, and tag records each have their own queue and worker pool. Background workers consume from storage on independent schedules.


flowchart LR
    subgraph PC ["Python client"]
        SQ[ScouterQueue]
        SCT[ScouterTracer]
        SCC[ScouterClient]
    end

    subgraph SI ["Server ingress"]
        GRPC["gRPC :50051\nMessageGrpcService"]
        HTTP["HTTP :8000\nPOST /scouter/message"]
        OTLP["HTTP :8000\nPOST /scouter/v1/traces"]
        ROUTER["record router"]
        SR["server_record_tx\nbounded 1000"]
        TR["trace_record_tx\nbounded 500"]
        TG["tag_record_tx\nbounded 200"]
        SW["server-record workers"]
        TW["trace workers"]
        GW["tag workers"]
    end

    subgraph BW ["Background workers"]
        DE["DriftExecutor\nN workers"]
        AP["AgentPoller\nN workers"]
        INBOX["Trace inbox workers\ncommit events + sweeps"]
    end

    AD["Alert dispatch\nSlack / OpsGenie / Console"]

    subgraph ST ["Storage"]
        PG[("PostgreSQL\ndrift + eval records")]
        DL[("Delta Lake\ntrace_spans + trace_summaries")]
    end

    SQ -->|"gRPC InsertMessageRequest"| GRPC
    SQ -->|"HTTP POST /scouter/message"| HTTP
    SCT -->|"TraceServerRecord\ngRPC MessageService"| GRPC
    SCT -->|"OTLP protobuf"| OTLP
    SCC -->|"register profiles\nquery results"| HTTP
    GRPC --> ROUTER
    HTTP --> ROUTER
    OTLP --> TR
    ROUTER --> SR & TR & TG
    SR --> SW -->|"drift + eval rows"| PG
    TR --> TW -->|"TraceSpanService\nTraceSummaryService"| DL
    TG --> GW --> PG
    DE -->|"SELECT skip lock\ncompute drift\nalert if triggered"| PG
    AP -->|"SELECT pending eval\nexecute tasks\nstore results"| PG
    AP --> DL
    INBOX -->|"flip awaiting_trace\nrecover missed anchors\ntimeout stale rows"| PG
    INBOX -->|"reconcile anchors"| DL
    DE --> AD
    AP --> AD

The important thing this diagram shows: traces have dedicated routes and workers, but not a separate trace ingest service. The HTTP OTLP endpoint (POST /scouter/v1/traces) accepts protobuf ExportTraceServiceRequest batches directly. gRPC uses the generic MessageService.InsertMessage; there is no separate gRPC TraceService in the protobuf. Once a trace record reaches AppState, it goes to trace_record_tx, not the server-record queue used for drift and eval rows.


Every HTTP handler, gRPC handler, and background worker holds an Arc<AppState>. This is the complete field list:

| Field | Type | Purpose |
| --- | --- | --- |
| db_pool | Pool<Postgres> | Shared across all handlers, polling workers, and message handlers |
| auth_manager | AuthManager | JWT verification on every authenticated request |
| task_manager | TaskManager | Coordinates all background workers; watch::channel broadcasts shutdown |
| config | Arc<ScouterServerConfig> | Typed env-var config, frozen at startup |
| server_record_tx | Sender<ServerRecords> | Bounded channel for drift records, eval records, and profile updates |
| trace_record_tx | Sender<TraceServerRecord> | Bounded channel for OTLP trace batches |
| tag_record_tx | Sender<TagRecord> | Bounded channel for indexed tag writes |
| trace_service | Arc<TraceSpanService> | Owns both Delta Lake actors; singleton for the lifetime of the server |
| trace_summary_service | Arc<TraceSummaryService> | Hour-bucketed summary table; feeds the paginated trace list |
| trace_dispatch_service | Arc<TraceDispatchService> | Runs trace retention and dispatch queries |
| genai_service | Arc<GenAiSpanService> | Owns GenAI span extraction and query support |
| dataset_manager | Arc<DatasetEngineManager> | Bifrost offline dataset engine |
| eval_scenario_service | Arc<EvalScenarioService> | Offline eval scenario orchestration |

Shutdown order matters. task_manager.shutdown() must complete before trace_service.signal_shutdown(), which must complete before db_pool.close(). The implementation sequences these correctly, but anything that restructures shutdown must preserve this ordering or you lose in-flight span data.


sequenceDiagram
    participant PY as Python app
    participant QB as QueueBus
    participant CH as mpsc channel
    participant EH as event handler
    participant QN as QueueNum
    participant TR as Transport
    participant WK as ConsumerWorkers
    participant MH as MessageHandler
    participant PG as PostgreSQL

    PY->>QB: queue.insert(record)
    Note over QB: sub-microsecond enqueue
    QB->>CH: Event Task(QueueItem)
    CH->>EH: recv_async()
    EH->>QN: route by record type
    QN->>QN: batch records
    Note over QN: PSI flushes every 30s
    QN->>TR: MessageRecord
    TR->>WK: deliver record
    WK->>MH: process_message_record()
    MH->>PG: INSERT into drift / eval tables
    Note over QB,EH: On shutdown CancellationToken fires

A few things worth knowing about this path:

The GIL is released during shutdown() via py.detach(). This is intentional — the wait periods inside the shutdown sequence would deadlock other Python threads otherwise. It’s not a leak.

Transport is selected once at ScouterQueue::from_path() or from_profile() and is fixed for the queue’s lifetime. You cannot switch from HTTP to Kafka on a live queue.

PSI has a 30-second background flush ticker independent of batch capacity. SPC does not — it flushes only when the batch fills. Low-traffic SPC services can accumulate records in memory longer than you’d expect.
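The difference between the two flush policies can be sketched as follows. This is a minimal illustration, not Scouter's queue code: `Batcher`, `tick`, and the injected `clock` are hypothetical names, and the only facts taken from the text are that PSI flushes on a 30-second ticker in addition to capacity, while SPC flushes on capacity alone.

```python
import time
from typing import Any, Callable, List, Optional


class Batcher:
    """Hypothetical batcher: flushes when full, and optionally on a timer."""

    def __init__(
        self,
        capacity: int,
        publish: Callable[[List[Any]], None],
        flush_interval_secs: Optional[float] = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.capacity = capacity
        self.publish = publish
        self.flush_interval_secs = flush_interval_secs  # None models SPC
        self.clock = clock
        self.items: List[Any] = []
        self.last_flush = clock()

    def insert(self, item: Any) -> None:
        self.items.append(item)
        if len(self.items) >= self.capacity:
            self.flush()

    def tick(self) -> None:
        """Called by a background ticker; a no-op when no interval is set."""
        if self.flush_interval_secs is None or not self.items:
            return
        if self.clock() - self.last_flush >= self.flush_interval_secs:
            self.flush()

    def flush(self) -> None:
        if self.items:
            self.publish(list(self.items))
            self.items.clear()
        self.last_flush = self.clock()
```

With `flush_interval_secs=30` a partially filled batch leaves memory on the next tick; with `None` it waits until `capacity` records have arrived, which is the low-traffic SPC behavior described above.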

Queue insert errors are logged but not returned to the caller. The ScouterQueue is deliberately lossy from the application’s perspective — monitoring infrastructure should not affect application latency or error paths.


graph TD
    GRPC["gRPC MessageService\nInsertMessage"]
    MSG["HTTP /scouter/message"]
    OTLP["HTTP /scouter/v1/traces\nOTLP protobuf"]
    ROUTER["MessageRecord router"]

    SR["server_record_tx\nbounded 1000"]
    TR["trace_record_tx\nbounded 500"]
    TG["tag_record_tx\nbounded 200"]

    SW["server-record workers"]
    TW["trace workers"]
    GW["tag workers"]

    R1["SPC features → spc_drift"]
    R2["PSI features → psi_drift"]
    R3["Custom metrics → custom_drift"]
    R4["EvalRecord → agent_eval_record"]
    R5["TraceServerRecord → Delta Lake\nTraceSpanService + TraceSummaryService"]
    R6["TagRecord → PostgreSQL tag index"]

    GRPC --> ROUTER
    MSG --> ROUTER
    ROUTER --> SR & TR & TG
    OTLP --> TR
    SR --> SW --> R1 & R2 & R3 & R4
    TR --> TW --> R5
    TG --> GW --> R6

The generic message routes deserialize MessageRecord and send each variant to the matching channel. The OTLP route skips the enum wrapper: it decodes protobuf bytes into ExportTraceServiceRequest and sends a TraceServerRecord directly to trace_record_tx.
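The routing step can be pictured with a small sketch. It is written in Python for brevity even though the server is Rust, and everything except the channel names and capacities from the diagram is an assumption: the dataclasses are stand-ins for the real record types, and `Router.route` is a hypothetical helper.

```python
import queue
from dataclasses import dataclass


# Stand-ins for the record variants named above (fields are illustrative).
@dataclass
class ServerRecords:
    payload: dict


@dataclass
class TraceServerRecord:
    payload: bytes


@dataclass
class TagRecord:
    key: str
    value: str


class Router:
    def __init__(self) -> None:
        # Capacities mirror the "bounded 1000 / 500 / 200" labels in the diagram.
        self.server_record_tx: queue.Queue = queue.Queue(maxsize=1000)
        self.trace_record_tx: queue.Queue = queue.Queue(maxsize=500)
        self.tag_record_tx: queue.Queue = queue.Queue(maxsize=200)

    def route(self, record) -> str:
        """Send a record to its typed channel and return the channel name."""
        if isinstance(record, TraceServerRecord):
            name, channel = "trace_record_tx", self.trace_record_tx
        elif isinstance(record, TagRecord):
            name, channel = "tag_record_tx", self.tag_record_tx
        elif isinstance(record, ServerRecords):
            name, channel = "server_record_tx", self.server_record_tx
        else:
            raise TypeError(f"unsupported record type: {type(record).__name__}")
        # put_nowait raises queue.Full when the bounded channel is saturated.
        channel.put_nowait(record)
        return name
```

In this picture the OTLP route would call `route` with a `TraceServerRecord` it built itself, so trace volume only ever presses on the 500-slot channel. How the real server reacts to a full channel is not stated here, so the sketch simply lets `queue.Full` propagate.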

There is no separate trace ingest service or gRPC TraceService. There is a dedicated HTTP OTLP route and a dedicated trace worker pool. Size TRACE_CONSUMER_WORKERS for span volume separately from SERVER_RECORD_CONSUMER_WORKERS, which handles drift and eval records.


N workers start at server boot, staggered 200ms apart to avoid a thundering herd on a cold start. Each worker runs the same poll loop independently:

flowchart TD
    A([poll_for_tasks]) --> B["get_drift_profile_task()\nSELECT ... FOR UPDATE SKIP LOCKED"]
    B -- None --> C["sleep 10s"]
    C --> A
    B -- "Some(task)" --> D["DriftProfile::from_str()\nparse JSON → profile type"]
    D --> E{drift type}
    E -->|PSI| F["fetch psi_drift rows since previous_run\ncompute PSI per bin"]
    E -->|SPC| G["fetch spc_drift rows\ngrand mean ± σ, apply WECO rules"]
    E -->|Custom| H["aggregate custom_drift rows\ncompare vs AlertThreshold"]
    E -->|Agent| I["aggregate agent_eval_workflow results\ncheck alert conditions"]
    F & G & H & I --> J{alerts?}
    J -- Yes --> K["insert_drift_alert()\ndispatch: Slack / OpsGenie / Console"]
    J -- No --> L["update_drift_profile_run_dates()\nreschedule per cron"]
    K --> L
    L --> A

| Drift type | Algorithm | Default thresholds |
| --- | --- | --- |
| PSI | Σ (y_i − y_b) × ln(y_i / y_b), summed over bins | <0.1 stable / 0.1–0.25 moderate / >0.25 significant |
| SPC | Grand mean ± σ from baseline; WECO rules (8 16 4 8 2 4 1 1) | 3σ zone violations |
| Custom | Aggregated metric vs. AlertThreshold | Below / Above / Outside (user-defined) |
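As a worked example of the PSI row, the sketch below evaluates the formula over per-bin proportions and applies the default thresholds from the table. The function names and the `eps` clamp for empty bins are assumptions made for illustration; how Scouter itself handles empty bins is not covered here.

```python
import math
from typing import Sequence


def psi(baseline: Sequence[float], observed: Sequence[float], eps: float = 1e-6) -> float:
    """Sum of (y_i - y_b) * ln(y_i / y_b) over bins of proportions."""
    if len(baseline) != len(observed):
        raise ValueError("baseline and observed must have the same number of bins")
    total = 0.0
    for y_b, y_i in zip(baseline, observed):
        # Assumption: clamp empty bins so the logarithm stays defined.
        y_b = max(y_b, eps)
        y_i = max(y_i, eps)
        total += (y_i - y_b) * math.log(y_i / y_b)
    return total


def classify(value: float) -> str:
    """Apply the default thresholds listed in the table."""
    if value < 0.1:
        return "stable"
    if value <= 0.25:
        return "moderate"
    return "significant"
```

For a two-bin feature that moves from a 50/50 baseline to 90/10, the sum is 0.4 × ln(1.8) + (−0.4) × ln(0.2) = 0.4 × ln(9) ≈ 0.88, which lands well inside the "significant" band.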

SELECT ... FOR UPDATE SKIP LOCKED is what makes N concurrent workers safe without a separate job queue. A task locked by one worker is invisible to others until the lock is released.

update_drift_profile_run_dates() is called unconditionally — even if drift computation fails. A stuck task won’t monopolize the lock forever. The tradeoff is that a failed run still advances the schedule, so you might miss one drift window on a transient error.
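The unconditional reschedule maps naturally onto a try/finally shape. The sketch below shows one loop iteration under that reading. The four callables are hypothetical stand-ins for `get_drift_profile_task()`, the drift computation, `update_drift_profile_run_dates()`, and logging, and the real worker is Rust rather than Python.

```python
from typing import Any, Callable, Optional


def run_once(
    claim_task: Callable[[], Optional[Any]],
    compute_drift: Callable[[Any], None],
    update_run_dates: Callable[[Any], None],
    log_error: Callable[[Any, Exception], None],
) -> bool:
    """Run one poll iteration; return False when there was nothing to claim."""
    task = claim_task()  # stands in for SELECT ... FOR UPDATE SKIP LOCKED
    if task is None:
        return False  # the caller would sleep before polling again
    try:
        compute_drift(task)
    except Exception as exc:
        # A failed run is recorded but does not block the schedule.
        log_error(task, exc)
    finally:
        # Always advance the schedule, even when the computation failed.
        update_run_dates(task)
    return True
```

The `finally` clause is what produces the tradeoff described above: a transient failure costs one drift window, but the task can never wedge the schedule.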

Drift computation reads from PostgreSQL only. Delta Lake is trace-only.


Agent evaluation is split across three background paths:

  • AgentPoller claims pending eval records and executes the task DAG.
  • The trace commit consumer writes anchor events emitted after Delta commits.
  • The trace inbox worker flips awaiting_trace records to pending, reconciles dropped anchor events from Delta, recovers stale queue leases, and times out records whose traces never arrive.

AgentPoller no longer waits and reschedules records while polling for trace spans. Trace readiness is handled before a record becomes pollable.

flowchart TD
    A([poll_for_tasks]) --> B["get_pending_agent_eval_record()\nstatus = pending\nready_at <= now()"]
    B -- None --> C["sleep 1s"]
    C --> A
    B -- "Some(record)" --> D["fetch AgentEvalProfile\nfrom Postgres"]
    D --> E{has TraceAssertionTasks?}
    E -- No --> H
    E -- Yes --> F{trace_id + span_id present?}
    F -- No --> X["mark failed\nmissing trace anchor"]
    F -- Yes --> G["fetch spans by trace_id\nfrom TraceSpanService"]
    G -- "no spans or anchor invalid" --> X
    G -- Found --> H["execute task DAG\nrespecting depends_on order"]
    H --> I["AssertionTask (in-process)\nLLMJudgeTask (external LLM)\nAgentAssertionTask (vendor-agnostic)\nTraceAssertionTask (span-based)"]
    I --> J["insert_eval_task_results_batch()\ninsert_agent_eval_workflow_record()"]
    J --> K["mark record processed"]
    X --> A
    K --> A

The SQL poller only sees rows with status = 'pending', scheduled_at <= now(), and ready_at <= now(). Records that need trace assertions are inserted as awaiting_trace until the inbox path proves the exact (record_uid, trace_id, span_id) anchor committed to Delta Lake. When the inbox worker completes the matching trace_commit_event, it flips that eval row to pending and sets ready_at = now() + SCOUTER_TRACE_VISIBILITY_BUFFER_SECS.

This changes the failure mode. A record with TraceAssertionTask and no trace_id fails as EvalRequiresTrace. A record with trace_id but no span_id fails as EvalRequiresAnchorSpan. A record with a complete anchor that never commits stays awaiting_trace until the timeout sweep marks it TraceArrivalTimeout.
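The states and failure modes above can be summarized in a small sketch. The function and class names are hypothetical, and the sketch does not claim where in the real pipeline each check runs; it only encodes the outcomes and the `ready_at` arithmetic described in the text.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EvalRow:
    status: str
    scheduled_at: float
    ready_at: float


def admit(has_trace_assertions: bool, trace_id: Optional[str], span_id: Optional[str]) -> str:
    """Return the initial status or the failure reason for a new record."""
    if not has_trace_assertions:
        return "pending"
    if trace_id is None:
        return "EvalRequiresTrace"
    if span_id is None:
        return "EvalRequiresAnchorSpan"
    return "awaiting_trace"


def on_anchor_committed(row: EvalRow, now: float, visibility_buffer_secs: float) -> None:
    """Flip an awaiting_trace row to pending, delayed by the visibility buffer."""
    if row.status == "awaiting_trace":
        row.status = "pending"
        row.ready_at = now + visibility_buffer_secs


def is_pollable(row: EvalRow, now: float) -> bool:
    """Mirror the poller predicate: pending, scheduled, and past ready_at."""
    return row.status == "pending" and row.scheduled_at <= now and row.ready_at <= now
```

With a 5-second buffer, a row whose anchor commits at t=100 becomes visible to the poller at t=105, which gives Delta readers time to observe the commit.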

Task execution follows the declared depends_on DAG. Each task sees base context plus explicitly declared dependency outputs — not the full context of every upstream task. This is intentional scoping.

AgentContextBuilder detects vendor format automatically: choices key → OpenAI, stop_reason + content → Anthropic, candidates → Gemini/Vertex. The response sub-key wrapper is supported for all vendors.
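A rough sketch of that detection rule follows. `detect_vendor` is a hypothetical function, not the AgentContextBuilder API, and the order in which the keys are checked is an assumption; only the key-to-vendor mapping and the `response` wrapper come from the text.

```python
from typing import Any, Dict


def detect_vendor(payload: Dict[str, Any]) -> str:
    """Guess the vendor format from the top-level keys of a response payload."""
    # Unwrap the optional {"response": {...}} wrapper before inspecting keys.
    inner = payload.get("response")
    body = inner if isinstance(inner, dict) else payload
    if "choices" in body:
        return "openai"
    if "stop_reason" in body and "content" in body:
        return "anthropic"
    if "candidates" in body:
        return "gemini"
    return "unknown"
```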

condition=True on an AssertionTask makes it a gate: failure skips all downstream tasks that declare a depends_on dependency on it. Use this to avoid LLM judge calls when cheap structural preconditions fail.
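Dependency-ordered execution, scoped context, and gating can be sketched together. This is an illustration under stated assumptions rather than the evaluator's code: tasks are plain dicts with hypothetical `depends_on`, `condition`, and `fn` keys, and the sketch assumes that a skipped task also causes its own dependents to be skipped.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, Tuple

Task = Dict[str, Any]  # keys: "depends_on", optional "condition", "fn"


def run_tasks(tasks: Dict[str, Task], base_context: Dict[str, Any]) -> Dict[str, str]:
    """Run tasks in dependency order and return a status per task id."""
    graph = {tid: set(task["depends_on"]) for tid, task in tasks.items()}
    results: Dict[str, str] = {}
    outputs: Dict[str, Any] = {}
    for tid in TopologicalSorter(graph).static_order():
        task = tasks[tid]
        blocked = any(
            results[dep] == "skipped"
            or (results[dep] == "failed" and tasks[dep].get("condition"))
            for dep in task["depends_on"]
        )
        if blocked:
            results[tid] = "skipped"
            continue
        # Scoped context: base values plus declared dependency outputs only.
        context = dict(base_context)
        context.update({dep: outputs[dep] for dep in task["depends_on"]})
        fn: Callable[[Dict[str, Any]], Tuple[bool, Any]] = task["fn"]
        passed, output = fn(context)
        outputs[tid] = output
        results[tid] = "passed" if passed else "failed"
    return results
```

A cheap structural check marked as a gate sits upstream of an LLM judge; when the check fails, the judge's `fn` is never invoked, which is where the cost saving comes from.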

A naming note: AgentDrifter inside DriftExecutor and AgentPoller are separate workers with separate concerns. AgentDrifter aggregates stored eval workflow results and checks alert conditions on a cron schedule. AgentPoller runs the actual task evaluation. They don’t share code or state. The name overlap is confusing — keep it in mind when reading the codebase.


graph TB
    subgraph "Client"
        OT["OTEL-instrumented code\ntracer decorator / with tracer.start_span()"]
        EX["ScouterSpanExporter\nbatch: 512 spans / 5s flush"]
    end

    subgraph "Server trace ingest"
        MSG["TraceServerRecord\ntrace_record_tx"]
        TW["trace workers"]
        TSS["TraceSpanService"]
        SUM["TraceSummaryService\nhour-bucketed summaries"]
    end

    subgraph "Delta Lake write pipeline"
        BUF["Buffer actor\nflush at 10K spans or 5s"]
        ENG["TraceSpanDBEngine actor\nsingle writer — one per pod"]
        BB["TraceSpanBatchBuilder\nArrow RecordBatch"]
        DL[("Delta Lake\ntrace_spans + trace_summaries")]
    end

    OT --> EX
    EX -->|"TraceServerRecord\ngRPC or HTTP"| MSG
    MSG --> TW
    TW --> TSS & SUM
    TSS --> BUF
    BUF --> ENG
    ENG --> BB
    BB -->|"Delta commit"| DL
    SUM --> DL

For full schema, compaction schedule, bloom filter configuration, and multi-pod deployment details, see Trace Storage Architecture.

Two callouts that aren’t in the storage doc:

Span hierarchy fields — depth, span_order, path, root_span_id — are not stored. They’re computed at query time via a Rust DFS traversal in build_span_tree(). This matches how Jaeger and Zipkin handle it, and avoids ingest ordering dependencies. The implication is that tree assembly is work at read time, not write time.
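To make the read-time cost concrete, here is a minimal sketch of a DFS that assigns the four hierarchy fields. It is a Python illustration of the idea, not the Rust `build_span_tree()` itself. The `FlatSpan` fields other than the four hierarchy fields, the ordering of siblings by `start_time`, and the treatment of spans with a missing parent as roots are all assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FlatSpan:
    span_id: str
    parent_span_id: Optional[str]
    start_time: int
    depth: int = 0
    span_order: int = 0
    path: List[str] = field(default_factory=list)
    root_span_id: str = ""


def build_span_tree(spans: List[FlatSpan]) -> List[FlatSpan]:
    """Return spans in DFS pre-order with hierarchy fields filled in."""
    known_ids = {span.span_id for span in spans}
    children: Dict[str, List[FlatSpan]] = {}
    roots: List[FlatSpan] = []
    for span in sorted(spans, key=lambda s: s.start_time):
        if span.parent_span_id is None or span.parent_span_id not in known_ids:
            roots.append(span)  # assumption: orphans are treated as roots
        else:
            children.setdefault(span.parent_span_id, []).append(span)

    ordered: List[FlatSpan] = []
    for root in roots:
        # Iterative DFS so very deep traces cannot hit the recursion limit.
        stack = [(root, 0, [])]
        while stack:
            span, depth, parent_path = stack.pop()
            span.depth = depth
            span.path = parent_path + [span.span_id]
            span.root_span_id = root.span_id
            span.span_order = len(ordered)
            ordered.append(span)
            # Push in reverse so the earliest child is visited first.
            for child in reversed(children.get(span.span_id, [])):
                stack.append((child, depth + 1, span.path))
    return ordered
```

Because nothing here depends on the order in which spans were ingested, late-arriving children slot into the right place on the next read.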

search_blob is pre-computed at ingest by concatenating service name, span name, and flattened attributes into a single string. Attribute filter queries use search_blob LIKE '%key:value%'. No JSON parsing at query time.
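A small sketch of the idea follows. The `key:value` token shape follows the LIKE pattern quoted above; the space separator, the dotted flattening of nested attributes, and the function names are assumptions for illustration.

```python
from typing import Any, Dict


def flatten(attributes: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested attribute dicts into dotted keys."""
    flat: Dict[str, Any] = {}
    for key, value in attributes.items():
        name = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


def build_search_blob(service_name: str, span_name: str, attributes: Dict[str, Any]) -> str:
    """Concatenate service name, span name, and key:value tokens into one string."""
    tokens = [service_name, span_name]
    tokens.extend(f"{key}:{value}" for key, value in sorted(flatten(attributes).items()))
    return " ".join(tokens)


def matches(blob: str, key: str, value: Any) -> bool:
    """Substring test, analogous to search_blob LIKE '%key:value%'."""
    return f"{key}:{value}" in blob
```

Substring matching is cheap but coarse: in this sketch a filter on `id:1` would also match a span whose attribute is `id:10`, so exact-match semantics would need a delimiter-aware pattern.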

sequenceDiagram
    participant CL as Client
    participant RT as HTTP route handler
    participant TQ as TraceQueries
    participant CS as CachingStore
    participant DL as Delta Lake (Parquet)
    participant DF as DataFusion SessionContext

    CL->>RT: GET /scouter/v1/traces/{id}/spans
    RT->>TQ: get_trace_spans_by_id(trace_id)
    TQ->>DF: SQL with time filter first
    Note over DF: time filter prunes Delta log files\nbloom filter (FPP 0.01) skips row groups
    DF->>CS: range reads on Parquet files
    CS-->>DF: cached file data (1hr TTL for ≤2MB ranges)
    DF-->>TQ: RecordBatch
    TQ->>TQ: RecordBatch → FlatSpan list
    TQ->>TQ: build_span_tree() — Rust DFS\nassigns depth, span_order, path, root_span_id
    TQ-->>RT: Vec of spans with hierarchy
    RT-->>CL: JSON response

The query planner evaluates the time range filter before the bloom filter on trace_id. With reorder_filters=true in the DataFusion session config, the bloom filter (FPP 0.01) lets a single-trace lookup skip roughly 99% of the row groups that do not contain the trace; the remaining ~1% are false positives that are read and then discarded. Time-bounded range queries gain more from partition pruning on partition_date than from the bloom filter.


graph LR
    subgraph "Offline — local process only"
        EO["EvalOrchestrator\nscenarios + agent_fn"]
        ER["EvalRunner\n3-level evaluation"]
        EM["EvalMetrics\noverall_pass_rate\nworkflow_pass_rate\nscenario_pass_rate"]
        EO --> ER --> EM
    end

    subgraph "Shared task definitions"
        AT["AssertionTask\ndeterministic"]
        LJ["LLMJudgeTask\nsemantic via LLM"]
        AA["AgentAssertionTask\nvendor-agnostic"]
        TA["TraceAssertionTask\nspan-based"]
    end

    subgraph "Online — production traffic"
        SQ["ScouterQueue.insert(EvalRecord)\nnon-blocking, samples at sample_ratio"]
        EVALPG[("PostgreSQL\nagent_eval_record")]
        INBOX["Trace inbox\nonly for TraceAssertionTask"]
        AGP["AgentPoller\nN workers"]
        PG[("PostgreSQL\neval results + alert records")]
        AL["Alert dispatch"]
        SQ --> EVALPG
        EVALPG -->|"no trace gate"| AGP
        EVALPG -->|"awaiting_trace"| INBOX -->|"pending + ready_at"| AGP
        AGP --> PG --> AL
    end

    AT & LJ & AA & TA -.->|same task types| EO
    AT & LJ & AA & TA -.->|same task types| AGP

Task definitions — AssertionTask, LLMJudgeTask, AgentAssertionTask, TraceAssertionTask — are identical between offline and online. Write them once, validate in CI, run them against production traffic.

Offline evaluation uses MockConfig() transport. Records never leave the process. EvalRunner automatically pulls spans from its run-scoped trace capture buffer — no manual wiring needed between EvalRecord insertion and TraceAssertionTask evaluation.

Online sampling happens at QueueNum::insert_agent_record() before the transport layer. Records that don’t pass the sample_ratio check are silently dropped, not transmitted. There’s no counter or log entry for dropped records by design — sampling is a rate control mechanism, not an error path.
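Ratio-based sampling of this kind usually reduces to one comparison per record. The sketch below is an assumption about the general technique, not the body of `QueueNum::insert_agent_record()`; the `sample` function and the injectable `rng` are hypothetical.

```python
import random
from typing import Any, Callable, List


def sample(
    records: List[Any],
    sample_ratio: float,
    rng: Callable[[], float] = random.random,
) -> List[Any]:
    """Keep each record with probability sample_ratio; drop the rest silently."""
    # rng() returns a float in [0.0, 1.0), so 1.0 keeps all and 0.0 keeps none.
    return [record for record in records if rng() < sample_ratio]
```

Because dropped records leave no trace, the only way to estimate ingest volume from stored rows is to divide by `sample_ratio`.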

EvalRunner.evaluate() calls block_on internally and cannot be called from an async context. This is a hard constraint by design, not a known issue awaiting a fix.

The offline evaluation engine runs 3-level aggregation:

  1. Sub-agent / workflow — Per alias: flatten all records across scenarios into a single EvalDataset, evaluate against profile tasks, compute pass rate.
  2. Scenario — Per scenario: evaluate scenario-level tasks, compute whether all tasks passed.
  3. Aggregate: EvalMetrics with overall_pass_rate, workflow_pass_rate, dataset_pass_rates (per-alias dict), scenario_pass_rate, total_scenarios.

When the server receives SIGTERM, AppState::shutdown() runs these steps in order:

  1. task_manager.shutdown() — fires the watch::channel; all drift workers and agent pollers exit their tokio::select! loop
  2. shutdown_trace_cache(&db_pool) — flushes any pending trace cache entries to Postgres
  3. trace_service.signal_shutdown() — buffer actor drains remaining spans; engine actor processes the final Delta commit
  4. trace_summary_service.signal_shutdown() — same pattern for the summary table
  5. dataset_manager.shutdown() — Bifrost engine shutdown
  6. eval_scenario_service.signal_shutdown() — offline eval engine
  7. db_pool.close() — all connections drained

Step 7 must come last. The trace cache flush at step 2 still needs a live pool. If you restructure this sequence, test under load before deploying.
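The ordering constraint can be demonstrated with a toy model. Everything here is hypothetical scaffolding: `Pool` is a stand-in that refuses work once closed, and the steps that do not touch the pool are modeled as no-ops. Only the step names and their order come from the list above.

```python
from typing import Callable, List, Tuple


class Pool:
    """Toy connection pool that refuses work once closed."""

    def __init__(self) -> None:
        self.closed = False

    def execute(self, statement: str) -> str:
        if self.closed:
            raise RuntimeError("pool is closed")
        return statement

    def close(self) -> None:
        self.closed = True


def shutdown(pool: Pool, log: List[str]) -> None:
    """Run the seven steps in order, recording each completed step."""

    def noop() -> None:
        # Placeholder for services this sketch does not model.
        return None

    steps: List[Tuple[str, Callable[[], object]]] = [
        ("task_manager.shutdown", noop),
        ("shutdown_trace_cache", lambda: pool.execute("flush trace cache")),
        ("trace_service.signal_shutdown", noop),
        ("trace_summary_service.signal_shutdown", noop),
        ("dataset_manager.shutdown", noop),
        ("eval_scenario_service.signal_shutdown", noop),
        ("db_pool.close", pool.close),
    ]
    for name, step in steps:
        step()
        log.append(name)
```

Moving `db_pool.close` ahead of `shutdown_trace_cache` in this model makes the flush raise, which is the same class of failure the real sequence is ordered to avoid.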


Single Delta Lake writer per pod. TraceSpanDBEngine is a single-writer actor. Multi-pod deployments designate one pod as the writer; reader pods call update_incremental() and never write. Running two writer pods against the same bucket path causes Delta log conflicts. The compaction control table prevents double-compaction but does not prevent double-writing of spans.

Trace hierarchy is query-time computed. build_span_tree() is an in-memory Rust DFS on every trace fetch. It’s fast and bounded, but it’s not free. Deep traces (hundreds of spans) will be noticeably slower to assemble than shallow ones. There is no stored index on parent-child relationships.

SKIP LOCKED is not a durable job queue. Drift profile tasks live in Postgres. The window between SELECT ... SKIP LOCKED and update_drift_profile_run_dates() is not transactional. If a drift worker crashes mid-computation, the task will be picked up again on the next tick — but double-computation is possible if the crash happens at exactly the right moment.

Trace-backed eval waits in the inbox, not inside AgentPoller. Records without TraceAssertionTask go straight to pending. Records with trace assertions stay awaiting_trace until the matching anchor span commits to Delta and the inbox worker flips the row to pending with a visibility buffer. Tune SCOUTER_TRACE_VISIBILITY_BUFFER_SECS, SCOUTER_INBOX_RECONCILE_INTERVAL_SECS, and TRACE_ARRIVAL_TIMEOUT_SECS around your Delta refresh and span arrival behavior.

Queue errors are logged, not returned. The QueueBus insert path intentionally swallows errors. In production, monitor server logs for Error inserting entity into queue — there’s no counter exported by default.

PSI flushes every 30s; SPC does not. Low-traffic SPC services may accumulate records in memory for longer than expected. If you need tighter write latency for SPC, reduce the batch size or add a custom flush trigger.