Commit cbb6472

Move our index updates to be run in background jobs
This fundamentally changes the workflow for all operations we perform involving git, so that they are not performed on the web server and do not block the response. This will improve the response times of `cargo publish`, and make the publish process more resilient, reducing the likelihood of an inconsistency such as the index being updated but not our database.

Previously, our workflow looked something like this:

- When the server boots, do a full clone of the index into a known location
- Some request comes in that needs to update the index
- Database transaction is opened
- Local checkout is modified, we attempt to commit & push (note: this involves a mutex to avoid contention with another request to update the index on the same server)
- If push fails, we fetch, `reset --hard`, and try again up to 20 times
- Database transaction is committed
- We send a successful response

The reason for the retry logic is that we have more than one web server, meaning no server can be sure that its local checkout is actually up to date. There's also a major opportunity for an inconsistent state to be reached here. If the power goes out, the server is restarted, something crashes, etc., in between the index being updated and the database transaction being committed, we will never retry it.

The new workflow looks like this:

- Some request comes in that needs to update the index
- A job is queued in the database to update the index at some point in the future
- We send a successful response
- A separate process pulls the job out of the database
- A full clone of the index is performed into a temporary directory
- The new checkout is modified, committed, and pushed
- If the push succeeds, the job is removed from the database
- If the push fails, the job is marked as failed and will be retried at some point in the future

While a background worker can be spread across multiple machines and/or threads, we will be able to avoid the race conditions that were previously possible by ensuring that we only have one worker with one thread that handles index updates. Right now that's easy, since index updates are the only background job we have, but as we add more we will need to add support for multiple queues to account for this.

I've opted to do a fresh checkout in every job, rather than relying on some state that was set up when the machine booted. This is mostly for simplicity's sake. It also means that if we need to scale to multiple threads/processes for other jobs, we can punt the multi-queue enhancement for a while if we wish. However, it does mean the job will take a bit longer to run. If this turns out to be a problem, it's easy to address.

This should eliminate the opportunity for the index to enter a state that is inconsistent with our database -- or at least they should become eventually consistent. If the power goes out before the job is committed as done, it is assumed the job failed and it will be retried. The job itself is idempotent, so even if the power goes out after the index is updated, the retry should succeed.

One other side effect of this change is that when `cargo publish` returns with an exit status of 0, that does not mean that your crate/new version is immediately available for use -- if you try to point to it in Cargo.toml seconds after publishing, you may get an error that it could not find that version. This was technically already true, since neither S3 nor GitHub guarantee that uploads/pushes are immediately visible. However, this does increase the timescale beyond the delay we would have seen there. In most cases it should be under 10 seconds, and at most a minute.

One enhancement that will come as a followup, but is not included in this PR, is a UI to see the status of your upload. This is definitely nice to have, but is not something I think is necessary for this feature to land. The time it would take to navigate to that UI is going to be longer than the time it takes the background job to run in most cases. That enhancement is something I think can go hand in hand with rust-lang#1503 (which incidentally becomes much easier to implement with this PR, since a "staging" publish just skips queuing the background job, and the only thing the button to fully publish needs to do is queue the job).

This setup does assume that all background jobs *must* eventually succeed. If any job fails, the index is in an inconsistent state with our database, and we are having an outage of some kind. Due to the nature of our background jobs, this likely means that GitHub is down, or there is a bug in our code. Either way, we page whoever is on-call, since it means publishing is broken. Since publishing crates is such an infrequent event, I've set the alerting thresholds to be extremely low.
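For orientation, the two halves of the new workflow map onto the code in this commit roughly as follows. This is a minimal sketch, not code from the commit: `queue_index_update` and `run_worker` are hypothetical names, the one-second polling interval is an assumption, and the sketch assumes the crate's `background` module items (`Job`, `Runner`, `enqueue_job`, `run_all_pending_jobs`, all introduced below) are in scope.

```rust
use std::{thread::sleep, time::Duration};
use diesel::PgConnection;

// Request side: queueing the index update is a plain row insert, done in
// the same database transaction as the rest of the publish, so the HTTP
// response can be sent before any git work happens.
fn queue_index_update<T: Job>(conn: &PgConnection, job: T) -> CargoResult<()> {
    background::storage::enqueue_job(conn, job)
}

// Worker side: a separate process repeatedly runs whatever jobs are
// eligible; a job that fails stays in `background_jobs` and becomes
// eligible again later (see the exponential backoff in storage.rs below).
fn run_worker(runner: Runner<Environment>) -> ! {
    loop {
        if let Err(e) = runner.run_all_pending_jobs() {
            eprintln!("Error running jobs: {}", e);
        }
        sleep(Duration::from_secs(1));
    }
}
```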
1 parent e33bf7e commit cbb6472

15 files changed (+312 -224 lines)

src/app.rs (+2 -7)

@@ -4,7 +4,7 @@ use crate::{db, util::CargoResult, Config, Env};
 use std::{
     env,
     path::PathBuf,
-    sync::{Arc, Mutex},
+    sync::Arc,
     time::Duration,
 };

@@ -25,10 +25,8 @@ pub struct App {
     /// A unique key used with conduit_cookie to generate cookies
     pub session_key: String,

-    /// The crate index git repository
-    pub git_repo: Mutex<git2::Repository>,
-
     /// The location on disk of the checkout of the crate index git repository
+    /// Only used in the development environment.
     pub git_repo_checkout: PathBuf,

     /// The server configuration
@@ -86,13 +84,10 @@ impl App {
             .connection_customizer(Box::new(db::SetStatementTimeout(db_connection_timeout)))
             .thread_pool(thread_pool);

-        let repo = git2::Repository::open(&config.git_repo_checkout).unwrap();
-
         App {
             diesel_database: db::diesel_pool(&config.db_url, config.env, diesel_db_config),
             github,
             session_key: config.session_key.clone(),
-            git_repo: Mutex::new(repo),
             git_repo_checkout: config.git_repo_checkout.clone(),
             config: config.clone(),
         }

src/background/job.rs (+1 -1)

@@ -2,7 +2,7 @@ use diesel::PgConnection;
 use serde::{Serialize, de::DeserializeOwned};

 use super::storage;
-use util::CargoResult;
+use crate::util::CargoResult;

 /// A background job, meant to be run asynchronously.
 pub trait Job: Serialize + DeserializeOwned {
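The diff cuts off after the trait's opening line. Judging from the imports above (`PgConnection`, `storage`, `CargoResult`) and from how the runner below looks up `job.job_type` and calls `perform_fn(job.data, &environment)`, the rest of the trait plausibly looks something like this; every item shown is inferred, not quoted from the commit:

```rust
// Inferred sketch of the remainder of the trait -- the associated items
// here are assumptions based on the surrounding code, not this diff.
pub trait Job: Serialize + DeserializeOwned {
    /// The type the runner hands to `perform` (e.g. index URL and credentials).
    type Environment;
    /// Stored in the `job_type` column and used for registry lookup.
    const JOB_TYPE: &'static str;

    /// Insert a row into `background_jobs` (see storage.rs below).
    fn enqueue(self, conn: &PgConnection) -> CargoResult<()>
    where
        Self: Sized,
    {
        storage::enqueue_job(conn, self)
    }

    /// The actual work, run later in the background worker.
    fn perform(self, env: &Self::Environment) -> CargoResult<()>;
}
```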

src/background/registry.rs (+3 -2)

@@ -1,11 +1,12 @@
 use serde_json;
 use std::collections::HashMap;
+use std::panic::RefUnwindSafe;

 use super::Job;
-use util::CargoResult;
+use crate::util::CargoResult;

 #[doc(hidden)]
-pub type PerformFn<Env> = Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()> + Send + Sync>;
+pub type PerformFn<Env> = Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()> + RefUnwindSafe + Send + Sync>;

 #[derive(Default)]
 #[allow(missing_debug_implementations)] // Can't derive debug
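The body of `Registry::register` is outside this excerpt, but given the `PerformFn` signature above, each registered job type is presumably wrapped in a closure that deserializes the stored JSON and then performs the job. A hedged sketch of that wrapper, assuming the `perform` method inferred in the job.rs sketch above:

```rust
// Hypothetical perform wrapper; not part of this diff. The new
// `RefUnwindSafe` bound on `PerformFn` exists so these closures can be
// called inside `catch_unwind` by the runner below.
fn perform_job<J: Job>(data: serde_json::Value, env: &J::Environment) -> CargoResult<()> {
    let job: J = serde_json::from_value(data)?;
    job.perform(env)
}
```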

src/background/runner.rs (+61 -15)

@@ -1,11 +1,13 @@
 #![allow(dead_code)]
 use diesel::prelude::*;
-use std::panic::{catch_unwind, UnwindSafe};
+use std::any::Any;
+use std::panic::{catch_unwind, UnwindSafe, RefUnwindSafe, PanicInfo};
+use std::sync::Arc;
 use threadpool::ThreadPool;

-use db::{DieselPool, DieselPooledConn};
+use crate::db::{DieselPool, DieselPooledConn};
 use super::{storage, Registry, Job};
-use util::errors::*;
+use crate::util::errors::*;

 #[allow(missing_debug_implementations)]
 pub struct Builder<Env> {
@@ -30,8 +32,8 @@ impl<Env> Builder<Env> {
         Runner {
             connection_pool: self.connection_pool,
             thread_pool: ThreadPool::new(self.thread_count.unwrap_or(5)),
-            environment: self.environment,
-            registry: self.registry,
+            environment: Arc::new(self.environment),
+            registry: Arc::new(self.registry),
         }
     }
 }
@@ -40,11 +42,11 @@
 pub struct Runner<Env> {
     connection_pool: DieselPool,
     thread_pool: ThreadPool,
-    environment: Env,
-    registry: Registry<Env>,
+    environment: Arc<Env>,
+    registry: Arc<Registry<Env>>,
 }

-impl<Env> Runner<Env> {
+impl<Env: RefUnwindSafe + Send + Sync + 'static> Runner<Env> {
     pub fn builder(connection_pool: DieselPool, environment: Env) -> Builder<Env> {
         Builder {
             connection_pool,
@@ -54,6 +56,24 @@ impl<Env> Runner<Env> {
         }
     }

+    pub fn run_all_pending_jobs(&self) -> CargoResult<()> {
+        let available_job_count = storage::available_job_count(&*self.connection()?)?;
+        for _ in 0..available_job_count {
+            self.run_single_job()
+        }
+        Ok(())
+    }
+
+    fn run_single_job(&self) {
+        let environment = Arc::clone(&self.environment);
+        let registry = Arc::clone(&self.registry);
+        self.get_single_job(move |job| {
+            let perform_fn = registry.get(&job.job_type)
+                .ok_or_else(|| internal(&format_args!("Unknown job type {}", job.job_type)))?;
+            perform_fn(job.data, &environment)
+        })
+    }
+
     fn get_single_job<F>(&self, f: F)
     where
         F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static,
@@ -69,13 +89,15 @@ impl<Env> Runner<Env> {
             let job_id = job.id;

             let result = catch_unwind(|| f(job))
-                .map_err(|_| internal("job panicked"))
+                .map_err(try_to_extract_panic_info)
                 .and_then(|r| r);

-            if result.is_ok() {
-                storage::delete_successful_job(&conn, job_id)?;
-            } else {
-                storage::update_failed_job(&conn, job_id);
+            match result {
+                Ok(_) => storage::delete_successful_job(&conn, job_id)?,
+                Err(e) => {
+                    eprintln!("Job {} failed to run: {}", job_id, e);
+                    storage::update_failed_job(&conn, job_id);
+                }
             }
             Ok(())
         }).expect("Could not retrieve or update job")
@@ -86,19 +108,43 @@ impl<Env> Runner<Env> {
         self.connection_pool.get().map_err(Into::into)
     }

-    #[cfg(test)]
+    pub fn assert_no_failed_jobs(&self) -> CargoResult<()> {
+        self.wait_for_jobs();
+        let failed_jobs = storage::failed_job_count(&*self.connection()?)?;
+        assert_eq!(0, failed_jobs);
+        Ok(())
+    }
+
     fn wait_for_jobs(&self) {
         self.thread_pool.join();
         assert_eq!(0, self.thread_pool.panic_count());
     }
 }

+/// Try to figure out what's in the box, and print it if we can.
+///
+/// The actual error type we will get from `panic::catch_unwind` is really poorly documented.
+/// However, the `panic::set_hook` functions deal with a `PanicInfo` type, and its payload is
+/// documented as "commonly but not always `&'static str` or `String`". So we can try all of those,
+/// and give up if we didn't get one of those three types.
+fn try_to_extract_panic_info(info: Box<dyn Any + Send + 'static>) -> Box<dyn CargoError> {
+    if let Some(x) = info.downcast_ref::<PanicInfo>() {
+        internal(&format_args!("job panicked: {}", x))
+    } else if let Some(x) = info.downcast_ref::<&'static str>() {
+        internal(&format_args!("job panicked: {}", x))
+    } else if let Some(x) = info.downcast_ref::<String>() {
+        internal(&format_args!("job panicked: {}", x))
+    } else {
+        internal("job panicked")
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use diesel::prelude::*;
     use diesel::r2d2;

-    use schema::background_jobs::dsl::*;
+    use crate::schema::background_jobs::dsl::*;
     use std::sync::{Mutex, MutexGuard, Barrier, Arc};
     use std::panic::AssertUnwindSafe;
     use super::*;
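The downcast chain in `try_to_extract_panic_info` can be sanity-checked in isolation. This standalone snippet (not from the commit) shows which payload types the two common forms of `panic!` actually produce:

```rust
use std::panic::catch_unwind;

fn main() {
    // `panic!` with a string literal yields a `&'static str` payload...
    let err = catch_unwind(|| { panic!("boom"); }).unwrap_err();
    assert!(err.downcast_ref::<&'static str>().is_some());

    // ...while `panic!` with format arguments yields a `String`.
    let err = catch_unwind(|| { panic!("job {} failed", 7); }).unwrap_err();
    assert!(err.downcast_ref::<String>().is_some());
}
```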

src/background/storage.rs (+38 -12)

@@ -1,12 +1,13 @@
 use diesel::dsl::now;
+use diesel::pg::Pg;
 use diesel::prelude::*;
 use diesel::{delete, insert_into, update};
-use diesel::sql_types::Integer;
+use diesel::sql_types::{Bool, Integer, Interval};
 use serde_json;

-use schema::background_jobs;
+use crate::schema::background_jobs;
 use super::Job;
-use util::CargoResult;
+use crate::util::CargoResult;

 #[derive(Queryable, Identifiable, Debug, Clone)]
 pub struct BackgroundJob {
@@ -17,7 +18,7 @@ pub struct BackgroundJob {

 /// Enqueues a job to be run as soon as possible.
 pub fn enqueue_job<T: Job>(conn: &PgConnection, job: T) -> CargoResult<()> {
-    use schema::background_jobs::dsl::*;
+    use crate::schema::background_jobs::dsl::*;

     let job_data = serde_json::to_value(job)?;
     insert_into(background_jobs)
@@ -29,27 +30,52 @@ pub fn enqueue_job<T: Job>(conn: &PgConnection, job: T) -> CargoResult<()> {
     Ok(())
 }

-/// Finds the next job that is unlocked, and ready to be retried. If a row is
-/// found, it will be locked.
-pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult<BackgroundJob> {
-    use schema::background_jobs::dsl::*;
+fn retriable() -> Box<dyn BoxableExpression<background_jobs::table, Pg, SqlType = Bool>> {
+    use crate::schema::background_jobs::dsl::*;
     use diesel::dsl::*;
-    use diesel::sql_types::Interval;

     sql_function!(power, power_t, (x: Integer, y: Integer) -> Integer);

+    Box::new(last_retry.lt(now - 1.minute().into_sql::<Interval>() * power(2, retries)))
+}
+
+/// Finds the next job that is unlocked, and ready to be retried. If a row is
+/// found, it will be locked.
+pub fn find_next_unlocked_job(conn: &PgConnection) -> QueryResult<BackgroundJob> {
+    use crate::schema::background_jobs::dsl::*;
+
     background_jobs
         .select((id, job_type, data))
-        .filter(last_retry.lt(now - 1.minute().into_sql::<Interval>() * power(2, retries)))
+        .filter(retriable())
         .order(id)
         .for_update()
         .skip_locked()
         .first::<BackgroundJob>(conn)
 }

+/// The number of jobs that have failed at least once
+pub fn failed_job_count(conn: &PgConnection) -> QueryResult<i64> {
+    use crate::schema::background_jobs::dsl::*;
+
+    background_jobs
+        .count()
+        .filter(retries.gt(0))
+        .get_result(conn)
+}
+
+/// The number of jobs available to be run
+pub fn available_job_count(conn: &PgConnection) -> QueryResult<i64> {
+    use crate::schema::background_jobs::dsl::*;
+
+    background_jobs
+        .count()
+        .filter(retriable())
+        .get_result(conn)
+}
+
 /// Deletes a job that has successfully completed running
 pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()> {
-    use schema::background_jobs::dsl::*;
+    use crate::schema::background_jobs::dsl::*;

     delete(background_jobs.find(job_id)).execute(conn)?;
     Ok(())
@@ -60,7 +86,7 @@ pub fn delete_successful_job(conn: &PgConnection, job_id: i64) -> QueryResult<()
 /// Ignores any database errors that may have occurred. If the DB has gone away,
 /// we assume that just trying again with a new connection will succeed.
 pub fn update_failed_job(conn: &PgConnection, job_id: i64) {
-    use schema::background_jobs::dsl::*;
+    use crate::schema::background_jobs::dsl::*;

     let _ = update(background_jobs.find(job_id))
         .set((
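The `retriable()` filter above encodes the retry policy: a failed job becomes eligible again once `last_retry` is more than 2^retries minutes in the past. A quick standalone illustration of that schedule (not code from the commit):

```rust
// Prints the minimum wait implied by
// `last_retry < now - 1 minute * power(2, retries)`.
fn main() {
    for retries in 0..6 {
        println!("retries = {}: eligible after {} min", retries, 1u32 << retries);
    }
    // retries = 0: 1 min, 1: 2 min, 2: 4 min, 3: 8 min, 4: 16 min, 5: 32 min
}
```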

src/background_jobs.rs (+23)

@@ -0,0 +1,23 @@
+use url::Url;
+
+use crate::background::{Runner, Builder};
+use crate::git::{AddCrate, Yank};
+
+pub fn job_runner(config: Builder<Environment>) -> Runner<Environment> {
+    config
+        .register::<AddCrate>()
+        .register::<Yank>()
+        .build()
+}
+
+#[allow(missing_debug_implementations)]
+pub struct Environment {
+    pub index_location: Url,
+    pub credentials: Option<(String, String)>,
+}
+
+impl Environment {
+    pub fn credentials(&self) -> Option<(&str, &str)> {
+        self.credentials.as_ref().map(|(u, p)| (u.as_str(), p.as_str()))
+    }
+}
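No worker binary appears in this excerpt, so for illustration, here is one hedged way this module could be wired up: build an `Environment` from the same `Config` the web server uses, and hand it, along with a connection pool, to the runner builder. `DieselPool`, `Runner::builder`, and `job_runner` come from the diffs in this commit; everything else about this function, including the module paths being reachable from a binary, is an assumption.

```rust
// Hypothetical wiring for a worker process; not part of this diff.
use cargo_registry::background::Runner;
use cargo_registry::background_jobs::{job_runner, Environment};
use cargo_registry::db::DieselPool;
use cargo_registry::Config;

fn build_runner(config: &Config, pool: DieselPool) -> Runner<Environment> {
    let environment = Environment {
        index_location: config.index_location.clone(),
        // In production these would come from GIT_HTTP_USER/GIT_HTTP_PWD,
        // per the comment removed from src/bin/server.rs below.
        credentials: None,
    };
    job_runner(Runner::builder(pool, environment))
}
```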

src/bin/server.rs (+4 -31)

@@ -1,9 +1,9 @@
 #![deny(warnings)]

-use cargo_registry::{boot, build_handler, env, git, App, Config, Env};
+use cargo_registry::{boot, App, Env};
 use std::{
     env,
-    fs::{self, File},
+    fs::File,
     sync::{mpsc::channel, Arc},
 };

@@ -12,37 +12,10 @@ use civet::Server;
 fn main() {
     // Initialize logging
     env_logger::init();
-    let config = Config::default();
-
-    // If there isn't a git checkout containing the crate index repo at the path specified
-    // by `GIT_REPO_CHECKOUT`, delete that directory and clone the repo specified by `GIT_REPO_URL`
-    // into that directory instead. Uses the credentials specified in `GIT_HTTP_USER` and
-    // `GIT_HTTP_PWD` via the `cargo_registry::git::credentials` function.
-    let url = env("GIT_REPO_URL");
-    let repo = match git2::Repository::open(&config.git_repo_checkout) {
-        Ok(r) => r,
-        Err(..) => {
-            let _ = fs::remove_dir_all(&config.git_repo_checkout);
-            fs::create_dir_all(&config.git_repo_checkout).unwrap();
-            let mut cb = git2::RemoteCallbacks::new();
-            cb.credentials(git::credentials);
-            let mut opts = git2::FetchOptions::new();
-            opts.remote_callbacks(cb);
-            git2::build::RepoBuilder::new()
-                .fetch_options(opts)
-                .clone(&url, &config.git_repo_checkout)
-                .unwrap()
-        }
-    };
-
-    // All commits to the index registry made through crates.io will be made by bors, the Rust
-    // community's friendly GitHub bot.
-    let mut cfg = repo.config().unwrap();
-    cfg.set_str("user.name", "bors").unwrap();
-    cfg.set_str("user.email", "[email protected]").unwrap();

+    let config = cargo_registry::Config::default();
     let app = App::new(&config);
-    let app = build_handler(Arc::new(app));
+    let app = cargo_registry::build_handler(Arc::new(app));

     // On every server restart, ensure the categories available in the database match
     // the information in *src/categories.toml*.

src/config.rs (+3)

@@ -1,11 +1,13 @@
 use crate::{env, uploaders::Uploader, Env, Replica};
 use std::{env, path::PathBuf};
+use url::Url;

 #[derive(Clone, Debug)]
 pub struct Config {
     pub uploader: Uploader,
     pub session_key: String,
     pub git_repo_checkout: PathBuf,
+    pub index_location: Url,
     pub gh_client_id: String,
     pub gh_client_secret: String,
     pub db_url: String,
@@ -124,6 +126,7 @@ impl Default for Config {
             uploader,
             session_key: env("SESSION_KEY"),
             git_repo_checkout: checkout,
+            index_location: Url::parse(&env("GIT_REPO_URL")).unwrap(),
             gh_client_id: env("GH_CLIENT_ID"),
             gh_client_secret: env("GH_CLIENT_SECRET"),
             db_url: env("DATABASE_URL"),

src/controllers/krate/publish.rs (+1 -1)

@@ -196,7 +196,7 @@ pub fn publish(req: &mut dyn Request) -> CargoResult<Response> {
         yanked: Some(false),
         links,
     };
-    git::add_crate(&**req.app(), &git_crate).chain_error(|| {
+    git::add_crate(&conn, git_crate).chain_error(|| {
         internal(&format_args!(
             "could not add crate `{}` to the git repo",
             name
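The new `git::add_crate` signature takes a database connection instead of the `App`, and now takes `git_crate` by value, consistent with serializing it into the job's `data` column. src/git.rs is not part of this excerpt, but the function presumably does little more than enqueue an `AddCrate` job. A hedged sketch, where the `Crate` parameter type and the `AddCrate` field name are assumptions:

```rust
// Hypothetical body of the new `git::add_crate`; not shown in this diff.
// `enqueue_job` is the helper added in src/background/storage.rs above.
pub fn add_crate(conn: &PgConnection, krate: Crate) -> CargoResult<()> {
    background::storage::enqueue_job(conn, AddCrate { krate })
}
```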
