From d8d1399f6d04ac85e974d5c43ac7ea35539a9458 Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Thu, 7 Dec 2023 13:56:28 +0800 Subject: [PATCH] refactor: merge tabby-stream as StreamAdapter (#971) --- ee/tabby-ui/lib/hooks/use-patch-fetch.ts | 76 ++++++++++++++++++++++-- ee/tabby-ui/lib/hooks/use-workers.ts | 2 +- ee/tabby-ui/lib/tabby-stream.ts | 71 ---------------------- 3 files changed, 73 insertions(+), 76 deletions(-) delete mode 100644 ee/tabby-ui/lib/tabby-stream.ts diff --git a/ee/tabby-ui/lib/hooks/use-patch-fetch.ts b/ee/tabby-ui/lib/hooks/use-patch-fetch.ts index cfd3535..38e56ba 100644 --- a/ee/tabby-ui/lib/hooks/use-patch-fetch.ts +++ b/ee/tabby-ui/lib/hooks/use-patch-fetch.ts @@ -1,6 +1,9 @@ -import { type Message } from 'ai/react' +import { + type AIStreamCallbacksAndOptions, + createCallbacksTransformer, + createStreamDataTransformer +} from 'ai' import { StreamingTextResponse } from 'ai' -import { TabbyStream } from '@/lib/tabby-stream' import { useEffect } from 'react' const serverUrl = process.env.NEXT_PUBLIC_TABBY_SERVER_URL || '' @@ -14,7 +17,6 @@ export function usePatchFetch() { return fetch(url, options) } - const { messages } = JSON.parse(options!.body as string) const res = await fetch(`${serverUrl}/v1beta/chat/completions`, { ...options, method: 'POST', @@ -23,8 +25,74 @@ export function usePatchFetch() { } }) - const stream = TabbyStream(res, undefined) + const stream = StreamAdapter(res, undefined) return new StreamingTextResponse(stream) } }, []) } + +const utf8Decoder = new TextDecoder('utf-8') + +async function processLines( + lines: string[], + controller: ReadableStreamDefaultController +) { + for (const line of lines) { + const { content } = JSON.parse(line) + controller.enqueue(content) + } +} + +async function readAndProcessLines( + reader: ReadableStreamDefaultReader, + controller: ReadableStreamDefaultController +) { + let segment = '' + + while (true) { + const { value: chunk, done } = await reader.read() + if (done) { + break + } + + segment += utf8Decoder.decode(chunk, { stream: true }) + + const linesArray = segment.split(/\r\n|\n|\r/g) + segment = linesArray.pop() || '' + + await processLines(linesArray, controller) + } + + if (segment) { + const linesArray = [segment] + await processLines(linesArray, controller) + } + + controller.close() +} + +function createParser(res: Response) { + const reader = res.body?.getReader() + + return new ReadableStream({ + async start(controller): Promise { + if (!reader) { + controller.close() + return + } + + await readAndProcessLines(reader, controller) + } + }) +} + +function StreamAdapter( + reader: Response, + callbacks?: AIStreamCallbacksAndOptions +): ReadableStream { + return createParser(reader) + .pipeThrough(createCallbacksTransformer(callbacks)) + .pipeThrough( + createStreamDataTransformer(callbacks?.experimental_streamData) + ) +} diff --git a/ee/tabby-ui/lib/hooks/use-workers.ts b/ee/tabby-ui/lib/hooks/use-workers.ts index f1183a4..e810dda 100644 --- a/ee/tabby-ui/lib/hooks/use-workers.ts +++ b/ee/tabby-ui/lib/hooks/use-workers.ts @@ -3,7 +3,7 @@ import { groupBy, findIndex, slice } from 'lodash-es' import { Worker, WorkerKind } from '@/lib/gql/generates/graphql' import type { HealthInfo } from './use-health' import { graphql } from '@/lib/gql/generates' -import { useGraphQLQuery } from '../tabby/gql' +import { useGraphQLQuery } from '@/lib/tabby/gql' const modelNameMap: Record = { [WorkerKind.Chat]: 'chat_model', diff --git a/ee/tabby-ui/lib/tabby-stream.ts b/ee/tabby-ui/lib/tabby-stream.ts deleted file mode 100644 index 5d2168f..0000000 --- a/ee/tabby-ui/lib/tabby-stream.ts +++ /dev/null @@ -1,71 +0,0 @@ -import { - type AIStreamCallbacksAndOptions, - createCallbacksTransformer, - createStreamDataTransformer -} from 'ai' - -const utf8Decoder = new TextDecoder('utf-8') - -async function processLines( - lines: string[], - controller: ReadableStreamDefaultController -) { - for (const line of lines) { - const { content } = JSON.parse(line) - controller.enqueue(content) - } -} - -async function readAndProcessLines( - reader: ReadableStreamDefaultReader, - controller: ReadableStreamDefaultController -) { - let segment = '' - - while (true) { - const { value: chunk, done } = await reader.read() - if (done) { - break - } - - segment += utf8Decoder.decode(chunk, { stream: true }) - - const linesArray = segment.split(/\r\n|\n|\r/g) - segment = linesArray.pop() || '' - - await processLines(linesArray, controller) - } - - if (segment) { - const linesArray = [segment] - await processLines(linesArray, controller) - } - - controller.close() -} - -function createParser(res: Response) { - const reader = res.body?.getReader() - - return new ReadableStream({ - async start(controller): Promise { - if (!reader) { - controller.close() - return - } - - await readAndProcessLines(reader, controller) - } - }) -} - -export function TabbyStream( - reader: Response, - callbacks?: AIStreamCallbacksAndOptions -): ReadableStream { - return createParser(reader) - .pipeThrough(createCallbacksTransformer(callbacks)) - .pipeThrough( - createStreamDataTransformer(callbacks?.experimental_streamData) - ) -}