import {
  type AIStreamCallbacksAndOptions,
  createCallbacksTransformer,
  createStreamDataTransformer
} from 'ai'

/** Matches any single line terminator: CRLF, LF, or a lone CR. */
const LINE_BREAK = /\r\n|\n|\r/

/**
 * Parses each line as JSON and enqueues its `content` property.
 *
 * Blank (empty or whitespace-only) lines are skipped: they carry no payload
 * and `JSON.parse` would throw on them. Such lines show up, for example, when
 * a CRLF pair is split across two network chunks.
 *
 * @param lines - Complete lines, without their line terminators.
 * @param controller - Controller of the stream that receives the content.
 * @throws Whatever `JSON.parse` throws for a non-blank line that is not valid JSON.
 */
async function processLines(
  lines: string[],
  controller: ReadableStreamDefaultController
): Promise<void> {
  for (const line of lines) {
    if (line.trim() === '') {
      continue
    }
    const parsed: { content?: unknown } = JSON.parse(line)
    controller.enqueue(parsed.content)
  }
}

/**
 * Reads byte chunks from `reader` until it is exhausted, decodes them as
 * UTF-8, splits the text into lines, and forwards every complete line to
 * {@link processLines}. Closes `controller` once all input has been handled.
 *
 * @param reader - Source of raw byte chunks.
 * @param controller - Controller of the stream that receives the content.
 */
async function readAndProcessLines(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  controller: ReadableStreamDefaultController
): Promise<void> {
  // One decoder per stream: in streaming mode a decoder buffers incomplete
  // multi-byte sequences, so sharing it would mix bytes of unrelated streams.
  const decoder = new TextDecoder('utf-8')
  let pending = ''

  while (true) {
    const { value: chunk, done } = await reader.read()
    if (done) {
      break
    }

    pending += decoder.decode(chunk, { stream: true })
    const lines = pending.split(LINE_BREAK)
    // The last piece has no terminator yet; keep it for the next chunk.
    pending = lines.pop() ?? ''
    await processLines(lines, controller)
  }

  // Flush whatever the decoder still buffers from an incomplete sequence.
  pending += decoder.decode()
  if (pending) {
    await processLines([pending], controller)
  }

  controller.close()
}

/**
 * Wraps the body of `res` in a stream that emits the `content` property of
 * every JSON line found in the body. A response without a body yields a
 * stream that closes immediately.
 *
 * @param res - Response whose body consists of line-delimited JSON.
 */
function createParser(res: Response) {
  const reader = res.body?.getReader()

  return new ReadableStream({
    async start(controller): Promise<void> {
      if (!reader) {
        controller.close()
        return
      }

      try {
        await readAndProcessLines(reader, controller)
      } catch (error) {
        // Do not leave the response body open when reading or parsing fails.
        // A failure of the cancellation itself must not mask the real error.
        await reader.cancel(error).catch(() => undefined)
        throw error
      }
    },

    // Propagate consumer-side cancellation to the underlying response body.
    async cancel(reason): Promise<void> {
      await reader?.cancel(reason)
    }
  })
}

/**
 * Turns a line-delimited JSON response into a stream by piping the parsed
 * `content` values through the callbacks transformer and then the stream
 * data transformer from the `ai` package.
 *
 * @param reader - The HTTP response to consume (despite the name, a `Response`).
 * @param callbacks - Optional callbacks and options; `experimental_streamData`
 *   is passed to `createStreamDataTransformer`.
 * @returns The transformed stream.
 */
export function TabbyStream(
  reader: Response,
  callbacks?: AIStreamCallbacksAndOptions
): ReadableStream {
  return createParser(reader)
    .pipeThrough(createCallbacksTransformer(callbacks))
    .pipeThrough(
      createStreamDataTransformer(callbacks?.experimental_streamData)
    )
}