Streaming check_output

The non-streaming check_output blocks until the detector finishes evaluating the full text. For long outputs (>1200 chars) that’s 5–8 seconds of dead air on the user’s screen. Streaming runs detection on rolling windows as the LLM emits tokens, so the customer sees violations as they’re found.

| Metric | Non-streaming check_output | Streaming |
| --- | --- | --- |
| Time to first violation event | 5–8 s (long mode) / 1.5–2.5 s (short mode) | ~1.5 s regardless of total output length |
| Total work (token cost) | One ~6 s call | ~3–5 short-mode calls (~1.5 s each), comparable total |
| UX during the wait | Spinner | Progressive results, optional cancellation |
| Best for | Async review, audit | Real-time chat, voice agents, long generations |

You stream output chunks from your LLM into the SDK’s streamCheckOutput. The SDK buffers chunks and calls POST /api/v1/agent/governance/check-output/stream every ~200 chars (configurable). Each call:

  1. Detects on the new segment + a 200-char overlap window (so violations spanning chunk boundaries still get caught).
  2. Filters out violations whose first occurrence is in the prior window (already emitted).
  3. Returns the new violations + a watermark to pass back next call.
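
The three steps above can be sketched as a pure function. This is an illustration only, not the service's implementation: `detectEmails` is a hypothetical stand-in for the real detector, and the rule used for "already emitted" (a match that ends at or before the previous watermark) is an assumption.

```typescript
interface Violation {
  matchedText: string;
  offset: number; // start index within fullText
}

interface SegmentResult {
  newViolations: Violation[];
  watermark: number;
}

// Hypothetical stand-in for the real detector: flags email-like strings.
function detectEmails(text: string, baseOffset: number): Violation[] {
  const pattern = /[\w.+-]+@[\w-]+\.[\w.]+/g;
  const found: Violation[] = [];
  let match: RegExpExecArray | null;
  while ((match = pattern.exec(text)) !== null) {
    found.push({ matchedText: match[0], offset: baseOffset + match.index });
  }
  return found;
}

function checkSegment(
  fullText: string,
  lastCheckedOffset: number,
  overlap = 200,
): SegmentResult {
  // Step 1: scan the new segment plus an overlap window before it.
  const windowStart = Math.max(0, lastCheckedOffset - overlap);
  const found = detectEmails(fullText.slice(windowStart), windowStart);
  // Step 2: drop matches that lie entirely inside already-checked text (assumed rule).
  const newViolations = found.filter(
    (v) => v.offset + v.matchedText.length > lastCheckedOffset,
  );
  // Step 3: return the new violations plus the watermark for the next call.
  return { newViolations, watermark: fullText.length };
}
```

An email split across two chunks is missed on the first call, caught on the second because the overlap window reaches back over the boundary, and not reported again on the third.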

At end-of-stream, the SDK calls POST /api/v1/agent/governance/check-output/finalize for the consolidated verdict (safe, suggestedRedaction, full violations list).
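
The buffering behaviour might look roughly like the following. It is a minimal sketch, assuming a flush happens whenever at least `chunkSize` new characters have accumulated since the last call. `createChunkBuffer` and `StreamPayload` are hypothetical names, not SDK exports.

```typescript
interface StreamPayload {
  fullText: string; // everything emitted so far
  isFinal: boolean; // true only for the end-of-stream flush
}

function createChunkBuffer(chunkSize = 200) {
  let fullText = '';
  let lastFlushed = 0;
  return {
    // Append one LLM chunk; returns a payload once enough new text is buffered.
    push(piece: string): StreamPayload | null {
      fullText += piece;
      if (fullText.length - lastFlushed < chunkSize) return null;
      lastFlushed = fullText.length;
      return { fullText, isFinal: false };
    },
    // End of stream: flush whatever tail is left, even if below chunkSize.
    end(): StreamPayload {
      lastFlushed = fullText.length;
      return { fullText, isFinal: true };
    },
  };
}
```

Each non-null payload corresponds to one stream call. The payload from `end()` covers tail content that never reached the chunk threshold.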

The SDK exposes the whole flow as an async iterable. You consume violation, progress, complete, and error events with for await.

import { InPolicyClient } from '@inpolicy/sdk';
import OpenAI from 'openai';

const ip = new InPolicyClient({ apiKey: process.env.INPOLICY_API_KEY! });
const openai = new OpenAI();

// Adapt the OpenAI chunk stream into a plain stream of text pieces.
async function* outputChunks(stream: AsyncIterable<{
  choices: { delta: { content?: string | null } }[];
}>) {
  for await (const part of stream) {
    yield part.choices[0]?.delta?.content ?? '';
  }
}

async function respond(sessionId: string, userMessage: string) {
  // 1. Pre-inference (fast, ~100 ms)
  const pre = await ip.recordTurn({
    sessionId,
    turn: { role: 'user', content: userMessage },
  });

  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    stream: true,
    messages: [
      { role: 'system', content: pre.injectionBlock },
      { role: 'user', content: userMessage },
    ],
  });

  // 2. Drive incremental detection on the streamed output.
  let unsafe = false;
  for await (const evt of ip.streamCheckOutput({
    sessionId,
    source: outputChunks(completion as never),
  })) {
    switch (evt.type) {
      case 'violation':
        console.warn(`[InPolicy] ${evt.violation.matchedText}: ${evt.violation.explanation}`);
        // Optionally cancel generation here, or just buffer and warn at the end.
        break;
      case 'progress':
        // Useful for live UI: "checked X chars so far"
        break;
      case 'complete':
        unsafe = !evt.verdict.safe;
        if (unsafe && evt.verdict.suggestedRedaction) {
          // Send the redacted version, or regenerate.
        }
        break;
      case 'error':
        console.error('[InPolicy] streaming error:', evt.message);
        break;
    }
  }
}

| Use case | Recommended |
| --- | --- |
| Real-time customer-service chat (interactive UI) | Streaming |
| Voice agent (sub-second latency required) | Streaming, with cancellation on first fix violation |
| Email drafting tool (human reviews before sending) | Non-streaming check_output is fine |
| Compliance audit pipeline (offline) | Non-streaming, queued |
| Code review / copilot suggestions | Non-streaming |
| Long generation (>1500 chars, e.g. document drafting) | Streaming; non-streaming hits long mode (5–8 s) |

Streaming costs slightly more. Total token usage is comparable to one full long-mode pass, but you get more API calls to handle. Each stream call is short-mode (cheaper) and bounded; at end-of-stream the finalize call is one full pass to compute suggestedRedaction and the consolidated verdict.

Streaming gives you cancellation. If a fix-level violation surfaces 1.5 s in, your agent can stop generation, return suggestedRedaction, and save the rest of the LLM cost. Non-streaming can’t do this: by the time you see the violation, the LLM has already finished.
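
One way to wire up that cancellation is to trip an abort controller on the first fix-level violation. A minimal sketch: the event shape assumes SDK violation events carry the same `enforcementType` field as the `newViolations` entries returned by the stream endpoint, and `cancelOnFix` is a hypothetical helper, not part of the SDK.

```typescript
type StreamEvent =
  | { type: 'violation'; violation: { enforcementType: string; matchedText: string } }
  | { type: 'progress' }
  | { type: 'complete' }
  | { type: 'error'; message: string };

// Structural type so that a standard AbortController fits without importing anything.
interface Abortable {
  readonly signal: { readonly aborted: boolean };
  abort(): void;
}

// Hypothetical helper: aborts on the first fix-level violation.
// Returns true only for the event that triggered the abort.
function cancelOnFix(evt: StreamEvent, controller: Abortable): boolean {
  if (controller.signal.aborted) return false;
  if (evt.type === 'violation' && evt.violation.enforcementType === 'fix') {
    controller.abort();
    return true;
  }
  return false;
}
```

Call `cancelOnFix(evt, controller)` inside the `for await` loop and hand `controller.signal` to whatever produces your LLM stream. How the signal is passed depends on the LLM client you use.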

Streaming has a watermark contract. Pass back the watermark from the previous call as lastCheckedOffset on the next. The SDK does this automatically; if you call streamCheckOutputOnce directly, you manage it yourself.

If you only care about per-chunk violation events and want to skip the end-of-stream finalize call:

for await (const evt of ip.streamCheckOutput({
  sessionId,
  source: outputChunks(...),
  finalize: false, // ← skips /check-output/finalize
})) { ... }

This saves one round-trip and the full-pass finalize call. The cost: you don’t get suggestedRedaction or the consolidated safe verdict.
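
With finalize: false you can still keep a local tally of what was flagged. This is a rough sketch, assuming violation events expose the `severity` and `enforcementType` fields that appear in the stream endpoint's response. `tallyViolations` is a hypothetical helper, and its result is not a substitute for the server's consolidated verdict.

```typescript
interface ViolationEvent {
  type: 'violation';
  violation: { policyId: string; severity: number; enforcementType: string };
}

interface Tally {
  count: number;
  maxSeverity: number;
  sawFix: boolean;
}

// Fold the violation events collected during the stream into a small summary.
function tallyViolations(events: ViolationEvent[]): Tally {
  return events.reduce<Tally>(
    (acc, evt) => ({
      count: acc.count + 1,
      maxSeverity: Math.max(acc.maxSeverity, evt.violation.severity),
      sawFix: acc.sawFix || evt.violation.enforcementType === 'fix',
    }),
    { count: 0, maxSeverity: 0, sawFix: false },
  );
}
```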

Direct API access (without the SDK helper)

If you need lower-level control:

POST /api/v1/agent/governance/check-output/stream
Authorization: Bearer inp_live_…
Content-Type: application/json

{
  "sessionId": "sess_abc",
  "fullText": "Sure, here is the refund for John Smith (john@acme.com)",
  "lastCheckedOffset": 0,
  "isFinal": false
}

Response shape:

{
  "sessionId": "sess_abc",
  "newViolations": [
    {
      "policyId": "policy_pii",
      "matchedText": "John Smith (john@acme.com)",
      "explanation": "Customer name and email...",
      "severity": 4.5,
      "confidence": 0.92,
      "enforcementType": "fix",
      "enforcementText": "[REDACTED]",
      "offset": 28
    }
  ],
  "watermark": 56,
  "isComplete": false,
  "riskScore": 0.92,
  "traceId": "trc_…"
}

Pass watermark (56 above) as lastCheckedOffset on the next call. When the LLM finishes, set isFinal: true to force detection on any tail content below the chunk threshold. Then call POST /api/v1/agent/governance/check-output/finalize for the consolidated verdict.
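
If you manage the watermark yourself, the bookkeeping reduces to copying one field from each response into the next request. A minimal sketch using the request fields documented above. `nextStreamRequest` is a hypothetical helper, and sending the request (base URL, auth header, retries) is left to your HTTP client.

```typescript
interface StreamRequest {
  sessionId: string;
  fullText: string;
  lastCheckedOffset: number;
  isFinal: boolean;
}

// Only the response field needed to build the next request.
interface StreamResponse {
  watermark: number;
}

// Build the body for the next /check-output/stream call.
// Pass null as `previous` on the first call of a stream.
function nextStreamRequest(
  sessionId: string,
  fullText: string,
  previous: StreamResponse | null,
  isFinal = false,
): StreamRequest {
  return {
    sessionId,
    fullText,
    lastCheckedOffset: previous ? previous.watermark : 0,
    isFinal,
  };
}
```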

| Operation | Typical | p95 |
| --- | --- | --- |
| Single /check-output/stream call | 1.0–2.5 s | 3 s |
| End-of-stream /finalize | 1.5–6 s (depends on full output length) | 8 s |

In practice, time-to-first-violation is dominated by the first /check-output/stream call, which fires once the buffered text reaches chunkSize. With the default 200-char chunk size, that is ~1.5 s after the LLM emits its first ~200 chars.
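
As a back-of-envelope check, time-to-first-violation is roughly the time needed to buffer one chunk plus one stream call. The emission rate is an assumption you would measure for your own model, and `estimateTimeToFirstViolation` is a hypothetical helper.

```typescript
// Rough estimate, in seconds, of when the first violation event can arrive,
// measured from the moment the LLM starts emitting text.
function estimateTimeToFirstViolation(
  charsPerSecond: number, // measured LLM emission rate (assumption)
  chunkSize = 200,        // buffered chars before the first stream call
  streamCallSeconds = 1.5, // typical latency of one stream call
): number {
  if (charsPerSecond <= 0) {
    throw new RangeError('charsPerSecond must be positive');
  }
  return chunkSize / charsPerSecond + streamCallSeconds;
}
```

Lowering `chunkSize` shortens the buffering term but means more stream calls over the life of the output.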

Permissions are the same as for non-streaming check_output: the API key must have post_inference_checking: "allowed". Streaming endpoints return 403 otherwise.