feat: add `tabby-eval` package to run e2e evaluation on modal (#893)

* updating predict.py, adding tabby_python_client package

* update predict.py

* move to tabby-eval directory

* delete old files

* update predict.py

* delete other folder files
add-prompt-lookup
yan91083 2023-11-25 03:36:00 -08:00 committed by GitHub
parent 1883058008
commit 39962c79ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 2436 additions and 0 deletions

View File

@ -0,0 +1,212 @@
import asyncio
import json
import modal
import os
import pandas as pd
from collections import namedtuple
from datetime import datetime
from modal import Image, Mount, Secret, Stub, asgi_app, gpu, method
from pathlib import Path
from typing import Union, List, Optional, Any, Tuple
GPU_CONFIG = gpu.A10G()
MODEL_ID = os.environ.get("MODEL_ID")
LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"]
def download_model():
import subprocess
import os
MODEL_ID = os.environ.get("MODEL_ID")
print(f'MODEL_ID={MODEL_ID}')
subprocess.run(
[
"/opt/tabby/bin/tabby",
"download",
"--model",
MODEL_ID,
]
)
image = (
Image.from_registry(
"tabbyml/tabby:0.5.5",
add_python="3.11",
)
.env({"MODEL_ID": os.environ.get("MODEL_ID")})
.dockerfile_commands("ENTRYPOINT []")
.copy_local_dir(local_path='./modal/tabby_python_client/tabby_python_client', remote_path='/root/tabby_python_client')
.pip_install(
"httpx",
"pandas"
)
.run_function(download_model)
)
stub = Stub("tabby-" + MODEL_ID.split("/")[-1], image=image)
@stub.cls(
gpu=GPU_CONFIG,
concurrency_limit=10,
allow_concurrent_inputs=2,
container_idle_timeout=60 * 10,
timeout=600,
)
class Model:
def __enter__(self):
import socket
import subprocess, os
import time
from tabby_python_client import Client
my_env = os.environ.copy()
my_env["TABBY_DISABLE_USAGE_COLLECTION"] = "1"
MODEL_ID = os.environ.get("MODEL_ID")
print(f'MODEL_ID={MODEL_ID}')
LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"]
self.launcher = subprocess.Popen(["/opt/tabby/bin/tabby"] + LAUNCH_FLAGS, env=my_env)
self.client = Client("http://127.0.0.1:8000", timeout=240)
# Poll until webserver at 127.0.0.1:8000 accepts connections before running inputs.
def webserver_ready():
try:
socket.create_connection(("127.0.0.1", 8000), timeout=1).close()
return True
except (socket.timeout, ConnectionRefusedError):
# Check if launcher webserving process has exited.
# If so, a connection can never be made.
retcode = self.launcher.poll()
if retcode is not None:
raise RuntimeError(
f"launcher exited unexpectedly with code {retcode}"
)
return False
while not webserver_ready():
time.sleep(1.0)
print("Tabby server ready!")
def __exit__(self, _exc_type, _exc_value, _traceback):
self.launcher.terminate()
@method()
async def health(self):
from tabby_python_client.api.v1 import health
resp = await health.asyncio(client=self.client)
return resp.to_dict()
@method()
async def complete(self, language: str, index: int, prompt: str) -> Tuple[int, Optional[str], Optional[str]]:
from tabby_python_client.api.v1 import completion
from tabby_python_client.models import (
CompletionRequest,
DebugOptions,
CompletionResponse,
Segments,
)
from tabby_python_client.types import Response
request = CompletionRequest(
language=language, debug_options=DebugOptions(raw_prompt=prompt)
)
try:
resp: Response = await completion.asyncio_detailed(
client=self.client, json_body=request
)
if resp.parsed != None:
return index, resp.parsed.choices[0].text, None
else:
return index, None, f"<{resp.status_code}>"
except errors.UnexpectedStatus as e:
return index, None, f"error: code={e.status_code} content={e.content} error={e}"
except Exception as e:
return index, None, f"error type: {type(e)}"
def write_log(log: str):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open('./modal/log.txt', 'a') as f:
f.write(f"{now} : {log}")
f.write("\n")
def chunker(seq, size) -> List:
return (seq[pos:pos + size] for pos in range(0, len(seq), size))
def read_dataframe_from_file(language: str, file: str) -> pd.DataFrame:
whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file
objs = []
with open(whole_path_file) as fin:
for line in fin:
obj = json.loads(line)
if 'crossfile_context' in obj.keys():
obj['raw_prompt'] = obj['crossfile_context']['text'] + obj['prompt']
else:
obj['raw_prompt'] = obj['prompt']
objs.append(obj)
df = pd.DataFrame(objs)
return df
@stub.local_entrypoint()
async def main(language: str, files: str):
#Multiple files seperated by ','
model = Model()
health_resp = model.health.remote()
print(f'model info:\n{health_resp}')
assert(health_resp['model'] == MODEL_ID)
files = files.split(',')
for file in files:
df = read_dataframe_from_file(language, file.strip())
write_log(f'model: {MODEL_ID}; language: {language}; file: {file}: length = {len(df)}')
if 'prediction' in df.columns:
df_no_prediction = df[df['prediction'].isna()]
else:
df_no_prediction = df
skipped = len(df) - len(df_no_prediction)
success = 0
error = 0
for group in chunker(df_no_prediction, 30):
outputs = await asyncio.gather(*[model.complete.remote.aio(language, index, row['raw_prompt']) for index, row in group.iterrows()])
for index, prediction, error_msg in outputs:
if prediction is not None:
df.loc[index, 'prediction'] = prediction
success += 1
else:
df.loc[index, 'error'] = error_msg
error += 1
write_log(f"Skipped {skipped} rows, {success} rows with predictions, {error} rows with errors")
whole_path_file = "./data/" + MODEL_ID.split("/")[-1] + "/" + language + "/" + file
with open(whole_path_file, 'w') as fout:
for index, row in df.iterrows():
row_dict = row.to_dict()
json.dump(row_dict, fout)
fout.write('\n')
write_log(f"model: {MODEL_ID}; language: {language}; file: {file}: end!\n")

View File

@ -0,0 +1,23 @@
__pycache__/
build/
dist/
*.egg-info/
.pytest_cache/
# pyenv
.python-version
# Environments
.env
.venv
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# JetBrains
.idea/
/coverage.xml
/.coverage

View File

@ -0,0 +1,89 @@
# tabby-python-client
A client library for accessing Tabby Server
## Usage
First, create a client:
```python
from tabby_python_client import Client
client = Client(base_url="https://api.example.com")
```
If the endpoints you're going to hit require authentication, use `AuthenticatedClient` instead:
```python
from tabby_python_client import AuthenticatedClient
client = AuthenticatedClient(base_url="https://api.example.com", token="SuperSecretToken")
```
Now call your endpoint and use your models:
```python
from tabby_python_client.models import MyDataModel
from tabby_python_client.api.my_tag import get_my_data_model
from tabby_python_client.types import Response
my_data: MyDataModel = get_my_data_model.sync(client=client)
# or if you need more info (e.g. status_code)
response: Response[MyDataModel] = get_my_data_model.sync_detailed(client=client)
```
Or do the same thing with an async version:
```python
from tabby_python_client.models import MyDataModel
from tabby_python_client.api.my_tag import get_my_data_model
from tabby_python_client.types import Response
my_data: MyDataModel = await get_my_data_model.asyncio(client=client)
response: Response[MyDataModel] = await get_my_data_model.asyncio_detailed(client=client)
```
By default, when you're calling an HTTPS API it will attempt to verify that SSL is working correctly. Using certificate verification is highly recommended most of the time, but sometimes you may need to authenticate to a server (especially an internal server) using a custom certificate bundle.
```python
client = AuthenticatedClient(
base_url="https://internal_api.example.com",
token="SuperSecretToken",
verify_ssl="/path/to/certificate_bundle.pem",
)
```
You can also disable certificate validation altogether, but beware that **this is a security risk**.
```python
client = AuthenticatedClient(
base_url="https://internal_api.example.com",
token="SuperSecretToken",
verify_ssl=False
)
```
There are more settings on the generated `Client` class which let you control more runtime behavior, check out the docstring on that class for more info.
Things to know:
1. Every path/method combo becomes a Python module with four functions:
1. `sync`: Blocking request that returns parsed data (if successful) or `None`
1. `sync_detailed`: Blocking request that always returns a `Request`, optionally with `parsed` set if the request was successful.
1. `asyncio`: Like `sync` but async instead of blocking
1. `asyncio_detailed`: Like `sync_detailed` but async instead of blocking
1. All path/query params, and bodies become method arguments.
1. If your endpoint had any tags on it, the first tag will be used as a module name for the function (my_tag above)
1. Any endpoint which did not have a tag will be in `tabby_python_client.api.default`
## Building / publishing this Client
This project uses [Poetry](https://python-poetry.org/) to manage dependencies and packaging. Here are the basics:
1. Update the metadata in pyproject.toml (e.g. authors, version)
1. If you're using a private repository, configure it with Poetry
1. `poetry config repositories.<your-repository-name> <url-to-your-repository>`
1. `poetry config http-basic.<your-repository-name> <username> <password>`
1. Publish the client with `poetry publish --build -r <your-repository-name>` or, if for public PyPI, just `poetry publish --build`
If you want to install this client into another project without publishing it (e.g. for development) then:
1. If that project **is using Poetry**, you can simply do `poetry add <path-to-this-client>` from that project
1. If that project is not using Poetry:
1. Build a wheel with `poetry build -f wheel`
1. Install that wheel from the other project `pip install <path-to-wheel>`

View File

@ -0,0 +1,16 @@
[tool.black]
line-length = 120
target_version = ['py38', 'py39', 'py310', 'py311']
exclude = '''
(
/(
| \.git
| \.venv
| \.mypy_cache
)/
)
'''
[tool.isort]
line_length = 120
profile = "black"

View File

@ -0,0 +1,18 @@
import pathlib
from setuptools import find_packages, setup
here = pathlib.Path(__file__).parent.resolve()
long_description = (here / "README.md").read_text(encoding="utf-8")
setup(
name="tabby-python-client",
version="0.4.0-dev",
description="A client library for accessing Tabby Server",
long_description=long_description,
long_description_content_type="text/markdown",
packages=find_packages(),
python_requires=">=3.8, <4",
install_requires=["httpx >= 0.15.0, < 0.25.0", "attrs >= 21.3.0", "python-dateutil >= 2.8.0, < 3"],
package_data={"tabby_python_client": ["py.typed"]},
)

View File

@ -0,0 +1,7 @@
""" A client library for accessing Tabby Server """
from .client import AuthenticatedClient, Client
__all__ = (
"AuthenticatedClient",
"Client",
)

View File

@ -0,0 +1 @@
""" Contains methods for accessing the API """

View File

@ -0,0 +1,166 @@
from http import HTTPStatus
from typing import Any, Dict, Optional, Union, cast
import httpx
from ... import errors
from ...client import Client
from ...models.completion_request import CompletionRequest
from ...models.completion_response import CompletionResponse
from ...types import Response
def _get_kwargs(
*,
client: Client,
json_body: CompletionRequest,
) -> Dict[str, Any]:
url = "{}/v1/completions".format(client.base_url)
headers: Dict[str, str] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
json_json_body = json_body.to_dict()
return {
"method": "post",
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
"follow_redirects": client.follow_redirects,
"json": json_json_body,
}
def _parse_response(*, client: Client, response: httpx.Response) -> Optional[Union[Any, CompletionResponse]]:
if response.status_code == HTTPStatus.OK:
response_200 = CompletionResponse.from_dict(response.json())
return response_200
if response.status_code == HTTPStatus.BAD_REQUEST:
response_400 = cast(Any, None)
return response_400
if client.raise_on_unexpected_status:
raise errors.UnexpectedStatus(response.status_code, response.content)
else:
return None
def _build_response(*, client: Client, response: httpx.Response) -> Response[Union[Any, CompletionResponse]]:
return Response(
status_code=HTTPStatus(response.status_code),
content=response.content,
headers=response.headers,
parsed=_parse_response(client=client, response=response),
)
def sync_detailed(
*,
client: Client,
json_body: CompletionRequest,
) -> Response[Union[Any, CompletionResponse]]:
r"""
Args:
json_body (CompletionRequest): Example: {'language': 'python', 'segments': {'prefix':
'def fib(n):\n ', 'suffix': '\n return fib(n - 1) + fib(n - 2)'}}.
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[Union[Any, CompletionResponse]]
"""
kwargs = _get_kwargs(
client=client,
json_body=json_body,
)
response = httpx.request(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(client=client, response=response)
def sync(
*,
client: Client,
json_body: CompletionRequest,
) -> Optional[Union[Any, CompletionResponse]]:
r"""
Args:
json_body (CompletionRequest): Example: {'language': 'python', 'segments': {'prefix':
'def fib(n):\n ', 'suffix': '\n return fib(n - 1) + fib(n - 2)'}}.
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Union[Any, CompletionResponse]
"""
return sync_detailed(
client=client,
json_body=json_body,
).parsed
async def asyncio_detailed(
*,
client: Client,
json_body: CompletionRequest,
) -> Response[Union[Any, CompletionResponse]]:
r"""
Args:
json_body (CompletionRequest): Example: {'language': 'python', 'segments': {'prefix':
'def fib(n):\n ', 'suffix': '\n return fib(n - 1) + fib(n - 2)'}}.
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[Union[Any, CompletionResponse]]
"""
kwargs = _get_kwargs(
client=client,
json_body=json_body,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.request(**kwargs)
return _build_response(client=client, response=response)
async def asyncio(
*,
client: Client,
json_body: CompletionRequest,
) -> Optional[Union[Any, CompletionResponse]]:
r"""
Args:
json_body (CompletionRequest): Example: {'language': 'python', 'segments': {'prefix':
'def fib(n):\n ', 'suffix': '\n return fib(n - 1) + fib(n - 2)'}}.
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Union[Any, CompletionResponse]
"""
return (
await asyncio_detailed(
client=client,
json_body=json_body,
)
).parsed

View File

@ -0,0 +1,110 @@
from http import HTTPStatus
from typing import Any, Dict, Optional
import httpx
from ... import errors
from ...client import Client
from ...models.log_event_request import LogEventRequest
from ...types import Response
def _get_kwargs(
*,
client: Client,
json_body: LogEventRequest,
) -> Dict[str, Any]:
url = "{}/v1/events".format(client.base_url)
headers: Dict[str, str] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
json_json_body = json_body.to_dict()
return {
"method": "post",
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
"follow_redirects": client.follow_redirects,
"json": json_json_body,
}
def _parse_response(*, client: Client, response: httpx.Response) -> Optional[Any]:
if response.status_code == HTTPStatus.OK:
return None
if response.status_code == HTTPStatus.BAD_REQUEST:
return None
if client.raise_on_unexpected_status:
raise errors.UnexpectedStatus(response.status_code, response.content)
else:
return None
def _build_response(*, client: Client, response: httpx.Response) -> Response[Any]:
return Response(
status_code=HTTPStatus(response.status_code),
content=response.content,
headers=response.headers,
parsed=_parse_response(client=client, response=response),
)
def sync_detailed(
*,
client: Client,
json_body: LogEventRequest,
) -> Response[Any]:
"""
Args:
json_body (LogEventRequest):
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[Any]
"""
kwargs = _get_kwargs(
client=client,
json_body=json_body,
)
response = httpx.request(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(client=client, response=response)
async def asyncio_detailed(
*,
client: Client,
json_body: LogEventRequest,
) -> Response[Any]:
"""
Args:
json_body (LogEventRequest):
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[Any]
"""
kwargs = _get_kwargs(
client=client,
json_body=json_body,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.request(**kwargs)
return _build_response(client=client, response=response)

View File

@ -0,0 +1,134 @@
from http import HTTPStatus
from typing import Any, Dict, Optional
import httpx
from ... import errors
from ...client import Client
from ...models.health_state import HealthState
from ...types import Response
def _get_kwargs(
*,
client: Client,
) -> Dict[str, Any]:
url = "{}/v1/health".format(client.base_url)
headers: Dict[str, str] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"method": "get",
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
"follow_redirects": client.follow_redirects,
}
def _parse_response(*, client: Client, response: httpx.Response) -> Optional[HealthState]:
if response.status_code == HTTPStatus.OK:
response_200 = HealthState.from_dict(response.json())
return response_200
if client.raise_on_unexpected_status:
raise errors.UnexpectedStatus(response.status_code, response.content)
else:
return None
def _build_response(*, client: Client, response: httpx.Response) -> Response[HealthState]:
return Response(
status_code=HTTPStatus(response.status_code),
content=response.content,
headers=response.headers,
parsed=_parse_response(client=client, response=response),
)
def sync_detailed(
*,
client: Client,
) -> Response[HealthState]:
"""
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[HealthState]
"""
kwargs = _get_kwargs(
client=client,
)
response = httpx.request(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(client=client, response=response)
def sync(
*,
client: Client,
) -> Optional[HealthState]:
"""
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
HealthState
"""
return sync_detailed(
client=client,
).parsed
async def asyncio_detailed(
*,
client: Client,
) -> Response[HealthState]:
"""
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[HealthState]
"""
kwargs = _get_kwargs(
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.request(**kwargs)
return _build_response(client=client, response=response)
async def asyncio(
*,
client: Client,
) -> Optional[HealthState]:
"""
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
HealthState
"""
return (
await asyncio_detailed(
client=client,
)
).parsed

View File

@ -0,0 +1,194 @@
from http import HTTPStatus
from typing import Any, Dict, Optional, Union, cast
import httpx
from ... import errors
from ...client import Client
from ...models.search_response import SearchResponse
from ...types import UNSET, Response, Unset
def _get_kwargs(
*,
client: Client,
q: str = "get",
limit: Union[Unset, None, int] = 20,
offset: Union[Unset, None, int] = 0,
) -> Dict[str, Any]:
url = "{}/v1beta/search".format(client.base_url)
headers: Dict[str, str] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
params: Dict[str, Any] = {}
params["q"] = q
params["limit"] = limit
params["offset"] = offset
params = {k: v for k, v in params.items() if v is not UNSET and v is not None}
return {
"method": "get",
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
"follow_redirects": client.follow_redirects,
"params": params,
}
def _parse_response(*, client: Client, response: httpx.Response) -> Optional[Union[Any, SearchResponse]]:
if response.status_code == HTTPStatus.OK:
response_200 = SearchResponse.from_dict(response.json())
return response_200
if response.status_code == HTTPStatus.NOT_IMPLEMENTED:
response_501 = cast(Any, None)
return response_501
if client.raise_on_unexpected_status:
raise errors.UnexpectedStatus(response.status_code, response.content)
else:
return None
def _build_response(*, client: Client, response: httpx.Response) -> Response[Union[Any, SearchResponse]]:
return Response(
status_code=HTTPStatus(response.status_code),
content=response.content,
headers=response.headers,
parsed=_parse_response(client=client, response=response),
)
def sync_detailed(
*,
client: Client,
q: str = "get",
limit: Union[Unset, None, int] = 20,
offset: Union[Unset, None, int] = 0,
) -> Response[Union[Any, SearchResponse]]:
"""
Args:
q (str): Default: 'get'.
limit (Union[Unset, None, int]): Default: 20.
offset (Union[Unset, None, int]):
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[Union[Any, SearchResponse]]
"""
kwargs = _get_kwargs(
client=client,
q=q,
limit=limit,
offset=offset,
)
response = httpx.request(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(client=client, response=response)
def sync(
*,
client: Client,
q: str = "get",
limit: Union[Unset, None, int] = 20,
offset: Union[Unset, None, int] = 0,
) -> Optional[Union[Any, SearchResponse]]:
"""
Args:
q (str): Default: 'get'.
limit (Union[Unset, None, int]): Default: 20.
offset (Union[Unset, None, int]):
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Union[Any, SearchResponse]
"""
return sync_detailed(
client=client,
q=q,
limit=limit,
offset=offset,
).parsed
async def asyncio_detailed(
*,
client: Client,
q: str = "get",
limit: Union[Unset, None, int] = 20,
offset: Union[Unset, None, int] = 0,
) -> Response[Union[Any, SearchResponse]]:
"""
Args:
q (str): Default: 'get'.
limit (Union[Unset, None, int]): Default: 20.
offset (Union[Unset, None, int]):
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Response[Union[Any, SearchResponse]]
"""
kwargs = _get_kwargs(
client=client,
q=q,
limit=limit,
offset=offset,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.request(**kwargs)
return _build_response(client=client, response=response)
async def asyncio(
*,
client: Client,
q: str = "get",
limit: Union[Unset, None, int] = 20,
offset: Union[Unset, None, int] = 0,
) -> Optional[Union[Any, SearchResponse]]:
"""
Args:
q (str): Default: 'get'.
limit (Union[Unset, None, int]): Default: 20.
offset (Union[Unset, None, int]):
Raises:
errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
httpx.TimeoutException: If the request takes longer than Client.timeout.
Returns:
Union[Any, SearchResponse]
"""
return (
await asyncio_detailed(
client=client,
q=q,
limit=limit,
offset=offset,
)
).parsed

View File

@ -0,0 +1,66 @@
import ssl
from typing import Dict, Union
import attr
@attr.s(auto_attribs=True)
class Client:
"""A class for keeping track of data related to the API
Attributes:
base_url: The base URL for the API, all requests are made to a relative path to this URL
cookies: A dictionary of cookies to be sent with every request
headers: A dictionary of headers to be sent with every request
timeout: The maximum amount of a time in seconds a request can take. API functions will raise
httpx.TimeoutException if this is exceeded.
verify_ssl: Whether or not to verify the SSL certificate of the API server. This should be True in production,
but can be set to False for testing purposes.
raise_on_unexpected_status: Whether or not to raise an errors.UnexpectedStatus if the API returns a
status code that was not documented in the source OpenAPI document.
follow_redirects: Whether or not to follow redirects. Default value is False.
"""
base_url: str
cookies: Dict[str, str] = attr.ib(factory=dict, kw_only=True)
headers: Dict[str, str] = attr.ib(factory=dict, kw_only=True)
timeout: float = attr.ib(5.0, kw_only=True)
verify_ssl: Union[str, bool, ssl.SSLContext] = attr.ib(True, kw_only=True)
raise_on_unexpected_status: bool = attr.ib(False, kw_only=True)
follow_redirects: bool = attr.ib(False, kw_only=True)
def get_headers(self) -> Dict[str, str]:
"""Get headers to be used in all endpoints"""
return {**self.headers}
def with_headers(self, headers: Dict[str, str]) -> "Client":
"""Get a new client matching this one with additional headers"""
return attr.evolve(self, headers={**self.headers, **headers})
def get_cookies(self) -> Dict[str, str]:
return {**self.cookies}
def with_cookies(self, cookies: Dict[str, str]) -> "Client":
"""Get a new client matching this one with additional cookies"""
return attr.evolve(self, cookies={**self.cookies, **cookies})
def get_timeout(self) -> float:
return self.timeout
def with_timeout(self, timeout: float) -> "Client":
"""Get a new client matching this one with a new timeout (in seconds)"""
return attr.evolve(self, timeout=timeout)
@attr.s(auto_attribs=True)
class AuthenticatedClient(Client):
"""A Client which has been authenticated for use on secured endpoints"""
token: str
prefix: str = "Bearer"
auth_header_name: str = "Authorization"
def get_headers(self) -> Dict[str, str]:
"""Get headers to be used in authenticated endpoints"""
auth_header_value = f"{self.prefix} {self.token}" if self.prefix else self.token
return {self.auth_header_name: auth_header_value, **self.headers}

View File

@ -0,0 +1,14 @@
""" Contains shared errors types that can be raised from API functions """
class UnexpectedStatus(Exception):
"""Raised by api functions when the response status an undocumented status and Client.raise_on_unexpected_status is True"""
def __init__(self, status_code: int, content: bytes):
self.status_code = status_code
self.content = content
super().__init__(f"Unexpected status code: {status_code}")
__all__ = ["UnexpectedStatus"]

View File

@ -0,0 +1,37 @@
""" Contains all the data models used in inputs/outputs """
from .chat_completion_chunk import ChatCompletionChunk
from .chat_completion_request import ChatCompletionRequest
from .choice import Choice
from .completion_request import CompletionRequest
from .completion_response import CompletionResponse
from .debug_data import DebugData
from .debug_options import DebugOptions
from .health_state import HealthState
from .hit import Hit
from .hit_document import HitDocument
from .log_event_request import LogEventRequest
from .message import Message
from .search_response import SearchResponse
from .segments import Segments
from .snippet import Snippet
from .version import Version
__all__ = (
"ChatCompletionChunk",
"ChatCompletionRequest",
"Choice",
"CompletionRequest",
"CompletionResponse",
"DebugData",
"DebugOptions",
"HealthState",
"Hit",
"HitDocument",
"LogEventRequest",
"Message",
"SearchResponse",
"Segments",
"Snippet",
"Version",
)

View File

@ -0,0 +1,57 @@
from typing import Any, Dict, List, Type, TypeVar
import attr
T = TypeVar("T", bound="ChatCompletionChunk")
@attr.s(auto_attribs=True)
class ChatCompletionChunk:
"""
Attributes:
content (str):
"""
content: str
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
content = self.content
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"content": content,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
content = d.pop("content")
chat_completion_chunk = cls(
content=content,
)
chat_completion_chunk.additional_properties = d
return chat_completion_chunk
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,76 @@
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar
import attr
if TYPE_CHECKING:
from ..models.message import Message
T = TypeVar("T", bound="ChatCompletionRequest")
@attr.s(auto_attribs=True)
class ChatCompletionRequest:
"""
Example:
{'messages': [{'content': 'What is tail recursion?', 'role': 'user'}, {'content': "It's a kind of optimization
in compiler?", 'role': 'assistant'}, {'content': 'Could you share more details?', 'role': 'user'}]}
Attributes:
messages (List['Message']):
"""
messages: List["Message"]
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
messages = []
for messages_item_data in self.messages:
messages_item = messages_item_data.to_dict()
messages.append(messages_item)
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"messages": messages,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.message import Message
d = src_dict.copy()
messages = []
_messages = d.pop("messages")
for messages_item_data in _messages:
messages_item = Message.from_dict(messages_item_data)
messages.append(messages_item)
chat_completion_request = cls(
messages=messages,
)
chat_completion_request.additional_properties = d
return chat_completion_request
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,64 @@
from typing import Any, Dict, List, Type, TypeVar
import attr
T = TypeVar("T", bound="Choice")
@attr.s(auto_attribs=True)
class Choice:
"""
Attributes:
index (int):
text (str):
"""
index: int
text: str
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
index = self.index
text = self.text
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"index": index,
"text": text,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
index = d.pop("index")
text = d.pop("text")
choice = cls(
index=index,
text=text,
)
choice.additional_properties = d
return choice
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,115 @@
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
if TYPE_CHECKING:
from ..models.debug_options import DebugOptions
from ..models.segments import Segments
T = TypeVar("T", bound="CompletionRequest")
@attr.s(auto_attribs=True)
class CompletionRequest:
r"""
Example:
{'language': 'python', 'segments': {'prefix': 'def fib(n):\n ', 'suffix': '\n return fib(n - 1) +
fib(n - 2)'}}
Attributes:
language (Union[Unset, None, str]): Language identifier, full list is maintained at
https://code.visualstudio.com/docs/languages/identifiers Example: python.
segments (Union[Unset, None, Segments]):
user (Union[Unset, None, str]): A unique identifier representing your end-user, which can help Tabby to monitor
& generating
reports.
debug_options (Union[Unset, None, DebugOptions]):
"""
language: Union[Unset, None, str] = UNSET
segments: Union[Unset, None, "Segments"] = UNSET
user: Union[Unset, None, str] = UNSET
debug_options: Union[Unset, None, "DebugOptions"] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
language = self.language
segments: Union[Unset, None, Dict[str, Any]] = UNSET
if not isinstance(self.segments, Unset):
segments = self.segments.to_dict() if self.segments else None
user = self.user
debug_options: Union[Unset, None, Dict[str, Any]] = UNSET
if not isinstance(self.debug_options, Unset):
debug_options = self.debug_options.to_dict() if self.debug_options else None
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if language is not UNSET:
field_dict["language"] = language
if segments is not UNSET:
field_dict["segments"] = segments
if user is not UNSET:
field_dict["user"] = user
if debug_options is not UNSET:
field_dict["debug_options"] = debug_options
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.debug_options import DebugOptions
from ..models.segments import Segments
d = src_dict.copy()
language = d.pop("language", UNSET)
_segments = d.pop("segments", UNSET)
segments: Union[Unset, None, Segments]
if _segments is None:
segments = None
elif isinstance(_segments, Unset):
segments = UNSET
else:
segments = Segments.from_dict(_segments)
user = d.pop("user", UNSET)
_debug_options = d.pop("debug_options", UNSET)
debug_options: Union[Unset, None, DebugOptions]
if _debug_options is None:
debug_options = None
elif isinstance(_debug_options, Unset):
debug_options = UNSET
else:
debug_options = DebugOptions.from_dict(_debug_options)
completion_request = cls(
language=language,
segments=segments,
user=user,
debug_options=debug_options,
)
completion_request.additional_properties = d
return completion_request
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,104 @@
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
if TYPE_CHECKING:
from ..models.choice import Choice
from ..models.debug_data import DebugData
T = TypeVar("T", bound="CompletionResponse")
@attr.s(auto_attribs=True)
class CompletionResponse:
"""
Example:
{'choices': [{'index': 0, 'text': 'string'}], 'id': 'string'}
Attributes:
id (str):
choices (List['Choice']):
debug_data (Union[Unset, None, DebugData]):
"""
id: str
choices: List["Choice"]
debug_data: Union[Unset, None, "DebugData"] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
id = self.id
choices = []
for choices_item_data in self.choices:
choices_item = choices_item_data.to_dict()
choices.append(choices_item)
debug_data: Union[Unset, None, Dict[str, Any]] = UNSET
if not isinstance(self.debug_data, Unset):
debug_data = self.debug_data.to_dict() if self.debug_data else None
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"id": id,
"choices": choices,
}
)
if debug_data is not UNSET:
field_dict["debug_data"] = debug_data
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.choice import Choice
from ..models.debug_data import DebugData
d = src_dict.copy()
id = d.pop("id")
choices = []
_choices = d.pop("choices")
for choices_item_data in _choices:
choices_item = Choice.from_dict(choices_item_data)
choices.append(choices_item)
_debug_data = d.pop("debug_data", UNSET)
debug_data: Union[Unset, None, DebugData]
if _debug_data is None:
debug_data = None
elif isinstance(_debug_data, Unset):
debug_data = UNSET
else:
debug_data = DebugData.from_dict(_debug_data)
completion_response = cls(
id=id,
choices=choices,
debug_data=debug_data,
)
completion_response.additional_properties = d
return completion_response
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,86 @@
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
if TYPE_CHECKING:
from ..models.snippet import Snippet
T = TypeVar("T", bound="DebugData")
@attr.s(auto_attribs=True)
class DebugData:
"""
Attributes:
snippets (Union[Unset, None, List['Snippet']]):
prompt (Union[Unset, None, str]):
"""
snippets: Union[Unset, None, List["Snippet"]] = UNSET
prompt: Union[Unset, None, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
snippets: Union[Unset, None, List[Dict[str, Any]]] = UNSET
if not isinstance(self.snippets, Unset):
if self.snippets is None:
snippets = None
else:
snippets = []
for snippets_item_data in self.snippets:
snippets_item = snippets_item_data.to_dict()
snippets.append(snippets_item)
prompt = self.prompt
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if snippets is not UNSET:
field_dict["snippets"] = snippets
if prompt is not UNSET:
field_dict["prompt"] = prompt
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.snippet import Snippet
d = src_dict.copy()
snippets = []
_snippets = d.pop("snippets", UNSET)
for snippets_item_data in _snippets or []:
snippets_item = Snippet.from_dict(snippets_item_data)
snippets.append(snippets_item)
prompt = d.pop("prompt", UNSET)
debug_data = cls(
snippets=snippets,
prompt=prompt,
)
debug_data.additional_properties = d
return debug_data
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,85 @@
from typing import Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
T = TypeVar("T", bound="DebugOptions")
@attr.s(auto_attribs=True)
class DebugOptions:
"""
Attributes:
raw_prompt (Union[Unset, None, str]): When `raw_prompt` is specified, it will be passed directly to the
inference engine for completion. `segments` field in `CompletionRequest` will be ignored.
This is useful for certain requests that aim to test the tabby's e2e quality.
return_snippets (Union[Unset, bool]): When true, returns `snippets` in `debug_data`.
return_prompt (Union[Unset, bool]): When true, returns `prompt` in `debug_data`.
disable_retrieval_augmented_code_completion (Union[Unset, bool]): When true, disable retrieval augmented code
completion.
"""
raw_prompt: Union[Unset, None, str] = UNSET
return_snippets: Union[Unset, bool] = UNSET
return_prompt: Union[Unset, bool] = UNSET
disable_retrieval_augmented_code_completion: Union[Unset, bool] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
raw_prompt = self.raw_prompt
return_snippets = self.return_snippets
return_prompt = self.return_prompt
disable_retrieval_augmented_code_completion = self.disable_retrieval_augmented_code_completion
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update({})
if raw_prompt is not UNSET:
field_dict["raw_prompt"] = raw_prompt
if return_snippets is not UNSET:
field_dict["return_snippets"] = return_snippets
if return_prompt is not UNSET:
field_dict["return_prompt"] = return_prompt
if disable_retrieval_augmented_code_completion is not UNSET:
field_dict["disable_retrieval_augmented_code_completion"] = disable_retrieval_augmented_code_completion
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
raw_prompt = d.pop("raw_prompt", UNSET)
return_snippets = d.pop("return_snippets", UNSET)
return_prompt = d.pop("return_prompt", UNSET)
disable_retrieval_augmented_code_completion = d.pop("disable_retrieval_augmented_code_completion", UNSET)
debug_options = cls(
raw_prompt=raw_prompt,
return_snippets=return_snippets,
return_prompt=return_prompt,
disable_retrieval_augmented_code_completion=disable_retrieval_augmented_code_completion,
)
debug_options.additional_properties = d
return debug_options
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,117 @@
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar, Union, cast
import attr
from ..types import UNSET, Unset
if TYPE_CHECKING:
from ..models.version import Version
T = TypeVar("T", bound="HealthState")
@attr.s(auto_attribs=True)
class HealthState:
"""
Attributes:
model (str):
device (str):
arch (str):
cpu_info (str):
cpu_count (int):
cuda_devices (List[str]):
version (Version):
chat_model (Union[Unset, None, str]):
"""
model: str
device: str
arch: str
cpu_info: str
cpu_count: int
cuda_devices: List[str]
version: "Version"
chat_model: Union[Unset, None, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
model = self.model
device = self.device
arch = self.arch
cpu_info = self.cpu_info
cpu_count = self.cpu_count
cuda_devices = self.cuda_devices
version = self.version.to_dict()
chat_model = self.chat_model
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"model": model,
"device": device,
"arch": arch,
"cpu_info": cpu_info,
"cpu_count": cpu_count,
"cuda_devices": cuda_devices,
"version": version,
}
)
if chat_model is not UNSET:
field_dict["chat_model"] = chat_model
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.version import Version
d = src_dict.copy()
model = d.pop("model")
device = d.pop("device")
arch = d.pop("arch")
cpu_info = d.pop("cpu_info")
cpu_count = d.pop("cpu_count")
cuda_devices = cast(List[str], d.pop("cuda_devices"))
version = Version.from_dict(d.pop("version"))
chat_model = d.pop("chat_model", UNSET)
health_state = cls(
model=model,
device=device,
arch=arch,
cpu_info=cpu_info,
cpu_count=cpu_count,
cuda_devices=cuda_devices,
version=version,
chat_model=chat_model,
)
health_state.additional_properties = d
return health_state
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,78 @@
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar
import attr
if TYPE_CHECKING:
from ..models.hit_document import HitDocument
T = TypeVar("T", bound="Hit")
@attr.s(auto_attribs=True)
class Hit:
"""
Attributes:
score (float):
doc (HitDocument):
id (int):
"""
score: float
doc: "HitDocument"
id: int
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
score = self.score
doc = self.doc.to_dict()
id = self.id
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"score": score,
"doc": doc,
"id": id,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.hit_document import HitDocument
d = src_dict.copy()
score = d.pop("score")
doc = HitDocument.from_dict(d.pop("doc"))
id = d.pop("id")
hit = cls(
score=score,
doc=doc,
id=id,
)
hit.additional_properties = d
return hit
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,92 @@
from typing import Any, Dict, List, Type, TypeVar
import attr
T = TypeVar("T", bound="HitDocument")
@attr.s(auto_attribs=True)
class HitDocument:
"""
Attributes:
body (str):
filepath (str):
git_url (str):
kind (str):
language (str):
name (str):
"""
body: str
filepath: str
git_url: str
kind: str
language: str
name: str
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
body = self.body
filepath = self.filepath
git_url = self.git_url
kind = self.kind
language = self.language
name = self.name
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"body": body,
"filepath": filepath,
"git_url": git_url,
"kind": kind,
"language": language,
"name": name,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
body = d.pop("body")
filepath = d.pop("filepath")
git_url = d.pop("git_url")
kind = d.pop("kind")
language = d.pop("language")
name = d.pop("name")
hit_document = cls(
body=body,
filepath=filepath,
git_url=git_url,
kind=kind,
language=language,
name=name,
)
hit_document.additional_properties = d
return hit_document
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,71 @@
from typing import Any, Dict, List, Type, TypeVar
import attr
T = TypeVar("T", bound="LogEventRequest")
@attr.s(auto_attribs=True)
class LogEventRequest:
"""
Attributes:
type (str): Event type, should be `view` or `select`. Example: view.
completion_id (str):
choice_index (int):
"""
type: str
completion_id: str
choice_index: int
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
type = self.type
completion_id = self.completion_id
choice_index = self.choice_index
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"type": type,
"completion_id": completion_id,
"choice_index": choice_index,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
type = d.pop("type")
completion_id = d.pop("completion_id")
choice_index = d.pop("choice_index")
log_event_request = cls(
type=type,
completion_id=completion_id,
choice_index=choice_index,
)
log_event_request.additional_properties = d
return log_event_request
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,64 @@
from typing import Any, Dict, List, Type, TypeVar
import attr
T = TypeVar("T", bound="Message")
@attr.s(auto_attribs=True)
class Message:
"""
Attributes:
role (str):
content (str):
"""
role: str
content: str
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
role = self.role
content = self.content
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"role": role,
"content": content,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
role = d.pop("role")
content = d.pop("content")
message = cls(
role=role,
content=content,
)
message.additional_properties = d
return message
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,79 @@
from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar
import attr
if TYPE_CHECKING:
from ..models.hit import Hit
T = TypeVar("T", bound="SearchResponse")
@attr.s(auto_attribs=True)
class SearchResponse:
"""
Attributes:
num_hits (int):
hits (List['Hit']):
"""
num_hits: int
hits: List["Hit"]
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
num_hits = self.num_hits
hits = []
for hits_item_data in self.hits:
hits_item = hits_item_data.to_dict()
hits.append(hits_item)
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"num_hits": num_hits,
"hits": hits,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
from ..models.hit import Hit
d = src_dict.copy()
num_hits = d.pop("num_hits")
hits = []
_hits = d.pop("hits")
for hits_item_data in _hits:
hits_item = Hit.from_dict(hits_item_data)
hits.append(hits_item)
search_response = cls(
num_hits=num_hits,
hits=hits,
)
search_response.additional_properties = d
return search_response
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,67 @@
from typing import Any, Dict, List, Type, TypeVar, Union
import attr
from ..types import UNSET, Unset
T = TypeVar("T", bound="Segments")
@attr.s(auto_attribs=True)
class Segments:
"""
Attributes:
prefix (str): Content that appears before the cursor in the editor window.
suffix (Union[Unset, None, str]): Content that appears after the cursor in the editor window.
"""
prefix: str
suffix: Union[Unset, None, str] = UNSET
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
prefix = self.prefix
suffix = self.suffix
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"prefix": prefix,
}
)
if suffix is not UNSET:
field_dict["suffix"] = suffix
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
prefix = d.pop("prefix")
suffix = d.pop("suffix", UNSET)
segments = cls(
prefix=prefix,
suffix=suffix,
)
segments.additional_properties = d
return segments
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,71 @@
from typing import Any, Dict, List, Type, TypeVar
import attr
T = TypeVar("T", bound="Snippet")
@attr.s(auto_attribs=True)
class Snippet:
"""
Attributes:
filepath (str):
body (str):
score (float):
"""
filepath: str
body: str
score: float
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
filepath = self.filepath
body = self.body
score = self.score
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"filepath": filepath,
"body": body,
"score": score,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
filepath = d.pop("filepath")
body = d.pop("body")
score = d.pop("score")
snippet = cls(
filepath=filepath,
body=body,
score=score,
)
snippet.additional_properties = d
return snippet
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1,78 @@
from typing import Any, Dict, List, Type, TypeVar
import attr
T = TypeVar("T", bound="Version")
@attr.s(auto_attribs=True)
class Version:
"""
Attributes:
build_date (str):
build_timestamp (str):
git_sha (str):
git_describe (str):
"""
build_date: str
build_timestamp: str
git_sha: str
git_describe: str
additional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)
def to_dict(self) -> Dict[str, Any]:
build_date = self.build_date
build_timestamp = self.build_timestamp
git_sha = self.git_sha
git_describe = self.git_describe
field_dict: Dict[str, Any] = {}
field_dict.update(self.additional_properties)
field_dict.update(
{
"build_date": build_date,
"build_timestamp": build_timestamp,
"git_sha": git_sha,
"git_describe": git_describe,
}
)
return field_dict
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
d = src_dict.copy()
build_date = d.pop("build_date")
build_timestamp = d.pop("build_timestamp")
git_sha = d.pop("git_sha")
git_describe = d.pop("git_describe")
version = cls(
build_date=build_date,
build_timestamp=build_timestamp,
git_sha=git_sha,
git_describe=git_describe,
)
version.additional_properties = d
return version
@property
def additional_keys(self) -> List[str]:
return list(self.additional_properties.keys())
def __getitem__(self, key: str) -> Any:
return self.additional_properties[key]
def __setitem__(self, key: str, value: Any) -> None:
self.additional_properties[key] = value
def __delitem__(self, key: str) -> None:
del self.additional_properties[key]
def __contains__(self, key: str) -> bool:
return key in self.additional_properties

View File

@ -0,0 +1 @@
# Marker file for PEP 561

View File

@ -0,0 +1,44 @@
""" Contains some shared types for properties """
from http import HTTPStatus
from typing import BinaryIO, Generic, Literal, MutableMapping, Optional, Tuple, TypeVar
import attr
class Unset:
def __bool__(self) -> Literal[False]:
return False
UNSET: Unset = Unset()
FileJsonType = Tuple[Optional[str], BinaryIO, Optional[str]]
@attr.s(auto_attribs=True)
class File:
"""Contains information for file uploads"""
payload: BinaryIO
file_name: Optional[str] = None
mime_type: Optional[str] = None
def to_tuple(self) -> FileJsonType:
"""Return a tuple representation that httpx will accept for multipart/form-data"""
return self.file_name, self.payload, self.mime_type
T = TypeVar("T")
@attr.s(auto_attribs=True)
class Response(Generic[T]):
"""A response from an endpoint"""
status_code: HTTPStatus
content: bytes
headers: MutableMapping[str, str]
parsed: Optional[T]
__all__ = ["File", "Response", "FileJsonType"]