feat: add `tabby scheduler` command (#194)

* feat: add `tabby scheduler` command

* update test cases

* fix fmt
docs-add-demo
Meng Zhang 2023-06-05 11:29:38 -07:00 committed by GitHub
parent 4e74f7e93b
commit e8b1c10738
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 317 additions and 90 deletions

193
Cargo.lock generated
View File

@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
@ -173,6 +182,21 @@ dependencies = [
"tower-service",
]
[[package]]
name = "backtrace"
version = "0.3.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide 0.6.2",
"object",
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.13.1"
@ -427,6 +451,17 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "cron"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab00a636277f7ea5d8dd92ac7a5099fc9a46e5327bba84d3640b41ae127eada9"
dependencies = [
"chrono",
"error-chain",
"nom 4.1.1",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.8"
@ -690,6 +725,15 @@ dependencies = [
"libc",
]
[[package]]
name = "error-chain"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8"
dependencies = [
"backtrace",
]
[[package]]
name = "esaxx-rs"
version = "0.1.8"
@ -736,7 +780,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
dependencies = [
"crc32fast",
"miniz_oxide",
"miniz_oxide 0.7.1",
]
[[package]]
@ -861,6 +905,12 @@ dependencies = [
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "gimli"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0a93d233ebf96623465aad4046a8d3aa4da22d4f4beba5388838c8a434bbb4"
[[package]]
name = "glob"
version = "0.3.1"
@ -1162,6 +1212,17 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "job_scheduler"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51f368c9c76dde2282714ae32dc274b79c27527a0c06c816f6dda048904d0d7c"
dependencies = [
"chrono",
"cron",
"uuid 0.8.2",
]
[[package]]
name = "jobserver"
version = "0.1.26"
@ -1285,6 +1346,15 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa"
dependencies = [
"adler",
]
[[package]]
name = "miniz_oxide"
version = "0.7.1"
@ -1345,6 +1415,15 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nom"
version = "4.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c349f68f25f596b9f44cf0e7c69752a5c633b0550c3ff849518bfba0233774a"
dependencies = [
"memchr",
]
[[package]]
name = "nom"
version = "7.1.3"
@ -1355,6 +1434,16 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-traits"
version = "0.2.15"
@ -1386,6 +1475,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.30.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea86265d3d3dcb6a27fc51bd29a4bf387fae9d2986b823079d4986af253eb439"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.17.1"
@ -1458,6 +1556,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1832,6 +1936,12 @@ dependencies = [
"walkdir",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.37.19"
@ -2007,6 +2117,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "shellexpand"
version = "2.1.2"
@ -2057,7 +2176,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5851699c4033c63636f7ea4cf7b7c1f1bf06d0cc03cfb42e711de5a5c46cf326"
dependencies = [
"base64 0.13.1",
"nom",
"nom 7.1.3",
"serde",
"unicode-segmentation",
]
@ -2154,12 +2273,15 @@ dependencies = [
"strfmt",
"strum",
"tabby-common",
"tabby-scheduler",
"tokio",
"tower",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"uuid",
"uuid 1.3.3",
]
[[package]]
@ -2177,8 +2299,10 @@ name = "tabby-scheduler"
version = "0.1.0"
dependencies = [
"filenamify",
"job_scheduler",
"tabby-common",
"temp_testdir",
"tracing",
]
[[package]]
@ -2240,6 +2364,16 @@ dependencies = [
"syn 2.0.18",
]
[[package]]
name = "thread_local"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
name = "time"
version = "0.1.45"
@ -2461,9 +2595,21 @@ dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.18",
]
[[package]]
name = "tracing-core"
version = "0.1.31"
@ -2471,6 +2617,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
]
[[package]]
@ -2618,6 +2790,15 @@ dependencies = [
"zip",
]
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom",
]
[[package]]
name = "uuid"
version = "1.3.3"
@ -2640,6 +2821,12 @@ dependencies = [
"syn 2.0.18",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@ -19,3 +19,5 @@ serde = { version = "1.0", features = ["derive"] }
serdeconv = "0.4.1"
tokio = "1.28"
tokio-util = "0.7"
tracing = "0.1"
tracing-subscriber = "0.3"

View File

@ -7,7 +7,9 @@ edition = "2021"
[dependencies]
filenamify = "0.1.0"
job_scheduler = "1.2.1"
tabby-common = { path = "../tabby-common" }
tracing = { workspace = true }
[dev-dependencies]
temp_testdir = "0.2"

View File

@ -1,70 +1,22 @@
use std::path::PathBuf;
use std::process::Command;
mod repository;
use tabby_common::{
config::{Config, Repository},
path::repositories_dir,
};
use job_scheduler::{Job, JobScheduler};
use std::time::Duration;
use tracing::info;
use filenamify::filenamify;
pub fn scheduler() {
let mut scheduler = JobScheduler::new();
trait ConfigExt {
fn sync_repositories(&self);
}
// Every 5 hours.
scheduler.add(Job::new("* * 1/5 * * *".parse().unwrap(), || {
info!("Syncing repositories...");
repository::sync_repositories();
}));
impl ConfigExt for Config {
fn sync_repositories(&self) {
for repository in self.repositories.iter() {
repository.sync()
}
info!("Scheduler activated...");
loop {
info!("Checking for jobs in queue...");
scheduler.tick();
std::thread::sleep(Duration::from_secs(10));
}
}
trait RepositoryExt {
fn dir(&self) -> PathBuf;
fn sync(&self);
}
impl RepositoryExt for Repository {
fn dir(&self) -> PathBuf {
repositories_dir().join(filenamify(&self.git_url))
}
fn sync(&self) {
let dir = self.dir();
let dir_string = dir.display().to_string();
let status = if dir.exists() {
Command::new("git")
.current_dir(&dir)
.arg("pull")
.status()
.expect("git could not be executed")
} else {
std::fs::create_dir_all(&dir)
.unwrap_or_else(|_| panic!("Failed to create dir {}", dir_string));
Command::new("git")
.current_dir(dir.parent().unwrap())
.arg("clone")
.arg("--depth")
.arg("1")
.arg(&self.git_url)
.arg(dir)
.status()
.expect("git could not be executed")
};
if let Some(code) = status.code() {
if code != 0 {
panic!(
"Failed to pull remote '{}'\nConsider remove dir '{}' and retry",
&self.git_url, &dir_string
);
}
}
}
}
pub fn job_sync_repositories() {
let config = Config::load();
config.sync_repositories();
}

View File

@ -0,0 +1,93 @@
use std::path::PathBuf;
use std::process::Command;
use tabby_common::{
config::{Config, Repository},
path::repositories_dir,
};
use filenamify::filenamify;
trait ConfigExt {
fn sync_repositories(&self);
}
impl ConfigExt for Config {
fn sync_repositories(&self) {
for repository in self.repositories.iter() {
repository.sync()
}
}
}
trait RepositoryExt {
fn dir(&self) -> PathBuf;
fn sync(&self);
}
impl RepositoryExt for Repository {
fn dir(&self) -> PathBuf {
repositories_dir().join(filenamify(&self.git_url))
}
fn sync(&self) {
let dir = self.dir();
let dir_string = dir.display().to_string();
let status = if dir.exists() {
Command::new("git")
.current_dir(&dir)
.arg("pull")
.status()
.expect("git could not be executed")
} else {
std::fs::create_dir_all(&dir)
.unwrap_or_else(|_| panic!("Failed to create dir {}", dir_string));
Command::new("git")
.current_dir(dir.parent().unwrap())
.arg("clone")
.arg("--depth")
.arg("1")
.arg(&self.git_url)
.arg(dir)
.status()
.expect("git could not be executed")
};
if let Some(code) = status.code() {
if code != 0 {
panic!(
"Failed to pull remote '{}'\nConsider remove dir '{}' and retry",
&self.git_url, &dir_string
);
}
}
}
}
pub fn sync_repositories() {
let config = Config::load();
config.sync_repositories();
}
#[cfg(test)]
mod tests {
use super::*;
use temp_testdir::*;
use tabby_common::config::{Config, Repository};
use tabby_common::path::set_tabby_root;
#[test]
fn it_works() {
set_tabby_root(TempDir::default().to_path_buf());
let config = Config {
repositories: vec![Repository {
git_url: "https://github.com/TabbyML/interview-questions".to_owned(),
}],
};
config.save();
sync_repositories();
}
}

View File

@ -1,19 +0,0 @@
use tabby_scheduler::*;
use temp_testdir::*;
use tabby_common::config::{Config, Repository};
use tabby_common::path::set_tabby_root;
#[test]
fn it_works() {
set_tabby_root(TempDir::default().to_path_buf());
let config = Config {
repositories: vec![Repository {
git_url: "https://github.com/TabbyML/interview-questions".to_owned(),
}],
};
config.save();
job_sync_repositories()
}

View File

@ -4,6 +4,9 @@ version = "0.1.0"
edition = "2021"
[dependencies]
ctranslate2-bindings = { path = "../ctranslate2-bindings" }
tabby-common = { path = "../tabby-common" }
tabby-scheduler = { path = "../tabby-scheduler" }
axum = "0.6"
hyper = { version = "0.14", features = ["full"] }
tokio = { workspace = true, features = ["full"] }
@ -15,7 +18,6 @@ serdeconv = { workspace = true }
serde_json = "1.0"
env_logger = "0.10.0"
log = "0.4"
ctranslate2-bindings = { path = "../ctranslate2-bindings" }
tower-http = { version = "0.4.0", features = ["cors"] }
clap = { version = "4.3.0", features = ["derive"] }
regex = "1.8.3"
@ -26,9 +28,10 @@ strum = { version = "0.24", features = ["derive"] }
reqwest = { version = "0.11.18", features = ["stream", "json"] }
indicatif = "0.17.3"
futures-util = "0.3.28"
tabby-common = { path = "../tabby-common" }
anyhow = "1.0.71"
strfmt = "0.2.4"
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
[dependencies.uuid]
version = "1.3.3"

View File

@ -4,6 +4,7 @@ use std::cmp;
use std::fs;
use std::io::Write;
use std::path::Path;
use tracing::info;
use clap::Args;
use futures_util::StreamExt;
@ -25,7 +26,7 @@ pub struct DownloadArgs {
pub async fn main(args: &DownloadArgs) {
download_model(&args.model, args.prefer_local_file).await;
println!("model '{}' is ready", args.model);
info!("model '{}' is ready", args.model);
}
impl CacheInfo {

View File

@ -18,14 +18,19 @@ pub enum Commands {
/// Download the model
Download(download::DownloadArgs),
Scheduler,
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let cli = Cli::parse();
match &cli.command {
Commands::Serve(args) => serve::main(args).await,
Commands::Download(args) => download::main(args).await,
Commands::Scheduler => tabby_scheduler::scheduler(),
}
}

View File

@ -10,6 +10,7 @@ use std::{
sync::Arc,
};
use tower_http::cors::CorsLayer;
use tracing::info;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
@ -74,7 +75,7 @@ pub async fn main(args: &ServeArgs) {
.layer(CorsLayer::permissive());
let address = SocketAddr::from((Ipv4Addr::UNSPECIFIED, args.port));
println!("Listening at {}", address);
info!("Listening at {}", address);
Server::bind(&address)
.serve(app.into_make_service())
.await