Skip to content

Commit e33bf7e

Browse files
committed
Encapsulate some of our test behavior into a Runner
The `Runner` is basically the main interface into the job queue, and stores the environment, thread pool, and database connection pool. We technically don't need a thread pool since our jobs are so infrequent that we don't need more than one thread. But our tests are simplified by having it, and a general purpose lib will need it. The other oddity is that we will want to panic instead of returning a result if updating or fetching a job failed for some reason. This isn't something we expect to occur unless there's an issue in the DB, so this seems fine. A thread panicking won't kill the runner. We are also only panicking if changes to the `background_jobs` table fails, not if the job itself fails.
1 parent 0782b89 commit e33bf7e

File tree

4 files changed

+168
-94
lines changed

4 files changed

+168
-94
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ derive_deref = "1.0.0"
6363
reqwest = "0.9.1"
6464
tempdir = "0.3.7"
6565
parking_lot = "0.7.1"
66+
threadpool = "1.7"
6667

6768
lettre = {git = "https://github.com/lettre/lettre", version = "0.9"}
6869
lettre_email = {git = "https://github.com/lettre/lettre", version = "0.9"}

src/background/registry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use super::Job;
55
use util::CargoResult;
66

77
#[doc(hidden)]
8-
pub type PerformFn<Env> = Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()>>;
8+
pub type PerformFn<Env> = Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()> + Send + Sync>;
99

1010
#[derive(Default)]
1111
#[allow(missing_debug_implementations)] // Can't derive debug

src/background/runner.rs

Lines changed: 156 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,161 +1,216 @@
11
#![allow(dead_code)]
22
use diesel::prelude::*;
33
use std::panic::{catch_unwind, UnwindSafe};
4+
use threadpool::ThreadPool;
45

5-
use super::storage;
6+
use db::{DieselPool, DieselPooledConn};
7+
use super::{storage, Registry, Job};
68
use util::errors::*;
79

8-
fn get_single_job<F>(conn: &PgConnection, f: F) -> CargoResult<()>
9-
where
10-
F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + UnwindSafe,
11-
{
12-
conn.transaction::<_, Box<dyn CargoError>, _>(|| {
13-
let job = storage::find_next_unlocked_job(conn)?;
14-
let job_id = job.id;
15-
16-
let result = catch_unwind(|| f(job))
17-
.map_err(|_| internal("job panicked"))
18-
.and_then(|r| r);
19-
20-
if result.is_ok() {
21-
storage::delete_successful_job(conn, job_id)?;
22-
} else {
23-
storage::update_failed_job(conn, job_id);
10+
#[allow(missing_debug_implementations)]
11+
pub struct Builder<Env> {
12+
connection_pool: DieselPool,
13+
environment: Env,
14+
registry: Registry<Env>,
15+
thread_count: Option<usize>,
16+
}
17+
18+
impl<Env> Builder<Env> {
19+
pub fn register<T: Job<Environment = Env>>(mut self) -> Self {
20+
self.registry.register::<T>();
21+
self
22+
}
23+
24+
pub fn thread_count(mut self, thread_count: usize) -> Self {
25+
self.thread_count = Some(thread_count);
26+
self
27+
}
28+
29+
pub fn build(self) -> Runner<Env> {
30+
Runner {
31+
connection_pool: self.connection_pool,
32+
thread_pool: ThreadPool::new(self.thread_count.unwrap_or(5)),
33+
environment: self.environment,
34+
registry: self.registry,
2435
}
25-
Ok(())
26-
})
36+
}
37+
}
38+
39+
#[allow(missing_debug_implementations)]
40+
pub struct Runner<Env> {
41+
connection_pool: DieselPool,
42+
thread_pool: ThreadPool,
43+
environment: Env,
44+
registry: Registry<Env>,
45+
}
46+
47+
impl<Env> Runner<Env> {
48+
pub fn builder(connection_pool: DieselPool, environment: Env) -> Builder<Env> {
49+
Builder {
50+
connection_pool,
51+
environment,
52+
registry: Registry::new(),
53+
thread_count: None,
54+
}
55+
}
56+
57+
fn get_single_job<F>(&self, f: F)
58+
where
59+
F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + Send + UnwindSafe + 'static,
60+
{
61+
let conn = self.connection().expect("Could not acquire connection");
62+
self.thread_pool.execute(move || {
63+
conn.transaction::<_, Box<dyn CargoError>, _>(|| {
64+
let job = storage::find_next_unlocked_job(&conn).optional()?;
65+
let job = match job {
66+
Some(j) => j,
67+
None => return Ok(()),
68+
};
69+
let job_id = job.id;
70+
71+
let result = catch_unwind(|| f(job))
72+
.map_err(|_| internal("job panicked"))
73+
.and_then(|r| r);
74+
75+
if result.is_ok() {
76+
storage::delete_successful_job(&conn, job_id)?;
77+
} else {
78+
storage::update_failed_job(&conn, job_id);
79+
}
80+
Ok(())
81+
}).expect("Could not retrieve or update job")
82+
})
83+
}
84+
85+
fn connection(&self) -> CargoResult<DieselPooledConn> {
86+
self.connection_pool.get().map_err(Into::into)
87+
}
88+
89+
#[cfg(test)]
90+
fn wait_for_jobs(&self) {
91+
self.thread_pool.join();
92+
assert_eq!(0, self.thread_pool.panic_count());
93+
}
2794
}
2895

2996
#[cfg(test)]
3097
mod tests {
3198
use diesel::prelude::*;
99+
use diesel::r2d2;
32100

33101
use schema::background_jobs::dsl::*;
34102
use std::sync::{Mutex, MutexGuard, Barrier, Arc};
35103
use std::panic::AssertUnwindSafe;
36-
use std::thread;
37104
use super::*;
38105

39106
#[test]
40107
fn jobs_are_locked_when_fetched() {
41108
let _guard = TestGuard::lock();
42109

43-
let conn = connection();
44-
let first_job_id = create_dummy_job(&conn).id;
45-
let second_job_id = create_dummy_job(&conn).id;
110+
let runner = runner();
111+
let first_job_id = create_dummy_job(&runner).id;
112+
let second_job_id = create_dummy_job(&runner).id;
46113
let fetch_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
47114
let fetch_barrier2 = fetch_barrier.clone();
48115
let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
49116
let return_barrier2 = return_barrier.clone();
50117

51-
let t1 = thread::spawn(move || {
52-
let _ = get_single_job(&connection(), |job| {
53-
fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
54-
assert_eq!(first_job_id, job.id);
55-
return_barrier.0.wait(); // Wait for thread 2 to lock its job
56-
Ok(())
57-
});
118+
runner.get_single_job(move |job| {
119+
fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
120+
assert_eq!(first_job_id, job.id);
121+
return_barrier.0.wait(); // Wait for thread 2 to lock its job
122+
Ok(())
58123
});
59124

60-
let t2 = thread::spawn(move || {
61-
fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
62-
get_single_job(&connection(), |job| {
63-
assert_eq!(second_job_id, job.id);
64-
return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
65-
Ok(())
66-
})
67-
.unwrap();
125+
fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
126+
runner.get_single_job(move |job| {
127+
assert_eq!(second_job_id, job.id);
128+
return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
129+
Ok(())
68130
});
69131

70-
t1.join().unwrap();
71-
t2.join().unwrap();
132+
runner.wait_for_jobs();
72133
}
73134

74135
#[test]
75136
fn jobs_are_deleted_when_successfully_run() {
76137
let _guard = TestGuard::lock();
77138

78-
let conn = connection();
79-
create_dummy_job(&conn);
139+
let runner = runner();
140+
create_dummy_job(&runner);
80141

81-
get_single_job(&conn, |_| {
142+
runner.get_single_job(|_| {
82143
Ok(())
83-
}).unwrap();
144+
});
145+
runner.wait_for_jobs();
84146

85147
let remaining_jobs = background_jobs.count()
86-
.get_result(&conn);
148+
.get_result(&runner.connection().unwrap());
87149
assert_eq!(Ok(0), remaining_jobs);
88150
}
89151

90152
#[test]
91153
fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
92154
let _guard = TestGuard::lock();
93-
create_dummy_job(&connection());
155+
156+
let runner = runner();
157+
create_dummy_job(&runner);
94158
let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
95159
let barrier2 = barrier.clone();
96160

97-
let t1 = thread::spawn(move || {
98-
let _ = get_single_job(&connection(), |_| {
99-
barrier.0.wait();
100-
// error so the job goes back into the queue
101-
Err(human("nope"))
102-
});
161+
runner.get_single_job(move |_| {
162+
barrier.0.wait();
163+
// error so the job goes back into the queue
164+
Err(human("nope"))
103165
});
104166

105-
let t2 = thread::spawn(move || {
106-
let conn = connection();
107-
// Wait for the first thread to acquire the lock
108-
barrier2.0.wait();
109-
// We are intentionally not using `get_single_job` here.
110-
// `SKIP LOCKED` is intentionally omitted here, so we block until
111-
// the lock on the first job is released.
112-
// If there is any point where the row is unlocked, but the retry
113-
// count is not updated, we will get a row here.
114-
let available_jobs = background_jobs
115-
.select(id)
116-
.filter(retries.eq(0))
117-
.for_update()
118-
.load::<i64>(&conn)
119-
.unwrap();
120-
assert_eq!(0, available_jobs.len());
167+
let conn = runner.connection().unwrap();
168+
// Wait for the first thread to acquire the lock
169+
barrier2.0.wait();
170+
// We are intentionally not using `get_single_job` here.
171+
// `SKIP LOCKED` is intentionally omitted here, so we block until
172+
// the lock on the first job is released.
173+
// If there is any point where the row is unlocked, but the retry
174+
// count is not updated, we will get a row here.
175+
let available_jobs = background_jobs
176+
.select(id)
177+
.filter(retries.eq(0))
178+
.for_update()
179+
.load::<i64>(&conn)
180+
.unwrap();
181+
assert_eq!(0, available_jobs.len());
121182

122-
// Sanity check to make sure the job actually is there
123-
let total_jobs_including_failed = background_jobs
124-
.select(id)
125-
.for_update()
126-
.load::<i64>(&conn)
127-
.unwrap();
128-
assert_eq!(1, total_jobs_including_failed.len());
129-
});
183+
// Sanity check to make sure the job actually is there
184+
let total_jobs_including_failed = background_jobs
185+
.select(id)
186+
.for_update()
187+
.load::<i64>(&conn)
188+
.unwrap();
189+
assert_eq!(1, total_jobs_including_failed.len());
130190

131-
t1.join().unwrap();
132-
t2.join().unwrap();
191+
runner.wait_for_jobs();
133192
}
134193

135194
#[test]
136195
fn panicking_in_jobs_updates_retry_counter() {
137196
let _guard = TestGuard::lock();
138-
let conn = connection();
139-
let job_id = create_dummy_job(&conn).id;
197+
let runner = runner();
198+
let job_id = create_dummy_job(&runner).id;
140199

141-
let t1 = thread::spawn(move || {
142-
let _ = get_single_job(&connection(), |_| {
143-
panic!()
144-
});
200+
runner.get_single_job(|_| {
201+
panic!()
145202
});
146-
147-
let _ = t1.join();
203+
runner.wait_for_jobs();
148204

149205
let tries = background_jobs
150206
.find(job_id)
151207
.select(retries)
152208
.for_update()
153-
.first::<i32>(&conn)
209+
.first::<i32>(&runner.connection().unwrap())
154210
.unwrap();
155211
assert_eq!(1, tries);
156212
}
157213

158-
159214
lazy_static! {
160215
// Since these tests deal with behavior concerning multiple connections
161216
// running concurrently, they have to run outside of a transaction.
@@ -177,24 +232,32 @@ mod tests {
177232
impl<'a> Drop for TestGuard<'a> {
178233
fn drop(&mut self) {
179234
::diesel::sql_query("TRUNCATE TABLE background_jobs")
180-
.execute(&connection())
235+
.execute(&runner().connection().unwrap())
181236
.unwrap();
182237
}
183238
}
184239

185-
fn connection() -> PgConnection {
240+
fn runner() -> Runner<()> {
186241
use dotenv;
187242

188243
let database_url =
189244
dotenv::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests");
190-
PgConnection::establish(&database_url).unwrap()
245+
let manager = r2d2::ConnectionManager::new(database_url);
246+
let pool = r2d2::Pool::builder()
247+
.max_size(2)
248+
.build(manager)
249+
.unwrap();
250+
251+
Runner::builder(pool, ())
252+
.thread_count(2)
253+
.build()
191254
}
192255

193-
fn create_dummy_job(conn: &PgConnection) -> storage::BackgroundJob {
256+
fn create_dummy_job(runner: &Runner<()>) -> storage::BackgroundJob {
194257
::diesel::insert_into(background_jobs)
195258
.values((job_type.eq("Foo"), data.eq(json!(null))))
196259
.returning((id, job_type, data))
197-
.get_result(conn)
260+
.get_result(&runner.connection().unwrap())
198261
.unwrap()
199262
}
200263
}

0 commit comments

Comments
 (0)