
Class: StreamProcessor

Defined in: activities/chat/stream/processor.ts:114

StreamProcessor is a state machine for processing AI response streams.

Manages the full UIMessage[] conversation and emits events on changes.

State tracking:

  • Full message array
  • Current assistant message being streamed
  • Text content accumulation
  • Multiple parallel tool calls
  • Tool call completion detection

Tool call completion is detected when:

  1. A new tool call starts at a different index
  2. Text content arrives
  3. Stream ends
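
The three completion rules above can be illustrated with a small tracker. This is only a sketch of the idea, not the code in processor.ts: the `SketchEvent` shapes and the `ToolCallTracker` class are hypothetical names invented here, and the sketch tracks a single open tool call for brevity.

```typescript
// Hypothetical event shapes invented for this sketch; they are not the
// library's chunk types.
type SketchEvent =
  | { type: "tool-call"; index: number; id: string }
  | { type: "text"; delta: string }
  | { type: "end" };

class ToolCallTracker {
  // Tool call currently receiving chunks, if any.
  private active: { index: number; id: string } | null = null;
  // IDs of tool calls considered complete, in completion order.
  readonly completed: string[] = [];

  handle(event: SketchEvent): void {
    if (event.type === "tool-call") {
      // Rule 1: a call starting at a different index completes the previous one.
      if (this.active !== null && this.active.index !== event.index) {
        this.completed.push(this.active.id);
      }
      this.active = { index: event.index, id: event.id };
      return;
    }
    // Rule 2 (text arrives) and rule 3 (stream ends) complete whatever is open.
    if (this.active !== null) {
      this.completed.push(this.active.id);
      this.active = null;
    }
  }
}
```

Repeated chunks for the same index leave the call open, which is why completion has to be inferred from what arrives next rather than from the tool call chunk itself.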

Constructors

Constructor

ts
new StreamProcessor(options): StreamProcessor;

Defined in: activities/chat/stream/processor.ts:142

Parameters

options

StreamProcessorOptions = {}

Returns

StreamProcessor

Methods

addToolApprovalResponse()

ts
addToolApprovalResponse(approvalId, approved): void;

Defined in: activities/chat/stream/processor.ts:255

Add an approval response (called by client after handling onApprovalRequest)

Parameters

approvalId

string

approved

boolean

Returns

void
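
As a rough illustration of answering approval requests, the sketch below applies an allow-list policy. Only the `addToolApprovalResponse` signature comes from this page; the `PendingApproval` shape, the `answerApprovals` helper, and the allow-list policy itself are assumptions made for the example.

```typescript
// Structural subset of StreamProcessor, copied from the signature above.
interface ApprovalSink {
  addToolApprovalResponse(approvalId: string, approved: boolean): void;
}

// Hypothetical shape for a pending request. The payload actually passed to
// onApprovalRequest is not documented in this section.
interface PendingApproval {
  approvalId: string;
  toolName: string;
}

// Hypothetical policy: approve tools on an allow-list and reject the rest.
function answerApprovals(
  sink: ApprovalSink,
  pending: readonly PendingApproval[],
  allowedTools: ReadonlySet<string>,
): void {
  for (const request of pending) {
    sink.addToolApprovalResponse(
      request.approvalId,
      allowedTools.has(request.toolName),
    );
  }
}
```

In an interactive client the boolean would more likely come from a user prompt than from a fixed policy.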


addToolResult()

ts
addToolResult(
   toolCallId, 
   output, 
   error?): void;

Defined in: activities/chat/stream/processor.ts:211

Add a tool result (called by client after handling onToolCall)

Parameters

toolCallId

string

output

any

error?

string

Returns

void
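
A minimal sketch of reporting a client-side tool outcome, covering both the success path and the optional `error` argument. The `ToolResultSink` interface mirrors the signature above; the `reportToolOutcome` helper is hypothetical, and passing `null` as `output` on failure is an assumption, since the expected value is not documented here.

```typescript
// Structural subset of StreamProcessor, copied from the signature above.
interface ToolResultSink {
  addToolResult(toolCallId: string, output: any, error?: string): void;
}

// Hypothetical helper: run a client-side tool and report its outcome.
async function reportToolOutcome(
  sink: ToolResultSink,
  toolCallId: string,
  run: () => Promise<unknown>,
): Promise<void> {
  let output: unknown;
  try {
    output = await run();
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    // Assumption: the expected `output` on failure is not documented, so this
    // sketch passes null alongside the error message.
    sink.addToolResult(toolCallId, null, message);
    return;
  }
  sink.addToolResult(toolCallId, output);
}
```

The success call sits outside the `try` block so that an exception thrown by `addToolResult` itself is not reported a second time as a tool failure.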


addUserMessage()

ts
addUserMessage(content): UIMessage;

Defined in: activities/chat/stream/processor.ts:169

Add a user message to the conversation

Parameters

content

string

Returns

UIMessage


areAllToolsComplete()

ts
areAllToolsComplete(): boolean;

Defined in: activities/chat/stream/processor.ts:286

Check whether all tool calls in the last assistant message are complete. Useful for auto-continue logic.

Returns

boolean
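
One way the auto-continue check might be wired up is sketched below. The `shouldAutoContinue` helper, its `isStreaming` and round-limit parameters, and the default of 5 rounds are all hypothetical; only `areAllToolsComplete()` comes from this page.

```typescript
// Structural subset of StreamProcessor, copied from the signature above.
interface ToolCompletionView {
  areAllToolsComplete(): boolean;
}

// Hypothetical decision helper: continue the conversation only when no stream
// is in flight, the round budget is not exhausted, and every tool call in the
// last assistant message has completed.
function shouldAutoContinue(
  view: ToolCompletionView,
  isStreaming: boolean,
  roundsSoFar: number,
  maxRounds: number = 5,
): boolean {
  if (isStreaming) return false;
  if (roundsSoFar >= maxRounds) return false;
  return view.areAllToolsComplete();
}
```

This page does not say what `areAllToolsComplete()` returns for an assistant message with no tool calls, so a caller may also need to check that the last message actually contains tool calls before continuing.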


clearMessages()

ts
clearMessages(): void;

Defined in: activities/chat/stream/processor.ts:318

Clear all messages

Returns

void


finalizeStream()

ts
finalizeStream(): void;

Defined in: activities/chat/stream/processor.ts:814

Finalize the stream and complete all pending operations.

Returns

void


getMessages()

ts
getMessages(): UIMessage[];

Defined in: activities/chat/stream/processor.ts:278

Get current messages

Returns

UIMessage[]


getRecording()

ts
getRecording(): ChunkRecording | null;

Defined in: activities/chat/stream/processor.ts:893

Get the current recording

Returns

ChunkRecording | null


getState()

ts
getState(): ProcessorState;

Defined in: activities/chat/stream/processor.ts:866

Get current processor state

Returns

ProcessorState


process()

ts
process(stream): Promise<ProcessorResult>;

Defined in: activities/chat/stream/processor.ts:331

Process a stream and emit events through handlers

Parameters

stream

AsyncIterable<any>

Returns

Promise<ProcessorResult>
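
Because `process()` accepts any `AsyncIterable`, an in-memory array can stand in for a live stream, for example in tests. The sketch below assumes nothing about chunk shapes: `fromArray` and `processChunks` are hypothetical helpers, and `TResult` stands in for `ProcessorResult`.

```typescript
// Structural subset of StreamProcessor; TResult stands in for ProcessorResult.
interface StreamRunner<TResult> {
  process(stream: AsyncIterable<any>): Promise<TResult>;
}

// Hypothetical helper: expose an in-memory array as an async iterable.
async function* fromArray<T>(items: readonly T[]): AsyncGenerator<T> {
  for (const item of items) {
    yield item;
  }
}

// Hypothetical helper: feed pre-built chunks through process() and return
// whatever the runner resolves with.
async function processChunks<TResult>(
  runner: StreamRunner<TResult>,
  chunks: readonly unknown[],
): Promise<TResult> {
  return runner.process(fromArray(chunks));
}
```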


processChunk()

ts
processChunk(chunk): void;

Defined in: activities/chat/stream/processor.ts:359

Process a single chunk from the stream

Parameters

chunk

AGUIEvent

Returns

void


removeMessagesAfter()

ts
removeMessagesAfter(index): void;

Defined in: activities/chat/stream/processor.ts:310

Remove messages after a certain index (for reload/retry)

Parameters

index

number

Returns

void
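
A retry flow might rewind the conversation to the last user message before requesting a new response. In the sketch below, the `rewindToLastUserMessage` helper and the assumption that each message exposes a `role` field equal to `"user"` are not taken from this page. It also assumes the message at `index` itself is kept, which is how "after a certain index" reads.

```typescript
// Structural subset of StreamProcessor. The `role` field is an assumption
// about UIMessage, which is not described in this section.
interface RetryableConversation {
  getMessages(): Array<{ role: string }>;
  removeMessagesAfter(index: number): void;
}

// Hypothetical helper: drop everything after the most recent user message.
// Returns that message's index, or -1 if there is no user message.
function rewindToLastUserMessage(conversation: RetryableConversation): number {
  const messages = conversation.getMessages();
  for (let i = messages.length - 1; i >= 0; i--) {
    if (messages[i]?.role === "user") {
      conversation.removeMessagesAfter(i);
      return i;
    }
  }
  return -1;
}
```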


reset()

ts
reset(): void;

Defined in: activities/chat/stream/processor.ts:916

Full reset (including messages)

Returns

void


setMessages()

ts
setMessages(messages): void;

Defined in: activities/chat/stream/processor.ts:161

Set the messages array (e.g., from persisted state)

Parameters

messages

UIMessage[]

Returns

void
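
Persisting and restoring a conversation could look roughly like the sketch below. It assumes messages survive a JSON round trip, which this page does not guarantee. `KeyValueStore` is a hypothetical interface shaped like the `getItem`/`setItem` subset of the Web Storage API, and both helpers are invented for the example.

```typescript
// Hypothetical storage abstraction, shaped like a subset of Web Storage.
interface KeyValueStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
}

// Structural subset of StreamProcessor; M stands in for UIMessage.
interface MessageStore<M> {
  getMessages(): M[];
  setMessages(messages: M[]): void;
}

// Hypothetical helper: serialize the current messages under `key`.
function saveConversation<M>(
  processor: MessageStore<M>,
  store: KeyValueStore,
  key: string,
): void {
  store.setItem(key, JSON.stringify(processor.getMessages()));
}

// Hypothetical helper: load messages saved under `key`, if any.
// Returns true only when a stored array was found and applied.
function restoreConversation<M>(
  processor: MessageStore<M>,
  store: KeyValueStore,
  key: string,
): boolean {
  const raw = store.getItem(key);
  if (raw === null) return false;
  const parsed: unknown = JSON.parse(raw);
  if (!Array.isArray(parsed)) return false;
  // Assumption: stored entries are valid messages; no schema check is done.
  processor.setMessages(parsed as M[]);
  return true;
}
```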


startAssistantMessage()

ts
startAssistantMessage(): string;

Defined in: activities/chat/stream/processor.ts:187

Start streaming a new assistant message. Returns the message ID.

Returns

string


startRecording()

ts
startRecording(): void;

Defined in: activities/chat/stream/processor.ts:880

Start recording chunks

Returns

void


toModelMessages()

ts
toModelMessages(): ModelMessage<
  | string
  | ContentPart<unknown, unknown, unknown, unknown, unknown>[]
  | null>[];

Defined in: activities/chat/stream/processor.ts:267

Get the conversation as ModelMessages (for sending to LLM)

Returns

ModelMessage<string | ContentPart<unknown, unknown, unknown, unknown, unknown>[] | null>[]


replay()

ts
static replay(recording, options?): Promise<ProcessorResult>;

Defined in: activities/chat/stream/processor.ts:925

Replay a recording through the processor

Parameters

recording

ChunkRecording

options?

StreamProcessorOptions

Returns

Promise<ProcessorResult>
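
To obtain a recording to replay, one plausible flow is to start recording, process a stream, and then read the recording back. The sketch below assumes that a recording started before `process()` is still available afterwards, which this page does not confirm. `processAndCapture` is a hypothetical helper, and the type parameters stand in for `ChunkRecording` and `ProcessorResult`.

```typescript
// Structural subset of StreamProcessor; the type parameters stand in for
// ChunkRecording and ProcessorResult.
interface RecordingProcessor<TRecording, TResult> {
  startRecording(): void;
  process(stream: AsyncIterable<any>): Promise<TResult>;
  getRecording(): TRecording | null;
}

// Hypothetical helper: record while processing and return both outcomes.
// Assumption: a recording started before process() is still available after.
async function processAndCapture<TRecording, TResult>(
  processor: RecordingProcessor<TRecording, TResult>,
  stream: AsyncIterable<unknown>,
): Promise<{ result: TResult; recording: TRecording | null }> {
  processor.startRecording();
  const result = await processor.process(stream);
  return { result, recording: processor.getRecording() };
}
```

A non-null `recording` from this helper could then be passed to `StreamProcessor.replay(recording, options)`, for instance to reproduce a session in a test.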