# tabby/python/tabby-eval/modal/predict.py
import asyncio
import json
import os
import pandas as pd
from datetime import datetime
from modal import Image, Stub, gpu, method
from typing import List, Optional, Tuple
# Modal GPU configuration: a single A10G per container.
GPU_CONFIG = gpu.A10G()
# Model to serve, e.g. "TabbyML/StarCoder-1B".
# NOTE(review): if MODEL_ID is unset this is None and the stub name below
# (`MODEL_ID.split(...)`) raises — confirm the env var is always exported.
MODEL_ID = os.environ.get("MODEL_ID")
# Default `tabby serve` arguments.
# NOTE(review): Model.__enter__ rebuilds these flags locally, so this
# module-level value appears unused at container runtime.
LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"]
def download_model():
    """Pre-download the Tabby model weights at image-build time.

    Runs inside the image build (via ``.run_function`` below). Reads
    MODEL_ID from the environment — it is baked into the image with
    ``.env(...)`` — and invokes the tabby CLI so containers start
    without a cold download.

    Raises:
        subprocess.CalledProcessError: if the download command fails.
    """
    import subprocess
    import os

    # Local name: avoid shadowing the module-level MODEL_ID constant.
    model_id = os.environ.get("MODEL_ID")
    print(f'MODEL_ID={model_id}')
    # check=True: fail the image build loudly on a bad download instead of
    # silently producing an image with no model weights.
    subprocess.run(
        [
            "/opt/tabby/bin/tabby",
            "download",
            "--model",
            model_id,
        ],
        check=True,
    )
# Container image: the official tabby release with Python 3.11 added,
# the generated tabby_python_client package copied in, and the model
# weights downloaded at build time (fast cold starts).
image = (
    Image.from_registry(
        "tabbyml/tabby:0.5.5",
        add_python="3.11",
    )
    # Bake MODEL_ID into the image so download_model() and Model.__enter__
    # see the same value the build was launched with.
    .env({"MODEL_ID": os.environ.get("MODEL_ID")})
    # Clear the upstream ENTRYPOINT so Modal controls the container process.
    .dockerfile_commands("ENTRYPOINT []")
    .copy_local_dir(local_path='./modal/tabby_python_client/tabby_python_client', remote_path='/root/tabby_python_client')
    .pip_install(
        "httpx",
        "pandas"
    )
    .run_function(download_model)
)

# One app per model, e.g. "tabby-StarCoder-1B".
stub = Stub("tabby-" + MODEL_ID.split("/")[-1], image=image)
@stub.cls(
    gpu=GPU_CONFIG,
    concurrency_limit=10,
    allow_concurrent_inputs=2,
    container_idle_timeout=60 * 10,
    timeout=600,
)
class Model:
    """Modal container class that runs a local Tabby server and proxies
    completion/health requests to it over HTTP on 127.0.0.1:8000."""

    def __enter__(self):
        """Start the tabby server subprocess and block until it accepts
        TCP connections, so no inputs run against a half-started server.

        Raises:
            RuntimeError: if the server process exits before becoming ready.
        """
        import socket
        import subprocess
        import os
        import time

        from tabby_python_client import Client

        my_env = os.environ.copy()
        my_env["TABBY_DISABLE_USAGE_COLLECTION"] = "1"
        model_id = os.environ.get("MODEL_ID")
        print(f'MODEL_ID={model_id}')
        launch_flags = ["serve", "--model", model_id, "--port", "8000", "--device", "cuda"]
        self.launcher = subprocess.Popen(["/opt/tabby/bin/tabby"] + launch_flags, env=my_env)
        # Generous timeout: first completion after model load can be slow.
        self.client = Client("http://127.0.0.1:8000", timeout=240)

        # Poll until webserver at 127.0.0.1:8000 accepts connections before running inputs.
        def webserver_ready():
            try:
                socket.create_connection(("127.0.0.1", 8000), timeout=1).close()
                return True
            except (socket.timeout, ConnectionRefusedError):
                # Check if launcher webserving process has exited.
                # If so, a connection can never be made.
                retcode = self.launcher.poll()
                if retcode is not None:
                    raise RuntimeError(
                        f"launcher exited unexpectedly with code {retcode}"
                    )
                return False

        while not webserver_ready():
            time.sleep(1.0)
        print("Tabby server ready!")

    def __exit__(self, _exc_type, _exc_value, _traceback):
        # Best-effort shutdown of the tabby server when the container winds down.
        self.launcher.terminate()

    @method()
    async def health(self):
        """Return the server's /v1/health payload as a plain dict."""
        from tabby_python_client.api.v1 import health

        resp = await health.asyncio(client=self.client)
        return resp.to_dict()

    @method()
    async def complete(self, language: str, index: int, prompt: str) -> Tuple[int, Optional[str], Optional[str]]:
        """Request one completion for `prompt`.

        Returns (index, prediction, error): exactly one of prediction/error
        is non-None; `index` is echoed back so the caller can match
        concurrently-gathered results to dataframe rows.
        """
        from tabby_python_client import errors
        from tabby_python_client.api.v1 import completion
        from tabby_python_client.models import (
            CompletionRequest,
            DebugOptions,
        )
        from tabby_python_client.types import Response

        # raw_prompt bypasses Tabby's own prompt templating so the benchmark
        # prompt is sent verbatim.
        request = CompletionRequest(
            language=language, debug_options=DebugOptions(raw_prompt=prompt)
        )
        try:
            resp: Response = await completion.asyncio_detailed(
                client=self.client, json_body=request
            )
            if resp.parsed is not None:
                return index, resp.parsed.choices[0].text, None
            else:
                return index, None, f"<{resp.status_code}>"
        except errors.UnexpectedStatus as e:
            return index, None, f"error: code={e.status_code} content={e.content} error={e}"
        except Exception as e:
            # Fix: include the exception message, not just its type, so
            # transport failures are diagnosable from the output file.
            return index, None, f"error type: {type(e)}; message: {e}"
def write_log(log: str):
    """Append one timestamped line to the local run log (./modal/log.txt)."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open('./modal/log.txt', 'a') as logfile:
        logfile.write(f"{timestamp} : {log}\n")
def chunker(seq, size: int) -> List:
    """Split `seq` into consecutive slices of at most `size` elements.

    Works on anything sliceable with a length (lists, DataFrames, ...).
    Fix: the original returned a generator despite the `List` annotation;
    this returns an actual list so the annotation is truthful and the
    result can be re-iterated.
    """
    return [seq[pos:pos + size] for pos in range(0, len(seq), size)]
def read_dataframe_from_file(language: str, file: str) -> pd.DataFrame:
    """Load one JSONL benchmark file into a DataFrame, adding a 'raw_prompt'
    column: the cross-file context text (when present) prepended to the prompt.
    """
    whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file
    records = []
    with open(whole_path_file) as fin:
        for raw_line in fin:
            record = json.loads(raw_line)
            if 'crossfile_context' in record:
                record['raw_prompt'] = record['crossfile_context']['text'] + record['prompt']
            else:
                record['raw_prompt'] = record['prompt']
            records.append(record)
    return pd.DataFrame(records)
@stub.local_entrypoint()
async def main(language: str, files: str):
    """Drive the eval: for each input file, fill in missing predictions via
    the remote Model, then rewrite the JSONL file in place.

    Args (parsed from the CLI by Modal):
        language: benchmark language subdirectory under ./data/<model>/.
        files: multiple file names separated by ','.
    """
    model = Model()
    health_resp = model.health.remote()
    print(f'model info:\n{health_resp}')
    # Sanity check: the deployed server must be running the model we expect.
    # NOTE(review): assert is stripped under `python -O` — consider raising.
    assert(health_resp['model'] == MODEL_ID)
    files = files.split(',')
    for file in files:
        df = read_dataframe_from_file(language, file.strip())
        write_log(f'model: {MODEL_ID}; language: {language}; file: {file}: length = {len(df)}')
        # Resume support: only rows without an existing prediction are submitted.
        if 'prediction' in df.columns:
            df_no_prediction = df[df['prediction'].isna()]
        else:
            df_no_prediction = df
        skipped = len(df) - len(df_no_prediction)
        success = 0
        error = 0
        # Fan out batches of 30 concurrent remote completion calls; each result
        # carries its dataframe index so it can be written back to the right row.
        for group in chunker(df_no_prediction, 30):
            outputs = await asyncio.gather(*[model.complete.remote.aio(language, index, row['raw_prompt']) for index, row in group.iterrows()])
            for index, prediction, error_msg in outputs:
                if prediction is not None:
                    df.loc[index, 'prediction'] = prediction
                    success += 1
                else:
                    df.loc[index, 'error'] = error_msg
                    error += 1
        write_log(f"Skipped {skipped} rows, {success} rows with predictions, {error} rows with errors")
        # Rewrite the source file in place, one JSON object per line.
        # NOTE(review): reading used file.strip() but this path uses the raw
        # `file` name — a name with surrounding whitespace would be written to
        # a different path than it was read from; confirm inputs are clean.
        whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file
        with open(whole_path_file, 'w') as fout:
            for index, row in df.iterrows():
                row_dict = row.to_dict()
                json.dump(row_dict, fout)
                fout.write('\n')
        write_log(f"model: {MODEL_ID}; language: {language}; file: {file}: end!\n")