StreamProcessor

Class: StreamProcessor

Defined in: stream/processor.ts:171

StreamProcessor - State machine for processing AI response streams

Manages the full UIMessage[] conversation and emits events on changes.

State tracking:

  • Full message array
  • Current assistant message being streamed
  • Text content accumulation
  • Multiple parallel tool calls
  • Tool call completion detection

Tool call completion is detected when:

  1. A new tool call starts at a different index
  2. Text content arrives
  3. Stream ends
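
The three completion triggers above can be sketched as a small standalone tracker. This is an illustration, not the library's implementation: `ToolCallTracker`, the `SketchChunk` shape, and its field names are hypothetical, and the real `StreamChunk` type may look different.

```ts
// Hypothetical chunk shapes for this sketch; the real StreamChunk type may differ.
type SketchChunk =
  | { type: "tool-call-delta"; index: number; argsDelta: string }
  | { type: "text"; delta: string };

interface PendingCall {
  args: string;
  complete: boolean;
}

// Hypothetical tracker that applies the three completion triggers.
class ToolCallTracker {
  private calls = new Map<number, PendingCall>();
  private currentIndex: number | null = null;

  handle(chunk: SketchChunk): void {
    if (chunk.type === "text") {
      // Trigger 2: text content arrives, so all open tool calls are complete.
      this.completeAll();
      return;
    }
    const existing = this.calls.get(chunk.index);
    if (existing) {
      // More argument data for a call we already know about.
      existing.args += chunk.argsDelta;
      return;
    }
    // Trigger 1: a new tool call starts at a different index,
    // so the call that was being streamed before it is complete.
    if (this.currentIndex !== null) {
      const previous = this.calls.get(this.currentIndex);
      if (previous) previous.complete = true;
    }
    this.calls.set(chunk.index, { args: chunk.argsDelta, complete: false });
    this.currentIndex = chunk.index;
  }

  end(): void {
    // Trigger 3: the stream ends.
    this.completeAll();
  }

  isComplete(index: number): boolean {
    const call = this.calls.get(index);
    return call !== undefined && call.complete;
  }

  private completeAll(): void {
    this.calls.forEach((call) => {
      call.complete = true;
    });
    this.currentIndex = null;
  }
}
```

In this sketch a call is only marked complete by a later event, which is why a stream that ends right after a tool call needs the third trigger.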

Constructors

Constructor

ts
new StreamProcessor(options): StreamProcessor;

Defined in: stream/processor.ts:200

Parameters

options

StreamProcessorOptions = {}

Returns

StreamProcessor

Methods

addToolApprovalResponse()

ts
addToolApprovalResponse(approvalId, approved): void;

Defined in: stream/processor.ts:314

Add an approval response (called by client after handling onApprovalRequest)

Parameters

approvalId

string

approved

boolean

Returns

void


addToolResult()

ts
addToolResult(
   toolCallId, 
   output, 
   error?): void;

Defined in: stream/processor.ts:270

Add a tool result (called by client after handling onToolCall)

Parameters

toolCallId

string

output

any

error?

string

Returns

void


addUserMessage()

ts
addUserMessage(content): UIMessage;

Defined in: stream/processor.ts:228

Add a user message to the conversation

Parameters

content

string

Returns

UIMessage


areAllToolsComplete()

ts
areAllToolsComplete(): boolean;

Defined in: stream/processor.ts:345

Check whether all tool calls in the last assistant message are complete. Useful for auto-continue logic.

Returns

boolean
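
A minimal sketch of the auto-continue pattern mentioned above. The `ToolCompletionSource` interface is a structural stand-in for the single method used here, and `autoContinue` and `continueConversation` are hypothetical names, not part of the documented API.

```ts
// Structural stand-in for the one StreamProcessor method this sketch relies on.
interface ToolCompletionSource {
  areAllToolsComplete(): boolean;
}

// Hypothetical helper: runs `continueConversation` only when every tool call
// in the last assistant message is complete. Returns whether it continued.
function autoContinue(
  processor: ToolCompletionSource,
  continueConversation: () => void,
): boolean {
  if (!processor.areAllToolsComplete()) {
    return false; // still waiting on at least one tool call
  }
  continueConversation();
  return true;
}
```

A caller would typically invoke such a helper after each `addToolResult()` or `addToolApprovalResponse()` call, so that the follow-up request is sent once the last outstanding tool call resolves.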


clearMessages()

ts
clearMessages(): void;

Defined in: stream/processor.ts:377

Clear all messages

Returns

void


finalizeStream()

ts
finalizeStream(): void;

Defined in: stream/processor.ts:951

Finalize the stream - complete all pending operations

Returns

void


getMessages()

ts
getMessages(): UIMessage[];

Defined in: stream/processor.ts:337

Get current messages

Returns

UIMessage[]


getRecording()

ts
getRecording(): ChunkRecording | null;

Defined in: stream/processor.ts:1037

Get the current recording

Returns

ChunkRecording | null


getState()

ts
getState(): ProcessorState;

Defined in: stream/processor.ts:1010

Get current processor state (legacy)

Returns

ProcessorState


process()

ts
process(stream): Promise<ProcessorResult>;

Defined in: stream/processor.ts:390

Process a stream and emit events through handlers

Parameters

stream

AsyncIterable<any>

Returns

Promise<ProcessorResult>


processChunk()

ts
processChunk(chunk): void;

Defined in: stream/processor.ts:418

Process a single chunk from the stream

Parameters

chunk

StreamChunk

Returns

void


removeMessagesAfter()

ts
removeMessagesAfter(index): void;

Defined in: stream/processor.ts:369

Remove messages after a certain index (for reload/retry)

Parameters

index

number

Returns

void
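
A sketch of how a retry might choose the index to pass. It assumes that `index` names the last message to keep, which this page does not confirm. The `SketchMessage` shape and both helper functions are hypothetical.

```ts
// Hypothetical message shape; the real UIMessage type has more fields.
interface SketchMessage {
  id: string;
  role: "user" | "assistant";
}

// Assumption: "after index" means the message at `index` itself is kept.
function keepThroughIndex<T>(messages: T[], index: number): T[] {
  return messages.slice(0, index + 1);
}

// Finds the last user message, the natural cut point when retrying a reply.
// Returns -1 when the conversation has no user message.
function lastUserIndex(messages: SketchMessage[]): number {
  for (let i = messages.length - 1; i >= 0; i--) {
    if (messages[i].role === "user") return i;
  }
  return -1;
}
```

With these helpers, retrying the latest reply amounts to truncating at `lastUserIndex(messages)` and then streaming a fresh assistant message.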


reset()

ts
reset(): void;

Defined in: stream/processor.ts:1060

Full reset (including messages)

Returns

void


setMessages()

ts
setMessages(messages): void;

Defined in: stream/processor.ts:220

Set the messages array (e.g., from persisted state)

Parameters

messages

UIMessage[]

Returns

void


startAssistantMessage()

ts
startAssistantMessage(): string;

Defined in: stream/processor.ts:246

Start streaming a new assistant message. Returns the message ID.

Returns

string


startRecording()

ts
startRecording(): void;

Defined in: stream/processor.ts:1024

Start recording chunks

Returns

void


toModelMessages()

ts
toModelMessages(): ModelMessage<
  | string
  | ContentPart<unknown, unknown, unknown, unknown>[]
  | null>[];

Defined in: stream/processor.ts:326

Get the conversation as ModelMessages (for sending to LLM)

Returns

ModelMessage< | string | ContentPart<unknown, unknown, unknown, unknown>[] | null>[]


replay()

ts
static replay(recording, options?): Promise<ProcessorResult>;

Defined in: stream/processor.ts:1069

Replay a recording through the processor

Parameters

recording

ChunkRecording

options?

StreamProcessorOptions

Returns

Promise<ProcessorResult>