From 87fa2a4d41ad935af5c7180d590b793c1aa3c5a2 Mon Sep 17 00:00:00 2001 From: Damien Broka Date: Wed, 14 Dec 2022 08:59:41 +0000 Subject: [PATCH 1/4] feat: per-project parallelism --- gateway/src/lib.rs | 2 +- gateway/src/service.rs | 12 +++++++-- gateway/src/task.rs | 47 +++++++++++++++++++++++++++++++++--- gateway/src/worker.rs | 55 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 109 insertions(+), 7 deletions(-) diff --git a/gateway/src/lib.rs b/gateway/src/lib.rs index a747bc673..ad1ac8357 100644 --- a/gateway/src/lib.rs +++ b/gateway/src/lib.rs @@ -119,7 +119,7 @@ impl std::fmt::Display for Error { impl StdError for Error {} -#[derive(Debug, sqlx::Type, Serialize, Clone, PartialEq, Eq)] +#[derive(Debug, sqlx::Type, Serialize, Clone, PartialEq, Eq, Hash)] #[sqlx(transparent)] pub struct ProjectName(String); diff --git a/gateway/src/service.rs b/gateway/src/service.rs index ed32cd884..fb0e62191 100644 --- a/gateway/src/service.rs +++ b/gateway/src/service.rs @@ -28,7 +28,8 @@ use crate::acme::CustomDomain; use crate::args::ContextArgs; use crate::auth::{Key, Permissions, ScopedUser, User}; use crate::project::Project; -use crate::task::TaskBuilder; +use crate::task::{TaskBuilder, BoxedTask}; +use crate::worker::TaskRouter; use crate::{AccountName, DockerContext, Error, ErrorKind, ProjectDetails, ProjectName}; pub static MIGRATIONS: Migrator = sqlx::migrate!("./migrations"); @@ -187,6 +188,7 @@ impl GatewayContextProvider { pub struct GatewayService { provider: GatewayContextProvider, db: SqlitePool, + task_router: TaskRouter } impl GatewayService { @@ -201,7 +203,9 @@ impl GatewayService { let provider = GatewayContextProvider::new(docker, container_settings); - Self { provider, db } + let task_router = TaskRouter::new(); + + Self { provider, db, task_router } } pub async fn route( @@ -547,6 +551,10 @@ impl GatewayService { pub fn new_task(self: &Arc) -> TaskBuilder { TaskBuilder::new(self.clone()) } + + pub fn task_router(&self) -> TaskRouter { + self.task_router.clone() + } } #[derive(Clone)] diff --git a/gateway/src/task.rs b/gateway/src/task.rs index 5a23c64ac..6c33feb77 100644 --- a/gateway/src/task.rs +++ b/gateway/src/task.rs @@ -12,6 +12,7 @@ use uuid::Uuid; use crate::project::*; use crate::service::{GatewayContext, GatewayService}; +use crate::worker::TaskRouter; use crate::{AccountName, EndState, Error, ErrorKind, ProjectName, Refresh, State}; // Default maximum _total_ time a task is allowed to run @@ -187,15 +188,21 @@ impl TaskBuilder { let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT); - Box::new(WithTimeout::on( + let project_name = self.project_name.expect("project_name is required"); + + let task_router = self.service.task_router(); + + let task: BoxedTask = Box::new(WithTimeout::on( timeout, ProjectTask { uuid: Uuid::new_v4(), - project_name: self.project_name.expect("project_name is required"), + project_name: project_name.clone(), service: self.service, tasks: self.tasks, }, - )) + )); + + Box::new(Route::to(project_name, task, task_router)) } pub async fn send(self, sender: &Sender) -> Result { @@ -207,6 +214,40 @@ impl TaskBuilder { } } +pub struct Route { + project_name: ProjectName, + inner: Option, + router: TaskRouter, +} + +impl Route { + pub fn to(project_name: ProjectName, what: T, router: TaskRouter) -> Self { + Self { + project_name, + inner: Some(what), + router + } + } +} + +#[async_trait] +impl Task<()> for Route { + type Output = (); + + type Error = Error; + + async fn poll(&mut self, _ctx: ()) -> TaskResult { + if let Some(task) = self.inner.take() { + match self.router.route(&self.project_name, task).await { + Ok(_) => TaskResult::Done(()), + Err(_) => TaskResult::Err(Error::from_kind(ErrorKind::Internal)) + } + } else { + TaskResult::Done(()) + } + } +} + pub struct RunFn { f: F, _output: PhantomData, diff --git a/gateway/src/worker.rs b/gateway/src/worker.rs index 25a3914ba..37c026248 100644 --- a/gateway/src/worker.rs +++ b/gateway/src/worker.rs @@ -1,8 +1,13 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::RwLock; use tracing::{debug, info}; use crate::task::{BoxedTask, TaskResult}; -use crate::Error; +use crate::{Error, ProjectName}; pub const WORKER_QUEUE_SIZE: usize = 2048; @@ -71,3 +76,51 @@ impl Worker { Ok(self) } } + +pub struct TaskRouter { + table: Arc>>>, +} + +impl Clone for TaskRouter { + fn clone(&self) -> Self { + Self { + table: self.table.clone(), + } + } +} + +impl TaskRouter { + pub fn new() -> Self { + Self { + table: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl TaskRouter { + pub async fn route( + &self, + name: &ProjectName, + task: BoxedTask, + ) -> Result<(), SendError> { + if let Some(sender) = self.table.read().await.get(name) { + sender.send(task).await + } else { + let mut table = self.table.write().await; + if let Some(sender) = table.get(name) { + sender.send(task).await + } else { + let worker = Worker::new(); + let sender = worker.sender(); + + tokio::spawn(worker.start()); + + let res = sender.send(task).await; + + table.insert(name.clone(), sender); + + res + } + } + } +} From b7f469e9ab3995ed82c0b1d1956b2790663bae19 Mon Sep 17 00:00:00 2001 From: Damien Broka Date: Wed, 14 Dec 2022 09:37:13 +0000 Subject: [PATCH 2/4] fix --- gateway/src/task.rs | 16 +++++++--------- gateway/src/worker.rs | 20 ++++++++------------ 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/gateway/src/task.rs b/gateway/src/task.rs index 6c33feb77..3289a94d7 100644 --- a/gateway/src/task.rs +++ b/gateway/src/task.rs @@ -188,25 +188,22 @@ impl TaskBuilder { let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT); - let project_name = self.project_name.expect("project_name is required"); - - let task_router = self.service.task_router(); - - let task: BoxedTask = Box::new(WithTimeout::on( + Box::new(WithTimeout::on( timeout, ProjectTask { uuid: Uuid::new_v4(), - project_name: project_name.clone(), + project_name: self.project_name.expect("project_name is required"), service: self.service, tasks: self.tasks, }, - )); - - Box::new(Route::to(project_name, task, task_router)) + )) } pub async fn send(self, sender: &Sender) -> Result { + let project_name = self.project_name.clone().expect("project_name is required"); + let task_router = self.service.task_router(); let (task, handle) = AndThenNotify::after(self.build()); + let task = Route::::to(project_name, Box::new(task), task_router); match timeout(TASK_SEND_TIMEOUT, sender.send(Box::new(task))).await { Ok(Ok(_)) => Ok(handle), _ => Err(Error::from_kind(ErrorKind::ServiceUnavailable)), @@ -238,6 +235,7 @@ impl Task<()> for Route { async fn poll(&mut self, _ctx: ()) -> TaskResult { if let Some(task) = self.inner.take() { + println!("task picked up"); match self.router.route(&self.project_name, task).await { Ok(_) => TaskResult::Done(()), Err(_) => TaskResult::Err(Error::from_kind(ErrorKind::Internal)) diff --git a/gateway/src/worker.rs b/gateway/src/worker.rs index 37c026248..01634b9b0 100644 --- a/gateway/src/worker.rs +++ b/gateway/src/worker.rs @@ -103,24 +103,20 @@ impl TaskRouter { name: &ProjectName, task: BoxedTask, ) -> Result<(), SendError> { - if let Some(sender) = self.table.read().await.get(name) { + let mut table = self.table.write().await; + if let Some(sender) = table.get(name) { sender.send(task).await } else { - let mut table = self.table.write().await; - if let Some(sender) = table.get(name) { - sender.send(task).await - } else { - let worker = Worker::new(); - let sender = worker.sender(); + let worker = Worker::new(); + let sender = worker.sender(); - tokio::spawn(worker.start()); + tokio::spawn(worker.start()); - let res = sender.send(task).await; + let res = sender.send(task).await; - table.insert(name.clone(), sender); + table.insert(name.clone(), sender); - res - } + res } } } From 5d02e0216f1cee82dc8a6e174b61350237e30ac5 Mon Sep 17 00:00:00 2001 From: Damien Broka Date: Wed, 14 Dec 2022 09:37:33 +0000 Subject: [PATCH 3/4] fmt --- gateway/src/service.rs | 10 +++++++--- gateway/src/task.rs | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/gateway/src/service.rs b/gateway/src/service.rs index fb0e62191..afd03dd67 100644 --- a/gateway/src/service.rs +++ b/gateway/src/service.rs @@ -28,7 +28,7 @@ use crate::acme::CustomDomain; use crate::args::ContextArgs; use crate::auth::{Key, Permissions, ScopedUser, User}; use crate::project::Project; -use crate::task::{TaskBuilder, BoxedTask}; +use crate::task::{BoxedTask, TaskBuilder}; use crate::worker::TaskRouter; use crate::{AccountName, DockerContext, Error, ErrorKind, ProjectDetails, ProjectName}; @@ -188,7 +188,7 @@ impl GatewayContextProvider { pub struct GatewayService { provider: GatewayContextProvider, db: SqlitePool, - task_router: TaskRouter + task_router: TaskRouter, } impl GatewayService { @@ -205,7 +205,11 @@ impl GatewayService { let task_router = TaskRouter::new(); - Self { provider, db, task_router } + Self { + provider, + db, + task_router, + } } pub async fn route( diff --git a/gateway/src/task.rs b/gateway/src/task.rs index 3289a94d7..655299d70 100644 --- a/gateway/src/task.rs +++ b/gateway/src/task.rs @@ -222,7 +222,7 @@ impl Route { Self { project_name, inner: Some(what), - router + router, } } } @@ -238,7 +238,7 @@ impl Task<()> for Route { println!("task picked up"); match self.router.route(&self.project_name, task).await { Ok(_) => TaskResult::Done(()), - Err(_) => TaskResult::Err(Error::from_kind(ErrorKind::Internal)) + Err(_) => TaskResult::Err(Error::from_kind(ErrorKind::Internal)), } } else { TaskResult::Done(()) From c6aadb81b5837f64322769c00db109eaeb9b7663 Mon Sep 17 00:00:00 2001 From: Damien Broka Date: Wed, 14 Dec 2022 09:47:27 +0000 Subject: [PATCH 4/4] clippy --- gateway/src/task.rs | 1 - gateway/src/worker.rs | 6 ++++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/gateway/src/task.rs b/gateway/src/task.rs index 655299d70..dc506306a 100644 --- a/gateway/src/task.rs +++ b/gateway/src/task.rs @@ -235,7 +235,6 @@ impl Task<()> for Route { async fn poll(&mut self, _ctx: ()) -> TaskResult { if let Some(task) = self.inner.take() { - println!("task picked up"); match self.router.route(&self.project_name, task).await { Ok(_) => TaskResult::Done(()), Err(_) => TaskResult::Err(Error::from_kind(ErrorKind::Internal)), diff --git a/gateway/src/worker.rs b/gateway/src/worker.rs index 01634b9b0..b81bb1ad0 100644 --- a/gateway/src/worker.rs +++ b/gateway/src/worker.rs @@ -89,6 +89,12 @@ impl Clone for TaskRouter { } } +impl Default for TaskRouter { + fn default() -> Self { + Self::new() + } +} + impl TaskRouter { pub fn new() -> Self { Self {