Skip to content

Commit 3271221

Browse files
authored
Add method that compresses next chunk of room (#64)
1 parent a9bc800 commit 3271221

File tree

7 files changed

+223
-3
lines changed

7 files changed

+223
-3
lines changed

Cargo.lock

Lines changed: 16 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

auto_compressor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@
66
//! to the database and uses these to enable it to incrementally work
77
//! on space reductions
88
9+
pub mod manager;
910
pub mod state_saving;

auto_compressor/src/manager.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// This module contains functions that carry out different types
2+
// of compression on the database.
3+
4+
use crate::state_saving::{
5+
connect_to_database, read_room_compressor_state, write_room_compressor_state,
6+
};
7+
use anyhow::{Context, Result};
8+
use log::{debug, warn};
9+
use synapse_compress_state::{continue_run, ChunkStats, Level};
10+
11+
/// Runs the compressor on a chunk of the room
12+
///
13+
/// Returns `Some(chunk_stats)` if the compressor has progressed
14+
/// and `None` if it had already got to the end of the room
15+
///
16+
/// # Arguments
17+
///
18+
/// * `db_url` - The URL of the postgres database that synapse is using.
19+
/// e.g. "postgresql://user:[email protected]/synapse"
20+
///
21+
/// * `room_id` - The id of the room to run the compressor on. Note this
22+
/// is the id as stored in the database and will look like
23+
/// "!aasdfasdfafdsdsa:matrix.org" instead of the common
24+
/// name
25+
///
26+
/// * `chunk_size` - The number of state_groups to work on. All of the entries
27+
/// from state_groups_state are requested from the database
28+
/// for state groups that are worked on. Therefore small
29+
/// chunk sizes may be needed on machines with low memory.
30+
/// (Note: if the compressor fails to find space savings on the
31+
/// chunk as a whole (which may well happen in rooms with lots
32+
/// of backfill in) then the entire chunk is skipped.)
33+
///
34+
/// * `default_levels` - If the compressor has never been run on this room before
35+
/// then we need to provide the compressor with some information
36+
/// on what sort of compression structure we want. The default that
37+
/// the library suggests is `vec![Level::new(100), Level::new(50), Level::new(25)]`
38+
pub fn run_compressor_on_room_chunk(
39+
db_url: &str,
40+
room_id: &str,
41+
chunk_size: i64,
42+
default_levels: &[Level],
43+
) -> Result<Option<ChunkStats>> {
44+
// connect to the database
45+
let mut client =
46+
connect_to_database(db_url).with_context(|| format!("Failed to connect to {}", db_url))?;
47+
48+
// Access the database to find out where the compressor last got up to
49+
let retrieved_state = read_room_compressor_state(&mut client, room_id)
50+
.with_context(|| format!("Failed to read compressor state for room {}", room_id,))?;
51+
52+
// If the database didn't contain any information, then use the default state
53+
let (start, level_info) = match retrieved_state {
54+
Some((s, l)) => (Some(s), l),
55+
None => (None, default_levels.to_vec()),
56+
};
57+
58+
// run the compressor on this chunk
59+
let option_chunk_stats = continue_run(start, chunk_size, db_url, room_id, &level_info);
60+
61+
if option_chunk_stats.is_none() {
62+
debug!("No work to do on this room...");
63+
return Ok(None);
64+
}
65+
66+
// Ok to unwrap because have checked that it's not None
67+
let chunk_stats = option_chunk_stats.unwrap();
68+
69+
debug!("{:?}", chunk_stats);
70+
71+
// Check to see whether the compressor sent its changes to the database
72+
if !chunk_stats.commited {
73+
if chunk_stats.new_num_rows - chunk_stats.original_num_rows != 0 {
74+
warn!(
75+
"The compressor tried to increase the number of rows in {} between {:?} and {}. Skipping...",
76+
room_id, start, chunk_stats.last_compressed_group,
77+
);
78+
}
79+
80+
// Skip over the failed chunk and set the level info to the default (empty) state
81+
write_room_compressor_state(
82+
&mut client,
83+
room_id,
84+
default_levels,
85+
chunk_stats.last_compressed_group,
86+
)
87+
.with_context(|| {
88+
format!(
89+
"Failed to skip chunk in room {} between {:?} and {}",
90+
room_id, start, chunk_stats.last_compressed_group
91+
)
92+
})?;
93+
94+
return Ok(Some(chunk_stats));
95+
}
96+
97+
// Save where we got up to after this successful commit
98+
write_room_compressor_state(
99+
&mut client,
100+
room_id,
101+
&chunk_stats.new_level_info,
102+
chunk_stats.last_compressed_group,
103+
)
104+
.with_context(|| {
105+
format!(
106+
"Failed to save state after compressing chunk in room {} between {:?} and {}",
107+
room_id, start, chunk_stats.last_compressed_group
108+
)
109+
})?;
110+
111+
Ok(Some(chunk_stats))
112+
}

compressor_integration_tests/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ postgres-openssl = "0.5.0"
1414
rand = "0.8.0"
1515
synapse_compress_state = { path = "../" }
1616
auto_compressor = { path = "../auto_compressor/" }
17+
env_logger = "0.9.0"
18+
log = "0.4.14"
1719

1820
[dependencies.state-map]
1921
git = "https://github.com/matrix-org/rust-matrix-state-map"

compressor_integration_tests/src/lib.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use log::LevelFilter;
12
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
23
use postgres::{fallible_iterator::FallibleIterator, Client};
34
use postgres_openssl::MakeTlsConnector;
45
use rand::{distributions::Alphanumeric, thread_rng, Rng};
56
use state_map::StateMap;
6-
use std::{borrow::Cow, collections::BTreeMap, fmt};
7+
use std::{borrow::Cow, collections::BTreeMap, env, fmt};
78
use string_cache::DefaultAtom as Atom;
89

910
use synapse_compress_state::StateGroupEntry;
@@ -352,3 +353,27 @@ fn functions_are_self_consistent() {
352353
assert!(database_collapsed_states_match_map(&initial));
353354
assert!(database_structure_matches_map(&initial));
354355
}
356+
357+
pub fn setup_logger() {
358+
// setup the logger for the auto_compressor
359+
// The default can be overwritten with COMPRESSOR_LOG_LEVEL
360+
// see the README for more information <--- TODO
361+
if env::var("COMPRESSOR_LOG_LEVEL").is_err() {
362+
let mut log_builder = env_logger::builder();
363+
// set is_test(true) so that the output is hidden by cargo test (unless the test fails)
364+
log_builder.is_test(true);
365+
// default to printing the debug information for both packages being tested
366+
// (Note that just setting the global level to debug will log every sql transaction)
367+
log_builder.filter_module("synapse_compress_state", LevelFilter::Debug);
368+
log_builder.filter_module("auto_compressor", LevelFilter::Debug);
369+
// use try_init() incase the logger has been setup by some previous test
370+
let _ = log_builder.try_init();
371+
} else {
372+
// If COMPRESSOR_LOG_LEVEL was set then use that
373+
let mut log_builder = env_logger::Builder::from_env("COMPRESSOR_LOG_LEVEL");
374+
// set is_test(true) so that the output is hidden by cargo test (unless the test fails)
375+
log_builder.is_test(true);
376+
// use try_init() in case the logger has been setup by some previous test
377+
let _ = log_builder.try_init();
378+
}
379+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use auto_compressor::{
2+
manager::run_compressor_on_room_chunk,
3+
state_saving::{connect_to_database, create_tables_if_needed},
4+
};
5+
use compressor_integration_tests::{
6+
add_contents_to_database, clear_compressor_state, database_collapsed_states_match_map,
7+
database_structure_matches_map, empty_database,
8+
map_builder::{compressed_3_3_from_0_to_13_with_state, line_segments_with_state},
9+
setup_logger, DB_URL,
10+
};
11+
use serial_test::serial;
12+
use synapse_compress_state::Level;
13+
14+
#[test]
#[serial(db)]
fn run_compressor_on_room_chunk_works() {
    setup_logger();
    // Seed the database with the following structure:
    //
    // 0-1-2 3-4-5 6-7-8 9-10-11 12-13
    //
    // where each group i has state:
    //   ('node','is', i)
    //   ('group', j, 'seen') - for all j less than i
    let original_structure = line_segments_with_state(0, 13);
    empty_database();
    add_contents_to_database("room1", &original_structure);

    // Make sure the compressor's bookkeeping tables exist and are empty
    let mut db_client = connect_to_database(DB_URL).unwrap();
    create_tables_if_needed(&mut db_client).unwrap();
    clear_compressor_state();

    // Compress in 3,3 level sizes by default
    let starting_levels = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];

    // First pass: compress the first 7 groups in the room.
    // Afterwards the structure should be (compressed groups only):
    //
    // 0  3\
    // 1  4 6
    // 2  5
    run_compressor_on_room_chunk(DB_URL, "room1", 7, &starting_levels).unwrap();

    // Second pass: compress the next 7 groups
    run_compressor_on_room_chunk(DB_URL, "room1", 7, &starting_levels).unwrap();

    // The database should now contain the following structure
    // i.e. groups 6 and 9 should have changed from before
    // N.B. this saves 11 rows
    //
    // 0  3\       12
    // 1  4 6\     13
    // 2  5 7 9
    //      8 10
    //        11
    let expected_structure = compressed_3_3_from_0_to_13_with_state();

    // The database must still resolve every group to the same full state!
    assert!(database_collapsed_states_match_map(&original_structure));

    // And its edge structure must match the expected compressed shape
    assert!(database_structure_matches_map(&expected_structure));
}

compressor_integration_tests/tests/compressor_continue_run_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fn continue_run_called_twice_same_as_run() {
3535

3636
// compress in 3,3 level sizes
3737
// since the compressor hasn't been run before they are empty
38-
let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];
38+
let level_info = vec![Level::new(3), Level::new(3)];
3939

4040
// Run the compressor with those settings
4141
let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();

0 commit comments

Comments
 (0)