add events pipeline

add-dagster-data-pipeline
Meng Zhang 2023-10-13 21:43:00 -07:00
parent d80e675211
commit 1b52a83dcc
2 changed files with 53 additions and 6 deletions

View File

@ -9,23 +9,21 @@ from . import constants
DatasetDataFrame = create_dagster_pandas_dataframe_type(
name="DatasetDataFrame",
columns = [
columns=[
PandasColumn.string_column("git_url"),
PandasColumn.string_column("filepath"),
PandasColumn.string_column("content"),
PandasColumn.string_column("language"),
PandasColumn.integer_column("max_line_length"),
PandasColumn.float_column("avg_line_length"),
PandasColumn.float_column("alphanum_fraction"),
PandasColumn.exists("tags"),
]
],
)
@asset(dagster_type=DatasetDataFrame)
def dataset_files():
def dataset():
"""Get source code information from TABBY_ROOT"""
ds = []
@ -35,4 +33,51 @@ def dataset_files():
ds.append(json.loads(line))
df = pd.DataFrame(ds)
return Output(df, metadata={"num_files": len(df) })
metadata = {
"num_records": len(df),
"preview": MetadataValue.md(
df.head()[
[
"git_url",
"filepath",
"language",
"max_line_length",
"avg_line_length",
"alphanum_fraction",
]
].to_markdown()
),
}
return Output(df, metadata=metadata)
# Dagster pandas DataFrame type used to validate event-log frames:
# each record must have an integer "ts" timestamp and a non-null "event"
# payload column (`exists` only checks presence, not shape — by design,
# since event payloads are free-form JSON objects).
EventDataFrame = create_dagster_pandas_dataframe_type(
name="EventDataFrame",
columns=[
PandasColumn.integer_column("ts"),
PandasColumn.exists("event"),
],
)
@asset(dagster_type=EventDataFrame)
def events():
    """Load Tabby event logs into a DataFrame validated as EventDataFrame.

    Reads every file matching TABBY_EVENTS_FILEPATTERN, parsing each line
    as one JSON event record, and attaches record count plus a markdown
    preview of the first rows as Dagster metadata.
    """
    records = []
    for event_path in glob.glob(constants.TABBY_EVENTS_FILEPATTERN):
        with open(event_path, "r") as event_file:
            # Files are JSON-lines: one event object per line.
            records.extend(json.loads(raw_line) for raw_line in event_file)
    frame = pd.DataFrame(records)
    preview_columns = [
        "ts",
        "event"
    ]
    metadata = {
        "num_records": len(frame),
        "preview": MetadataValue.md(frame.head()[preview_columns].to_markdown()),
    }
    return Output(frame, metadata=metadata)

View File

@ -3,3 +3,5 @@ import os
# Root directory for all Tabby data; overridable via the TABBY_ROOT env var.
TABBY_ROOT = os.environ.get("TABBY_ROOT", os.path.expanduser("~/.tabby"))
# Glob pattern for dataset files (JSON-lines) under TABBY_ROOT.
TABBY_DATASET_FILEPATTERN = os.path.join(TABBY_ROOT, "dataset/*.jsonl")
# Glob pattern for event-log files under TABBY_ROOT.
# NOTE(review): extension is .json but consumers parse them line-by-line
# as JSON-lines — confirm the writer emits one JSON object per line.
TABBY_EVENTS_FILEPATTERN = os.path.join(TABBY_ROOT, "events/*.json")