Commit d4289d2
Auto merge of #1588 - sgrif:sg-background-jobs, r=sgrif
Move index updates off the web server

This fundamentally changes our workflow for publishing, yanking, and unyanking crates. Rather than synchronously updating the index when the request comes in (and potentially retrying multiple times, since we have multiple web servers that can create a race condition), we instead queue the update to be run on another machine at some point in the future.

This will improve the resiliency of index updates -- specifically, letting us avoid the case where the index has been updated, but something happened to the web server before the database transaction committed.

This setup assumes that all jobs *must* complete within a short timeframe, or something is seriously wrong. The only background jobs we have right now are index updates, which are extremely low volume. If a job fails, it most likely means that GitHub is down, or a bug has made it to production which is preventing publishing and/or yanking. For these reasons, this PR includes a monitor binary which will page whoever is on call with extremely low thresholds (it defaults to paging if a job has been in the queue for 15 minutes, configurable by env var). The runner is meant to be run on a dedicated worker, while the monitor should be run by some cron-like tool on a regular interval (Heroku Scheduler for us).

One side effect of this change is that `cargo publish` returning with a 0 exit status does not mean the crate can immediately be used. This has always technically been true, since S3 and GitHub can both have delays before they update as well, but it's going to be consistently a bit longer with this PR. It should only be a few seconds the majority of the time, and no more than a minute in the worst case.

One enhancement I'd like to make, which is not included in this PR, is a UI to show the status of a publish. I did not include it here, as this PR is already huge, and I do not think that feature is strictly required to land this. In the common case, it will take longer to navigate to that UI than it will take for the job to complete. This enhancement will also go nicely with work on staging publishes if we want to add those (see #1503). There is also some low-hanging fruit we can tackle to lower the job's running time if we feel it's necessary.

As for the queue itself, I've chosen to implement one here based on PostgreSQL's row locking. There are a few reasons for this versus something like RabbitMQ or Faktory. The first is operational: we still have a very small team and very limited ops bandwidth. If we can avoid introducing another piece to our stack, that is a win both in terms of the amount of work our existing team has to do, and in making it easier to grow the team (by lowering the number of technologies one person has to learn). The second reason is that using an existing queue wouldn't actually reduce the amount of code required by that much. The majority of the code here is related to actually running jobs, not to interacting with PostgreSQL or serialization. The only Rust libraries that exist for this are low-level bindings to other queues, so the majority of the "job" infrastructure would still be needed.

The queue code is intended to eventually be extracted into a library. That portion of the code is the `background` module, which is why a lot of the code in that module is written a bit more generically than crates.io specifically needs. It's still a bit too coupled to crates.io to be extracted right now, though -- and I'd like to have it in the wild for a bit before extracting it.
The `background_jobs` module is our code for interacting with this "library".
2 parents 3283448 + 6777978
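For a concrete picture of the row-locking approach described above, here is a minimal sketch (not this PR's actual implementation) of how a worker can claim one job at a time from the `background_jobs` table added in this diff. It assumes diesel 1.x with the `serde_json` feature enabled; `ClaimedJob` and `claim_one_job` are illustrative names.

// FOR UPDATE SKIP LOCKED lets several workers poll the same table without
// blocking on each other's in-progress rows, and a row stays locked until
// the claiming transaction ends, so a crashed worker's job simply becomes
// claimable again.
use diesel::prelude::*;
use diesel::sql_types::{BigInt, Jsonb, Text};
use diesel::PgConnection;

#[derive(QueryableByName)]
struct ClaimedJob {
    #[sql_type = "BigInt"]
    id: i64,
    #[sql_type = "Text"]
    job_type: String,
    #[sql_type = "Jsonb"]
    data: serde_json::Value,
}

fn claim_one_job(conn: &PgConnection) -> QueryResult<Option<ClaimedJob>> {
    // Must run inside a transaction so the row lock is held while the job runs.
    diesel::sql_query(
        "SELECT id, job_type, data FROM background_jobs \
         ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED",
    )
    .get_result(conn)
    .optional()
}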

28 files changed, +985 -213 lines changed

Cargo.lock

+10
Some generated files are not rendered by default.

Cargo.toml

+1
@@ -63,6 +63,7 @@ tempdir = "0.3.7"
 parking_lot = "0.7.1"
 jemallocator = { version = "0.1.8", features = ['unprefixed_malloc_on_supported_platforms', 'profiling'] }
 jemalloc-ctl = "0.2.0"
+threadpool = "1.7"
 
 lettre = {git = "https://github.com/lettre/lettre", version = "0.9"}
 lettre_email = {git = "https://github.com/lettre/lettre", version = "0.9"}
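For reference, `threadpool` (added above) provides a fixed-size pool of worker threads fed from a shared queue; presumably the job runner uses it to execute jobs concurrently. A minimal illustrative usage of that crate's API (not code from this PR):

use threadpool::ThreadPool;

fn main() {
    // Four OS threads servicing a shared queue of closures.
    let pool = ThreadPool::new(4);
    for i in 0..8 {
        pool.execute(move || println!("running job {}", i));
    }
    // Block until every queued closure has finished.
    pool.join();
}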

Procfile

+1
@@ -1,2 +1,3 @@
 web: bin/diesel migration run && bin/start-nginx ./target/release/server
 worker: ./target/release/update-downloads daemon 300
+background_worker: ./target/release/background-worker
(new migration: down.sql)

@@ -0,0 +1 @@
+DROP TABLE background_jobs;
(new migration: up.sql)

@@ -0,0 +1,8 @@
+CREATE TABLE background_jobs (
+    id BIGSERIAL PRIMARY KEY,
+    job_type TEXT NOT NULL,
+    data JSONB NOT NULL,
+    retries INTEGER NOT NULL DEFAULT 0,
+    last_retry TIMESTAMP NOT NULL DEFAULT '1970-01-01',
+    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
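For context, the corresponding diesel `table!` definition (the generated schema.rs is not shown in this diff) would look roughly like:

table! {
    background_jobs (id) {
        id -> BigInt,
        job_type -> Text,
        data -> Jsonb,
        retries -> Integer,
        last_retry -> Timestamp,
        created_at -> Timestamp,
    }
}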

script/ci/prune-cache.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ du -hs target/debug
77

88
crate_name="cargo-registry"
99
test_name="all"
10-
bin_names="delete-crate delete-version populate render-readmes server test-pagerduty transfer-crates update-downloads"
10+
bin_names="delete-crate delete-version populate render-readmes server test-pagerduty transfer-crates update-downloads background-worker monitor"
1111

1212
normalized_crate_name=${crate_name//-/_}
1313
rm -v target/debug/$normalized_crate_name-*

src/app.rs

+2-12
@@ -1,12 +1,7 @@
 //! Application-wide components in a struct accessible from each request
 
 use crate::{db, util::CargoResult, Config, Env};
-use std::{
-    env,
-    path::PathBuf,
-    sync::{Arc, Mutex},
-    time::Duration,
-};
+use std::{env, path::PathBuf, sync::Arc, time::Duration};
 
 use diesel::r2d2;
 use scheduled_thread_pool::ScheduledThreadPool;
@@ -25,10 +20,8 @@ pub struct App {
     /// A unique key used with conduit_cookie to generate cookies
     pub session_key: String,
 
-    /// The crate index git repository
-    pub git_repo: Mutex<git2::Repository>,
-
     /// The location on disk of the checkout of the crate index git repository
+    /// Only used in the development environment.
     pub git_repo_checkout: PathBuf,
 
     /// The server configuration
@@ -86,13 +79,10 @@ impl App {
             .connection_customizer(Box::new(db::SetStatementTimeout(db_connection_timeout)))
             .thread_pool(thread_pool);
 
-        let repo = git2::Repository::open(&config.git_repo_checkout).unwrap();
-
         App {
             diesel_database: db::diesel_pool(&config.db_url, config.env, diesel_db_config),
             github,
             session_key: config.session_key.clone(),
-            git_repo: Mutex::new(repo),
             git_repo_checkout: config.git_repo_checkout.clone(),
             config: config.clone(),
         }

src/background/job.rs

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use diesel::PgConnection;
2+
use serde::{de::DeserializeOwned, Serialize};
3+
4+
use super::storage;
5+
use crate::util::CargoResult;
6+
7+
/// A background job, meant to be run asynchronously.
8+
pub trait Job: Serialize + DeserializeOwned {
9+
/// The environment this job is run with. This is a struct you define,
10+
/// which should encapsulate things like database connection pools, any
11+
/// configuration, and any other static data or shared resources.
12+
type Environment;
13+
14+
/// The key to use for storing this job, and looking it up later.
15+
///
16+
/// Typically this is the name of your struct in `snake_case`
17+
const JOB_TYPE: &'static str;
18+
19+
/// Enqueue this job to be run at some point in the future.
20+
fn enqueue(self, conn: &PgConnection) -> CargoResult<()> {
21+
storage::enqueue_job(conn, self)
22+
}
23+
24+
/// The logic involved in actually performing this job.
25+
fn perform(self, env: &Self::Environment) -> CargoResult<()>;
26+
}
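A hypothetical implementation of this trait, to show how the pieces fit together (the struct, field, and environment names below are invented for illustration; the PR's real index-update jobs are defined elsewhere in the diff):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct AddCrate {
    krate: String,
}

// Invented environment type; a real one would hold the index checkout path,
// credentials, connection pools, and similar shared resources.
struct Environment {
    index_path: std::path::PathBuf,
}

impl Job for AddCrate {
    type Environment = Environment;
    const JOB_TYPE: &'static str = "add_crate";

    fn perform(self, env: &Self::Environment) -> CargoResult<()> {
        // Update the local index checkout and push it to GitHub here.
        println!("updating index at {:?} for {}", env.index_path, self.krate);
        Ok(())
    }
}

Enqueueing is then just `AddCrate { krate: "serde".into() }.enqueue(&conn)?`, which presumably serializes the struct to JSONB and inserts a row into `background_jobs`.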

src/background/mod.rs

+8
@@ -0,0 +1,8 @@
+mod job;
+mod registry;
+mod runner;
+mod storage;
+
+pub use self::job::*;
+pub use self::registry::Registry;
+pub use self::runner::*;

src/background/registry.rs

+46
@@ -0,0 +1,46 @@
+#![allow(clippy::new_without_default)] // https://github.com/rust-lang/rust-clippy/issues/3632
+
+use serde_json;
+use std::collections::HashMap;
+use std::panic::RefUnwindSafe;
+
+use super::Job;
+use crate::util::CargoResult;
+
+#[doc(hidden)]
+pub type PerformFn<Env> =
+    Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()> + RefUnwindSafe + Send + Sync>;
+
+#[derive(Default)]
+#[allow(missing_debug_implementations)] // Can't derive Debug
+/// A registry of background jobs, used to map job types to concrete perform
+/// functions at runtime.
+pub struct Registry<Env> {
+    job_types: HashMap<&'static str, PerformFn<Env>>,
+}
+
+impl<Env> Registry<Env> {
+    /// Create a new, empty registry
+    pub fn new() -> Self {
+        Registry {
+            job_types: Default::default(),
+        }
+    }
+
+    /// Get the perform function for a given job type
+    pub fn get(&self, job_type: &str) -> Option<&PerformFn<Env>> {
+        self.job_types.get(job_type)
+    }
+
+    /// Register a new background job. This will override any existing
+    /// jobs registered with the same `JOB_TYPE`, if one exists.
+    pub fn register<T: Job<Environment = Env>>(&mut self) {
+        self.job_types.insert(
+            T::JOB_TYPE,
+            Box::new(|data, env| {
+                let data = serde_json::from_value(data)?;
+                T::perform(data, env)
+            }),
+        );
+    }
+}
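A sketch of how the runner side might consume this registry (illustrative; the actual runner lives in src/background/runner.rs, which is not shown here): look up the stored `job_type`, then hand the row's JSONB `data` to the matching perform function.

fn run_stored_job<Env>(
    registry: &Registry<Env>,
    job_type: &str,
    data: serde_json::Value,
    env: &Env,
) -> CargoResult<()> {
    // Jobs must be registered before the runner can dispatch them; an
    // unknown job_type means the queue and the binary are out of sync,
    // so failing loudly is appropriate.
    let perform = registry
        .get(job_type)
        .unwrap_or_else(|| panic!("Unknown job type {}", job_type));
    perform(data, env)
}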
