refactor: merge tabby-stream as StreamAdapter (#971)

support-auth-token
Meng Zhang 2023-12-07 13:56:28 +08:00 committed by GitHub
parent bfc2de49a3
commit d8d1399f6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 76 deletions

View File

@ -1,6 +1,9 @@
import { type Message } from 'ai/react'
import {
type AIStreamCallbacksAndOptions,
createCallbacksTransformer,
createStreamDataTransformer
} from 'ai'
import { StreamingTextResponse } from 'ai'
import { TabbyStream } from '@/lib/tabby-stream'
import { useEffect } from 'react'
// Base URL of the Tabby server (NEXT_PUBLIC_TABBY_SERVER_URL); falls back
// to '' so the chat-completions fetch below uses a relative path.
const serverUrl = process.env.NEXT_PUBLIC_TABBY_SERVER_URL || ''
@ -14,7 +17,6 @@ export function usePatchFetch() {
return fetch(url, options)
}
const { messages } = JSON.parse(options!.body as string)
const res = await fetch(`${serverUrl}/v1beta/chat/completions`, {
...options,
method: 'POST',
@ -23,8 +25,74 @@ export function usePatchFetch() {
}
})
const stream = TabbyStream(res, undefined)
const stream = StreamAdapter(res, undefined)
return new StreamingTextResponse(stream)
}
}, [])
}
// UTF-8 decoder for incoming byte chunks.
// NOTE(review): module-level and used with { stream: true }, so partial
// multi-byte state is shared across streams and never flushed — confirm
// only one response is decoded at a time.
const utf8Decoder = new TextDecoder('utf-8')
/**
 * Parses each newline-delimited JSON line from the chat endpoint and
 * forwards its `content` field to the downstream stream controller.
 * Rejects if any line is not valid JSON.
 */
async function processLines(
  lines: string[],
  controller: ReadableStreamDefaultController<string>
) {
  lines.forEach(line => {
    const parsed = JSON.parse(line)
    controller.enqueue(parsed.content)
  })
}
/**
 * Drains `reader` to completion, decoding UTF-8 bytes and splitting the
 * text on any newline convention (\r\n, \n, \r). Complete lines are
 * handed to processLines; the controller is closed once input is
 * exhausted.
 *
 * Fixes over the previous version:
 *  - uses a per-call TextDecoder rather than the shared module-level
 *    `utf8Decoder`, so concurrent streams cannot corrupt each other's
 *    partial multi-byte state;
 *  - flushes the decoder after the last chunk, so a trailing incomplete
 *    multi-byte sequence is emitted instead of silently dropped.
 */
async function readAndProcessLines(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  controller: ReadableStreamDefaultController<string>
) {
  const decoder = new TextDecoder('utf-8')
  let segment = ''
  while (true) {
    const { value: chunk, done } = await reader.read()
    if (done) {
      break
    }
    segment += decoder.decode(chunk, { stream: true })
    const linesArray = segment.split(/\r\n|\n|\r/g)
    // The last element may be a partial line; keep it for the next chunk.
    segment = linesArray.pop() || ''
    await processLines(linesArray, controller)
  }
  // Flush any bytes still buffered inside the decoder at end-of-stream.
  segment += decoder.decode()
  if (segment) {
    await processLines([segment], controller)
  }
  controller.close()
}
/**
 * Builds a ReadableStream<string> of message contents from a fetch
 * Response. When the response has no body, the stream closes
 * immediately without emitting anything.
 */
function createParser(res: Response) {
  const bodyReader = res.body?.getReader()
  return new ReadableStream<string>({
    async start(controller): Promise<void> {
      if (bodyReader) {
        await readAndProcessLines(bodyReader, controller)
      } else {
        controller.close()
      }
    }
  })
}
/**
 * Adapts a Tabby chat-completions Response into a ReadableStream usable
 * with the `ai` package helpers: parses the line-delimited JSON body
 * into text, then pipes it through the callbacks transformer and the
 * experimental stream-data transformer.
 */
function StreamAdapter(
  reader: Response,
  callbacks?: AIStreamCallbacksAndOptions
): ReadableStream {
  const textStream = createParser(reader)
  const withCallbacks = textStream.pipeThrough(
    createCallbacksTransformer(callbacks)
  )
  return withCallbacks.pipeThrough(
    createStreamDataTransformer(callbacks?.experimental_streamData)
  )
}

View File

@ -3,7 +3,7 @@ import { groupBy, findIndex, slice } from 'lodash-es'
import { Worker, WorkerKind } from '@/lib/gql/generates/graphql'
import type { HealthInfo } from './use-health'
import { graphql } from '@/lib/gql/generates'
import { useGraphQLQuery } from '../tabby/gql'
import { useGraphQLQuery } from '@/lib/tabby/gql'
const modelNameMap: Record<WorkerKind, 'chat_model' | 'model'> = {
[WorkerKind.Chat]: 'chat_model',

View File

@ -1,71 +0,0 @@
import {
type AIStreamCallbacksAndOptions,
createCallbacksTransformer,
createStreamDataTransformer
} from 'ai'
// UTF-8 decoder shared by readAndProcessLines below.
// NOTE(review): module-level + { stream: true } means decode state is
// shared across streams — assumes a single response is processed at a time.
const utf8Decoder = new TextDecoder('utf-8')
// Parses each newline-delimited JSON line and enqueues its `content`
// field on the stream controller. Rejects if a line is not valid JSON.
async function processLines(
  lines: string[],
  controller: ReadableStreamDefaultController<string>
) {
  for (const line of lines) {
    const { content } = JSON.parse(line)
    controller.enqueue(content)
  }
}
// Reads the byte stream to completion, splitting decoded text on \r\n,
// \n or \r and handing complete lines to processLines; any trailing
// partial segment is processed after the stream ends, then the
// controller is closed.
// NOTE(review): utf8Decoder is never flushed with a final decode(), so
// an incomplete multi-byte sequence at end-of-stream would be dropped —
// confirm whether the server can end a response mid-character.
async function readAndProcessLines(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  controller: ReadableStreamDefaultController<string>
) {
  let segment = ''
  while (true) {
    const { value: chunk, done } = await reader.read()
    if (done) {
      break
    }
    segment += utf8Decoder.decode(chunk, { stream: true })
    const linesArray = segment.split(/\r\n|\n|\r/g)
    // Last element may be a partial line; carry it into the next chunk.
    segment = linesArray.pop() || ''
    await processLines(linesArray, controller)
  }
  if (segment) {
    const linesArray = [segment]
    await processLines(linesArray, controller)
  }
  controller.close()
}
// Wraps a fetch Response body in a ReadableStream<string> of parsed
// message contents; closes the stream immediately when the response has
// no body.
function createParser(res: Response) {
  const reader = res.body?.getReader()
  return new ReadableStream<string>({
    async start(controller): Promise<void> {
      if (!reader) {
        controller.close()
        return
      }
      await readAndProcessLines(reader, controller)
    }
  })
}
// Adapts a Tabby chat-completions Response into a text stream compatible
// with the `ai` package's callback and stream-data transformers.
export function TabbyStream(
  reader: Response,
  callbacks?: AIStreamCallbacksAndOptions
): ReadableStream {
  return createParser(reader)
    .pipeThrough(createCallbacksTransformer(callbacks))
    .pipeThrough(
      createStreamDataTransformer(callbacks?.experimental_streamData)
    )
}