# valoswiss-observability (34°)
**Macro-category**: 🔭 INFRA/AI/META (Wave 5 entry)
**Scope**: Unified cross-agent LLM observability: distributed tracing + unified cost tracking + latency percentiles + error rate + token breakdown + prompt-response correlation. OpenTelemetry semantic conventions (`gen_ai.*`).
**Born**: 2026-05-03 (W1 LlmTrace model + auto-instrument wrapper · W2 NestJS controller + API · W3 standalone valoswiss-agenti dashboard · W4 cascade root span + cross-agent correlation · W5 OTLP export endpoint)
**Owner downstream**: ADMIN/SUPERVISOR (cross-tenant + cost analytics + alert) · ADVISOR (own-trace drill-down read-only)
**Last aligned**: 2026-05-03 V20
---
## §0 · Pre-flight check (agent entry ritual)
Before any intervention, verify in this order:
1. **Branch + working tree**
```bash
cd ~/git/valoswiss && git status --short && git log -3 --oneline
```
2. **Trace ingestion endpoint health**
```bash
curl -s http://127.0.0.1:4010/api/traces/health -H "Cookie: valo_token=<dev-token>"
```
Must return `{ status:'ok', ingestionRate1m:N, queueDepth:M }`.
3. **Standalone dashboard health** (if the separate UI is active)
```bash
curl -s http://127.0.0.1:3030/api/healthz
```
Returns `{status:'ok', tracesIngested24h:N}` if the valoswiss-agenti standalone app is running.
4. **Prisma schema sync**
```bash
cd apps/api && npx prisma migrate status
```
Verify that the `LlmTrace` model, the `LlmSpan` model, and the `LlmTraceStatus` enum are applied (idempotent migration V15-observability).
5. **AiCostLedger correlation**
```bash
psql $DATABASE_URL -c 'SELECT COUNT(*) FROM "LlmTrace" WHERE "traceId" IN (SELECT "taskId" FROM "AiCostLedger");'
```
Must be >0 in prod (verifies the cross-table wiring).
6. **Persona pack**: `apps/api/src/common/persona-packs/persona-packs.constants.ts` must include `'observability'` in `defaultModules` for `ADMIN` + `SUPERVISOR`. Optional for `ADVISOR` (own traces only, scoped by `req.user?.id`).
7. **Module registry**: `apps/web/src/lib/module-registry.ts` must expose an `observability` entry with `sidebarSection: 'INFRA'`, `requiredRole: 'SUPERVISOR'`, `personaHint: 'ops'`, icon `🔭`.
8. **R-Audit gate**: before any commit touching a CRITICAL file (see §3.1), run `npx tsx scripts/r-audit.ts <file> --validate-business-logic`.
If any of the eight checks fails, **stop and record the deviation** before proceeding: the 3-Point Registration V16 is a non-negotiable invariant.
---
## §1 · Areas of competence
### 1.1 Distributed tracing (OpenTelemetry semantic conventions)
- **Trace** = one end-to-end conversation/task execution (e.g. "advisor asked for a briefing → cascade orchestrator → 3 LLM calls → 1 vector search → final answer"); identified by a `traceId` UUID
- **Span** = a single internal operation (LLM call, DB query, tool invocation); identified by `spanId` + `parentSpanId`, from which the tree is reconstructed (see the sketch after this list)
- **Conventions** (followed): `gen_ai.system` (anthropic|openai|google|xai), `gen_ai.request.model`, `gen_ai.usage.input_tokens`, `gen_ai.usage.output_tokens`, `gen_ai.response.id`, `gen_ai.response.finish_reasons`
- **Sampling** in the prototype phase: 100% (sample rate 1.0). Real-client production: configurable per tenant via `OBSERVABILITY_SAMPLE_RATE_<TENANT>`.
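A minimal sketch of rebuilding the span tree of one trace from flat `(spanId, parentSpanId)` rows; the `FlatSpan`/`SpanNode` types and the `buildSpanTree` helper are illustrative, not part of the codebase:
```typescript
// Hypothetical helper: reconstructs the span tree of a single trace from flat rows.
interface FlatSpan { spanId: string; parentSpanId?: string | null; operation: string }
interface SpanNode extends FlatSpan { children: SpanNode[] }

function buildSpanTree(spans: FlatSpan[]): SpanNode[] {
  const nodes = new Map<string, SpanNode>(spans.map(s => [s.spanId, { ...s, children: [] }]));
  const roots: SpanNode[] = [];
  for (const node of nodes.values()) {
    const parent = node.parentSpanId ? nodes.get(node.parentSpanId) : undefined;
    if (parent) parent.children.push(node); // attach under its parent
    else roots.push(node);                  // no parent row → root span
  }
  return roots;
}
```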
### 1.2 Unified cost tracking
Reuses the existing `AiCostLedger` as the cost SSOT (do NOT duplicate it). `LlmTrace` links to it via the logical foreign key `taskId == traceId`. Each LLM span creates a parallel `AiCostLedger` entry with the same `traceId`, as sketched below.
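A sketch of that dual write, assuming a hypothetical `recordLlmCost` helper; the `AiCostLedger` column names and the client shape below are assumptions, the actual Prisma schema remains the SSOT:
```typescript
// Illustrative only: AiCostLedger column names and the client shape are assumptions.
interface CostLedgerClient {
  aiCostLedger: { create(args: { data: Record<string, unknown> }): Promise<unknown> };
}

async function recordLlmCost(
  db: CostLedgerClient,
  span: { traceId: string; model: string; tokensIn: number; tokensOut: number },
  costUsd: number,
) {
  // One ledger row per LLM span, correlated through the logical FK taskId == traceId.
  await db.aiCostLedger.create({
    data: {
      taskId: span.traceId, // logical FK: taskId == traceId
      model: span.model,
      tokensIn: span.tokensIn,
      tokensOut: span.tokensOut,
      costUsd,
    },
  });
}
```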
### 1.3 Latency percentiles
Computed on demand via Postgres percentile aggregates over `LlmTrace.latencyMs` (see the query sketch after this list):
- P50 / P75 / P95 / P99 / max
- Group by: `model`, `provider`, `agentId`, `tenantSlug`, `operation`
- Range: `15m` / `1h` / `24h` / `7d` / `30d`
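A minimal query sketch via Prisma `$queryRaw`, grouping by `model` only for brevity; per-group percentiles use the `percentile_cont` ordered-set aggregate, and the caller is assumed to map ranges like `'24h'` to interval strings like `'24 hours'`:
```typescript
import { PrismaClient } from '@prisma/client';

// Hypothetical shape of one rollup row.
interface LatencyRow { model: string; p50: number; p95: number; p99: number; max: number }

async function latencyByModel(prisma: PrismaClient, interval: string): Promise<LatencyRow[]> {
  // interval is e.g. '1 hour', '24 hours', '7 days'
  return prisma.$queryRaw<LatencyRow[]>`
    SELECT "model",
           percentile_cont(0.50) WITHIN GROUP (ORDER BY "latencyMs") AS p50,
           percentile_cont(0.95) WITHIN GROUP (ORDER BY "latencyMs") AS p95,
           percentile_cont(0.99) WITHIN GROUP (ORDER BY "latencyMs") AS p99,
           MAX("latencyMs") AS max
    FROM "LlmTrace"
    WHERE "startedAt" > now() - ${interval}::interval
    GROUP BY "model"`;
}
```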
### 1.4 Error rate + retry visibility
- `LlmTraceStatus` enum: `OK` / `ERROR` / `TIMEOUT` / `RATE_LIMIT` / `RETRY`
- `LlmSpan.retryCount` per provider (Anthropic 429 + auto-retry)
- Aggregated error rate per minute/hour by (provider, model), as in the sketch below
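A companion sketch for the per-minute rollup; the `date_trunc` bucketing and the `status <> 'OK'` predicate are assumptions about how an "error" is counted:
```typescript
import { PrismaClient } from '@prisma/client';

// Error rate per minute by (provider, model); the row shape is illustrative.
async function errorRatePerMinute(prisma: PrismaClient) {
  return prisma.$queryRaw`
    SELECT date_trunc('minute', "startedAt") AS bucket,
           "provider",
           "model",
           COUNT(*) FILTER (WHERE "status" <> 'OK')::float / COUNT(*) AS error_rate
    FROM "LlmSpan"
    GROUP BY bucket, "provider", "model"
    ORDER BY bucket DESC`;
}
```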
### 1.5 Token usage breakdown
- `tokensIn` / `tokensOut` / `tokensCachedIn` (Anthropic prompt cache hit)
- Cost-per-1k-tokens lookup table per model (reused from `valoswiss-ai-routing-cost`)
- Cache hit rate per tenant (target >40% for cost savings; see the sketch below)
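A minimal sketch of the hit-rate arithmetic; counting cached input tokens against total input tokens is an assumption about the intended formula:
```typescript
// Assumed formula: hit rate = cached input tokens / (fresh + cached input tokens).
function cacheHitRate(rows: { tokensIn: number; tokensCachedIn: number }[]): number {
  const totals = rows.reduce(
    (acc, r) => ({
      cached: acc.cached + r.tokensCachedIn,
      all: acc.all + r.tokensIn + r.tokensCachedIn,
    }),
    { cached: 0, all: 0 },
  );
  return totals.all === 0 ? 0 : totals.cached / totals.all;
}
```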
### 1.6 Prompt → response correlation
- `LlmSpan.inputHash`: SHA256-12 of the full prompt (the full prompt is NOT stored, for PII privacy in the prototype; only the digest is)
- `LlmSpan.outputDigest`: first 200 chars + sha256-12 of the response (helper sketched below)
- If SUPERVISOR/ADMIN enable `OBSERVABILITY_FULL_PAYLOAD=1` for a tenant, the full payload is stored (gated, audit-logged)
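A sketch of a digest helper matching that description; the exact concatenation format (preview + `#` + hash suffix) is an assumption:
```typescript
import { createHash } from 'crypto';

// Assumed format: first 200 chars of the response, then a 12-hex-char SHA-256 suffix.
function outputDigest(response: string): string {
  const suffix = createHash('sha256').update(response).digest('hex').slice(0, 12);
  return `${response.slice(0, 200)}#${suffix}`;
}
```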
### 1.7 Persona visibility
- **ADMIN/SUPERVISOR**: cross-tenant + cost analytics + alert config + full-payload toggle (gated)
- **ADVISOR**: own traces only, scoped by `LlmTrace.requestedByUserId == req.user?.id` (see the scoping sketch after this list)
- **CLIENT/PROSPECT/RETAIL_CLIENT/AFFLUENT_CLIENT/UHNW_CLIENT/FAMILY_OFFICE_PRINCIPAL**: DENIED, no exceptions: traces contain sensitive operational metadata (model name, cost, internal prompt structure)
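A minimal sketch of how that scoping could be applied when building the Prisma `where` clause; this is illustrative, not the actual `TracesService` code:
```typescript
// Illustrative role scoping: ADVISOR sees only own traces, SUPERVISOR/ADMIN see all.
function traceScope(actor: { userId?: string; role: string }): { requestedByUserId?: string } {
  if (actor.role === 'SUPERVISOR' || actor.role === 'ADMIN') return {};
  if (actor.role === 'ADVISOR') {
    // Fail closed: an ADVISOR without a resolved user id matches nothing.
    return { requestedByUserId: actor.userId ?? '__none__' };
  }
  throw new Error('persona not allowed to read traces'); // client personas: denied
}
```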
---
## §2 · Code patterns (architectural references)
### 2.1 Claude auto-instrument wrapper · `apps/api/src/lib/claude-client.ts`
```typescript
import Anthropic from '@anthropic-ai/sdk';
import { createHash, randomUUID } from 'crypto';
import { LlmTraceService } from '../modules/observability/services/llm-trace.service';

const sha12 = (s: string) => createHash('sha256').update(s).digest('hex').slice(0, 12);

export class ClaudeClient {
  constructor(
    private readonly anthropic: Anthropic,
    private readonly traceService?: LlmTraceService,
  ) {}

  async withTracing<T>(
    ctx: { traceId?: string; parentSpanId?: string; agentId: string; tenantSlug: string; operation: string; userId?: string },
    fn: (client: Anthropic) => Promise<T>,
    extractMeta: (result: T) => { model: string; tokensIn: number; tokensOut: number; tokensCachedIn?: number; outputPreview?: string },
  ): Promise<T> {
    const traceId = ctx.traceId ?? randomUUID();
    const spanId = randomUUID();
    const startedAt = Date.now();
    let status: 'OK' | 'ERROR' | 'TIMEOUT' | 'RATE_LIMIT' = 'OK';
    let errorMessage: string | undefined;
    let result: T | undefined;
    try {
      result = await fn(this.anthropic);
      return result;
    } catch (err) {
      const e = err as Error & { status?: number };
      errorMessage = e.message;
      if (e.status === 429) status = 'RATE_LIMIT';
      else if (e.message?.includes('timeout')) status = 'TIMEOUT';
      else status = 'ERROR';
      throw err;
    } finally {
      const latencyMs = Date.now() - startedAt;
      // Best-effort: must NOT block the caller when trace ingest is down.
      if (this.traceService) {
        try {
          // On error `result` is undefined: token metadata falls back to zeros,
          // but the span is still recorded so error rate stays visible (§1.4).
          const meta: ReturnType<typeof extractMeta> = result !== undefined
            ? extractMeta(result)
            : { model: 'unknown', tokensIn: 0, tokensOut: 0 };
          await this.traceService.recordSpan({
            traceId,
            spanId,
            parentSpanId: ctx.parentSpanId,
            agentId: ctx.agentId,
            tenantSlug: ctx.tenantSlug,
            operation: ctx.operation,
            provider: 'anthropic',
            model: meta.model,
            latencyMs,
            tokensIn: meta.tokensIn,
            tokensOut: meta.tokensOut,
            tokensCachedIn: meta.tokensCachedIn ?? 0,
            inputHash: sha12(JSON.stringify(ctx)),
            outputDigest: meta.outputPreview ? sha12(meta.outputPreview) : undefined,
            status,
            errorMessage,
            requestedByUserId: ctx.userId,
            startedAt: new Date(startedAt),
            completedAt: new Date(),
          });
        } catch {
          // Never break the main operational flow over a tracing failure.
        }
      }
    }
  }
}
```
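A hypothetical call site showing how a cascade step could wrap one Anthropic call; the model id, ctx values, and variable names are placeholders, while the `usage` fields follow the Anthropic SDK response shape:
```typescript
// Hypothetical usage of ClaudeClient.withTracing inside a cascade step.
const answer = await claudeClient.withTracing(
  { traceId: rootTraceId, agentId: 'observability', tenantSlug: 'demo', operation: 'briefing', userId: advisorId },
  (client) =>
    client.messages.create({
      model: 'claude-sonnet-4-5', // placeholder model id
      max_tokens: 1024,
      messages: [{ role: 'user', content: 'Summarize the portfolio alerts for today.' }],
    }),
  (msg) => ({
    model: msg.model,
    tokensIn: msg.usage.input_tokens,
    tokensOut: msg.usage.output_tokens,
    tokensCachedIn: msg.usage.cache_read_input_tokens ?? 0,
    outputPreview: msg.content[0]?.type === 'text' ? msg.content[0].text : undefined,
  }),
);
```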
### 2.2 NestJS ingestion service · `apps/api/src/modules/observability/services/llm-trace.service.ts`
```typescript
import { Injectable, Logger, Optional } from '@nestjs/common';
import { TenantPrismaService } from '../../../common/tenant-prisma.service';

interface SpanRecord {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  agentId: string;
  tenantSlug: string;
  operation: string;
  provider: string;
  model: string;
  latencyMs: number;
  tokensIn: number;
  tokensOut: number;
  tokensCachedIn?: number;
  inputHash?: string;
  outputDigest?: string;
  status: 'OK' | 'ERROR' | 'TIMEOUT' | 'RATE_LIMIT' | 'RETRY';
  errorMessage?: string;
  requestedByUserId?: string;
  startedAt: Date;
  completedAt: Date;
}

@Injectable()
export class LlmTraceService {
  private readonly logger = new Logger(LlmTraceService.name);
  // In-memory buffer for batch inserts: flushed every 1s or every 50 spans, whichever comes first.
  private buffer: SpanRecord[] = [];
  private flushTimer: NodeJS.Timeout | null = null;

  constructor(@Optional() private readonly prisma: TenantPrismaService) {}

  // Wave 1.6: explicit getters, NOT the legacy as-any casts.
  private get traceRepo() { return this.prisma?.llmTrace; }
  private get spanRepo() { return this.prisma?.llmSpan; }

  async recordSpan(span: SpanRecord): Promise<void> {
    this.buffer.push(span);
    if (this.buffer.length >= 50) await this.flush();
    else if (!this.flushTimer) this.flushTimer = setTimeout(() => void this.flush(), 1000);
  }

  private async flush(): Promise<void> {
    if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; }
    const toFlush = this.buffer.splice(0);
    if (toFlush.length === 0 || !this.spanRepo) return;
    try {
      // Upsert a root LlmTrace row for every traceId not seen before.
      const traceIds = [...new Set(toFlush.map(s => s.traceId))];
      for (const tid of traceIds) {
        const first = toFlush.find(s => s.traceId === tid)!;
        await this.traceRepo!.upsert({
          where: { id: tid },
          create: {
            id: tid,
            tenantSlug: first.tenantSlug,
            agentId: first.agentId,
            requestedByUserId: first.requestedByUserId,
            startedAt: first.startedAt,
            status: 'OK',
          },
          update: { /* idempotent: the existing root row is kept as-is */ },
        });
      }
      await this.spanRepo.createMany({ data: toFlush, skipDuplicates: true });
    } catch (err) {
      this.logger.warn(`flush-failed: ${(err as Error).message}`);
    }
  }
}
```
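One known gap in the buffering design above: spans still sitting in the buffer are lost if the process exits. A possible mitigation via Nest's shutdown hooks, sketched under the assumption that the service exposes a public `drain()` method and that `app.enableShutdownHooks()` is called in `main.ts`; none of this exists in the codebase yet:
```typescript
import { Injectable, OnApplicationShutdown } from '@nestjs/common';

// Hypothetical drain-on-shutdown hook; assumes LlmTraceService exposes drain().
@Injectable()
export class LlmTraceDrainHook implements OnApplicationShutdown {
  constructor(private readonly traces: { drain(): Promise<void> }) {}

  async onApplicationShutdown(_signal?: string): Promise<void> {
    // Final best-effort flush; drain() is assumed to swallow DB errors like flush() does.
    await this.traces.drain();
  }
}
```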
### 2.3 NestJS controller · `apps/api/src/modules/observability/controllers/traces.controller.ts`
```typescript
import { Body, Controller, Get, Param, Post, Query, Req } from '@nestjs/common';
import { Roles } from '../../../common/auth/roles.decorator';
import { TracesService } from '../services/traces.service';

@Controller('traces')
export class TracesController {
  constructor(private readonly svc: TracesService) {}

  @Roles('ADVISOR', 'SUPERVISOR', 'ADMIN')
  @Get()
  async list(
    @Query() q: { sessionId?: string; agentId?: string; tenantSlug?: string; from?: string; to?: string; page?: string },
    @Req() req: any,
  ) {
    // Wave 1.6: req.user?.id (field variant; pre-commit triage blocks the legacy one)
    const userId = req.user?.id;
    const role = req.user?.role;
    return this.svc.list(q, { userId, role });
  }

  // Static paths are declared BEFORE ':id': otherwise GET /traces/stats and
  // GET /traces/health would be captured by the param route and resolved as ids.
  @Roles('SUPERVISOR', 'ADMIN')
  @Get('stats')
  async stats(@Query() q: { range?: '15m' | '1h' | '24h' | '7d' | '30d'; groupBy?: 'model' | 'agent' | 'tenant' | 'operation' }) {
    return this.svc.stats(q);
  }

  @Roles('ADVISOR', 'SUPERVISOR', 'ADMIN')
  @Get('health')
  async health() { return this.svc.health(); }

  @Roles('SUPERVISOR', 'ADMIN')
  @Post('export')
  async exportOtlp(@Body() body: { from: string; to: string; format: 'otlp-json' | 'otlp-protobuf' }) {
    return this.svc.exportOtlp(body);
  }

  @Roles('ADVISOR', 'SUPERVISOR', 'ADMIN')
  @Get(':id')
  async getById(@Param('id') id: string, @Req() req: any) {
    return this.svc.getById(id, { userId: req.user?.id, role: req.user?.role });
  }
}
```
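For the `export` endpoint, a sketch of how one stored span could be mapped onto the `gen_ai.*` semantic-convention attributes of an OTLP-JSON span (attribute names per §1.1); the envelope below is simplified and omits the full OTLP resource/scope structure:
```typescript
// Simplified OTLP-JSON mapping for one span; not the complete OTLP envelope.
function toOtlpSpan(s: {
  traceId: string; spanId: string; parentSpanId?: string; operation: string;
  model: string; provider: string; tokensIn: number; tokensOut: number;
  startedAt: Date; completedAt: Date;
}) {
  return {
    traceId: s.traceId,
    spanId: s.spanId,
    parentSpanId: s.parentSpanId,
    name: s.operation,
    startTimeUnixNano: String(s.startedAt.getTime() * 1e6),
    endTimeUnixNano: String(s.completedAt.getTime() * 1e6),
    attributes: [
      { key: 'gen_ai.system', value: { stringValue: s.provider } },
      { key: 'gen_ai.request.model', value: { stringValue: s.model } },
      { key: 'gen_ai.usage.input_tokens', value: { intValue: String(s.tokensIn) } },
      { key: 'gen_ai.usage.output_tokens', value: { intValue: String(s.tokensOut) } },
    ],
  };
}
```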
### 2.4 TracesService stats query · `apps/api/src/modules/observability/services/traces.service.ts`
```typescript
import { Injectable, Optional } from '@nestjs/common';
import { TenantPri
…[truncated: open the MD file for the full text]