Skip to content

Commit 96c26ee

Browse files
committed
fix: gateway state drifts, health checks and project recreation
1 parent 6c848bf commit 96c26ee

File tree

9 files changed

+930
-548
lines changed

9 files changed

+930
-548
lines changed

Cargo.lock

Lines changed: 9 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gateway/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ tower-http = { version = "0.3.4", features = ["trace"] }
3232
tracing = "0.1.35"
3333
tracing-opentelemetry = "0.18.0"
3434
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
35+
uuid = { version = "1.2.1", features = [ "v4" ] }
3536

3637
[dependencies.shuttle-common]
3738
version = "0.7.2"

gateway/src/api/latest.rs

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tower_http::trace::TraceLayer;
1515
use tracing::{debug, debug_span, field, Span};
1616

1717
use crate::auth::{Admin, ScopedUser, User};
18-
use crate::worker::Work;
18+
use crate::task::{self, BoxedTask};
1919
use crate::{AccountName, Error, GatewayService, ProjectName};
2020

2121
#[derive(Serialize, Deserialize)]
@@ -79,39 +79,52 @@ async fn get_project(
7979

8080
async fn post_project(
8181
Extension(service): Extension<Arc<GatewayService>>,
82-
Extension(sender): Extension<Sender<Work>>,
82+
Extension(sender): Extension<Sender<BoxedTask>>,
8383
User { name, .. }: User,
8484
Path(project): Path<ProjectName>,
8585
) -> Result<AxumJson<project::Response>, Error> {
86-
let work = service.create_project(project.clone(), name).await?;
86+
let state = service
87+
.create_project(project.clone(), name.clone())
88+
.await?;
8789

88-
let name = work.project_name.to_string();
89-
let state = work.work.clone().into();
90+
service
91+
.new_task()
92+
.project(project.clone())
93+
.account(name.clone())
94+
.send(&sender)
95+
.await?;
9096

91-
sender.send(work).await?;
92-
93-
let response = project::Response { name, state };
97+
let response = project::Response {
98+
name: project.to_string(),
99+
state: state.into(),
100+
};
94101

95102
Ok(AxumJson(response))
96103
}
97104

98105
async fn delete_project(
99106
Extension(service): Extension<Arc<GatewayService>>,
100-
Extension(sender): Extension<Sender<Work>>,
107+
Extension(sender): Extension<Sender<BoxedTask>>,
101108
ScopedUser {
102109
scope: _,
103110
user: User { name, .. },
104111
}: ScopedUser,
105112
Path(project): Path<ProjectName>,
106113
) -> Result<AxumJson<project::Response>, Error> {
107-
let work = service.destroy_project(project, name).await?;
108-
109-
let name = work.project_name.to_string();
110-
let state = work.work.clone().into();
114+
let project_name = project.clone();
111115

112-
sender.send(work).await?;
116+
service
117+
.new_task()
118+
.project(project)
119+
.account(name)
120+
.and_then(task::destroy())
121+
.send(&sender)
122+
.await?;
113123

114-
let response = project::Response { name, state };
124+
let response = project::Response {
125+
name: project_name.to_string(),
126+
state: shuttle_common::models::project::State::Destroying,
127+
};
115128
Ok(AxumJson(response))
116129
}
117130

@@ -123,7 +136,7 @@ async fn route_project(
123136
service.route(&scope, req).await
124137
}
125138

126-
async fn get_status(Extension(sender): Extension<Sender<Work>>) -> Response<Body> {
139+
async fn get_status(Extension(sender): Extension<Sender<BoxedTask>>) -> Response<Body> {
127140
let (status, body) = if !sender.is_closed() && sender.capacity() > 0 {
128141
(StatusCode::OK, StatusResponse::healthy())
129142
} else {
@@ -140,8 +153,9 @@ async fn get_status(Extension(sender): Extension<Sender<Work>>) -> Response<Body
140153
.unwrap()
141154
}
142155

143-
pub fn make_api(service: Arc<GatewayService>, sender: Sender<Work>) -> Router<Body> {
156+
pub fn make_api(service: Arc<GatewayService>, sender: Sender<BoxedTask>) -> Router<Body> {
144157
debug!("making api route");
158+
145159
Router::<Body>::new()
146160
.route(
147161
"/",
@@ -185,14 +199,13 @@ pub mod tests {
185199
use super::*;
186200
use crate::service::GatewayService;
187201
use crate::tests::{RequestBuilderExt, World};
188-
use crate::worker::Work;
189202

190203
#[tokio::test]
191204
async fn api_create_get_delete_projects() -> anyhow::Result<()> {
192205
let world = World::new().await;
193206
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);
194207

195-
let (sender, mut receiver) = channel::<Work>(256);
208+
let (sender, mut receiver) = channel::<BoxedTask>(256);
196209
tokio::spawn(async move {
197210
while receiver.recv().await.is_some() {
198211
// do not do any work with inbound requests
@@ -327,7 +340,7 @@ pub mod tests {
327340
let world = World::new().await;
328341
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);
329342

330-
let (sender, mut receiver) = channel::<Work>(256);
343+
let (sender, mut receiver) = channel::<BoxedTask>(256);
331344
tokio::spawn(async move {
332345
while receiver.recv().await.is_some() {
333346
// do not do any work with inbound requests
@@ -416,7 +429,7 @@ pub mod tests {
416429
let world = World::new().await;
417430
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);
418431

419-
let (sender, mut receiver) = channel::<Work>(1);
432+
let (sender, mut receiver) = channel::<BoxedTask>(1);
420433
let (ctl_send, ctl_recv) = oneshot::channel();
421434
let (done_send, done_recv) = oneshot::channel();
422435
let worker = tokio::spawn(async move {
@@ -468,6 +481,7 @@ pub mod tests {
468481
assert_eq!(resp.status(), StatusCode::OK);
469482

470483
worker.abort();
484+
let _ = worker.await;
471485

472486
let resp = router.call(get_status()).await.unwrap();
473487
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);

0 commit comments

Comments
 (0)