Skip to content

Commit a069d87

Browse files
authored
Add method that compresses the chunks with lowest uncompressed state_group ids (#72)
1 parent 3271221 commit a069d87

File tree

6 files changed

+331
-9
lines changed

6 files changed

+331
-9
lines changed

auto_compressor/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,4 @@ crate-type = ["cdylib", "rlib"]
2727

2828
[dependencies.pyo3]
2929
version = "0.14.1"
30-
features = ["extension-module","abi3-py36"]
30+
features = ["extension-module","abi3-py36"]

auto_compressor/src/manager.rs

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
// of compression on the database.
33

44
use crate::state_saving::{
5-
connect_to_database, read_room_compressor_state, write_room_compressor_state,
5+
connect_to_database, create_tables_if_needed, get_next_room_to_compress,
6+
read_room_compressor_state, write_room_compressor_state,
67
};
7-
use anyhow::{Context, Result};
8-
use log::{debug, warn};
8+
use anyhow::{bail, Context, Result};
9+
use log::{debug, info, warn};
910
use synapse_compress_state::{continue_run, ChunkStats, Level};
1011

1112
/// Runs the compressor on a chunk of the room
@@ -110,3 +111,84 @@ pub fn run_compressor_on_room_chunk(
110111

111112
Ok(Some(chunk_stats))
112113
}
114+
115+
/// Runs the compressor in chunks on rooms with the lowest uncompressed state group ids
116+
///
117+
/// # Arguments
118+
///
119+
/// * `db_url` - The URL of the postgres database that synapse is using.
120+
/// e.g. "postgresql://user:[email protected]/synapse"
121+
///
122+
/// * `chunk_size` - The number of state_groups to work on. All of the entries
123+
/// from state_groups_state are requested from the database
124+
/// for state groups that are worked on. Therefore small
125+
/// chunk sizes may be needed on machines with low memory.
126+
/// (Note: if the compressor fails to find space savings on the
127+
/// chunk as a whole (which may well happen in rooms with lots
128+
/// of backfill in) then the entire chunk is skipped.)
129+
///
130+
/// * `default_levels` - If the compressor has never been run on this room before
131+
/// Then we need to provide the compressor with some information
132+
/// on what sort of compression structure we want. The default that
133+
/// the library suggests is empty levels with max sizes of 100, 50 and 25
134+
///
135+
/// * `number_of_chunks`- The number of chunks to compress. The larger this number is, the longer
136+
/// the compressor will run for.
137+
pub fn compress_chunks_of_database(
138+
db_url: &str,
139+
chunk_size: i64,
140+
default_levels: &[Level],
141+
number_of_chunks: i64,
142+
) -> Result<()> {
143+
// connect to the database
144+
let mut client = connect_to_database(db_url)
145+
.with_context(|| format!("Failed to connect to database at {}", db_url))?;
146+
147+
create_tables_if_needed(&mut client).context("Failed to create state compressor tables")?;
148+
149+
let mut skipped_chunks = 0;
150+
let mut rows_saved = 0;
151+
let mut chunks_processed = 0;
152+
153+
while chunks_processed < number_of_chunks {
154+
let room_to_compress = get_next_room_to_compress(&mut client)
155+
.context("Failed to work out what room to compress next")?;
156+
157+
if room_to_compress.is_none() {
158+
break;
159+
}
160+
161+
let room_to_compress =
162+
room_to_compress.expect("Have checked that rooms_to_compress is not None");
163+
164+
info!(
165+
"Running compressor on room {} with chunk size {}",
166+
room_to_compress, chunk_size
167+
);
168+
169+
let work_done =
170+
run_compressor_on_room_chunk(db_url, &room_to_compress, chunk_size, default_levels)?;
171+
172+
if let Some(ref chunk_stats) = work_done {
173+
if chunk_stats.commited {
174+
let savings = chunk_stats.original_num_rows - chunk_stats.new_num_rows;
175+
rows_saved += chunk_stats.original_num_rows - chunk_stats.new_num_rows;
176+
debug!("Saved {} rows for room {}", savings, room_to_compress);
177+
} else {
178+
skipped_chunks += 1;
179+
debug!(
180+
"Unable to make savings for room {}, skipping chunk",
181+
room_to_compress
182+
);
183+
}
184+
chunks_processed += 1;
185+
} else {
186+
bail!("Ran the compressor on a room that had no more work to do!")
187+
}
188+
}
189+
info!(
190+
"Finished running compressor. Saved {} rows. Skipped {}/{} chunks",
191+
rows_saved, skipped_chunks, chunks_processed
192+
);
193+
Ok(())
194+
}

auto_compressor/src/state_saving.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// This module contains functions to communicate with the database
22

33
use anyhow::{bail, Result};
4+
use log::trace;
45
use synapse_compress_state::Level;
56

67
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
@@ -55,6 +56,20 @@ pub fn create_tables_if_needed(client: &mut Client) -> Result<()> {
5556

5657
client.execute(create_progress_table, &[])?;
5758

59+
let create_compressor_global_progress_table = r#"
60+
CREATE TABLE IF NOT EXISTS state_compressor_total_progress(
61+
lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,
62+
lowest_uncompressed_group BIGINT NOT NULL,
63+
CHECK (Lock='X')
64+
);
65+
INSERT INTO state_compressor_total_progress
66+
(lowest_uncompressed_group)
67+
VALUES (0)
68+
ON CONFLICT (lock) DO NOTHING;
69+
"#;
70+
71+
client.batch_execute(create_compressor_global_progress_table)?;
72+
5873
Ok(())
5974
}
6075

@@ -249,3 +264,58 @@ pub fn write_room_compressor_state(
249264

250265
Ok(())
251266
}
267+
268+
/// Returns the room with the lowest uncompressed state group id
269+
///
270+
/// A group is detected as uncompressed if it is greater than the `last_compressed`
271+
/// entry in `state_compressor_progress` for that room.
272+
///
273+
/// The `lowest_uncompressed_group` value stored in `state_compressor_total_progress`
274+
/// stores where this method last finished, to prevent repeating work
275+
///
276+
/// # Arguments
277+
///
278+
/// * `client` - A postgres client used to send the requests to the database
279+
pub fn get_next_room_to_compress(client: &mut Client) -> Result<Option<String>> {
280+
// Walk the state_groups table until find next uncompressed group
281+
let get_next_room = r#"
282+
SELECT room_id, id
283+
FROM state_groups
284+
LEFT JOIN state_compressor_progress USING (room_id)
285+
WHERE
286+
id >= (SELECT lowest_uncompressed_group FROM state_compressor_total_progress)
287+
AND (
288+
id > last_compressed
289+
OR last_compressed IS NULL
290+
)
291+
ORDER BY id ASC
292+
LIMIT 1
293+
"#;
294+
295+
let row_opt = client.query_opt(get_next_room, &[])?;
296+
297+
let next_room_row = if let Some(row) = row_opt {
298+
row
299+
} else {
300+
return Ok(None);
301+
};
302+
303+
let next_room: String = next_room_row.get("room_id");
304+
let lowest_uncompressed_group: i64 = next_room_row.get("id");
305+
306+
// This method has determined where the lowest uncompressed group is, save that
307+
// information so we don't have to redo this work in the future.
308+
let update_total_progress = r#"
309+
UPDATE state_compressor_total_progress SET lowest_uncompressed_group = $1;
310+
"#;
311+
312+
client.execute(update_total_progress, &[&lowest_uncompressed_group])?;
313+
314+
trace!(
315+
"next_room: {}, lowest_uncompressed: {}",
316+
next_room,
317+
lowest_uncompressed_group
318+
);
319+
320+
Ok(Some(next_room))
321+
}

compressor_integration_tests/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ pub fn clear_compressor_state() {
314314
let sql = r"
315315
TRUNCATE state_compressor_state;
316316
TRUNCATE state_compressor_progress;
317+
UPDATE state_compressor_total_progress SET lowest_uncompressed_group = 0;
317318
";
318319

319320
client.batch_execute(sql).unwrap();

compressor_integration_tests/tests/auto_compressor_manager_tests.rs

Lines changed: 168 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
use std::collections::BTreeMap;
2+
13
use auto_compressor::{
2-
manager::run_compressor_on_room_chunk,
4+
manager::{compress_chunks_of_database, run_compressor_on_room_chunk},
35
state_saving::{connect_to_database, create_tables_if_needed},
46
};
57
use compressor_integration_tests::{
68
add_contents_to_database, clear_compressor_state, database_collapsed_states_match_map,
79
database_structure_matches_map, empty_database,
8-
map_builder::{compressed_3_3_from_0_to_13_with_state, line_segments_with_state},
10+
map_builder::{
11+
compressed_3_3_from_0_to_13_with_state, line_segments_with_state,
12+
structure_from_edges_with_state,
13+
},
914
setup_logger, DB_URL,
1015
};
1116
use serial_test::serial;
@@ -31,7 +36,7 @@ fn run_compressor_on_room_chunk_works() {
3136
clear_compressor_state();
3237

3338
// compress in 3,3 level sizes by default
34-
let default_levels = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];
39+
let default_levels = vec![Level::new(3), Level::new(3)];
3540

3641
// compress the first 7 groups in the room
3742
// structure should be the following afterwards
@@ -63,3 +68,163 @@ fn run_compressor_on_room_chunk_works() {
6368
// Check that the structure of the database matches the expected structure
6469
assert!(database_structure_matches_map(&expected));
6570
}
71+
72+
#[test]
73+
#[serial(db)]
74+
fn compress_chunks_of_database_compresses_multiple_rooms() {
75+
setup_logger();
76+
// This creates 2 rooms with the following structure
77+
//
78+
// 0-1-2 3-4-5 6-7-8 9-10-11 12-13
79+
// (with room2's numbers shifted up 14)
80+
//
81+
// Each group i has state:
82+
// ('node','is', i)
83+
// ('group', j, 'seen') - for all j less than i in that room
84+
let initial1 = line_segments_with_state(0, 13);
85+
let initial2 = line_segments_with_state(14, 27);
86+
87+
empty_database();
88+
add_contents_to_database("room1", &initial1);
89+
add_contents_to_database("room2", &initial2);
90+
91+
let mut client = connect_to_database(DB_URL).unwrap();
92+
create_tables_if_needed(&mut client).unwrap();
93+
clear_compressor_state();
94+
95+
// compress in 3,3 level sizes by default
96+
let default_levels = vec![Level::new(3), Level::new(3)];
97+
98+
// Compress 4 chunks of size 8.
99+
// The first two should compress room1 and the second two should compress room2
100+
compress_chunks_of_database(DB_URL, 8, &default_levels, 4).unwrap();
101+
102+
// We are aiming for the following structure in the database for room1
103+
// i.e. groups 6 and 9 should have changed from initial map
104+
// N.B. this saves 11 rows
105+
//
106+
// 0 3\ 12
107+
// 1 4 6\ 13
108+
// 2 5 7 9
109+
// 8 10
110+
// 11
111+
//
112+
// Where each group i has state:
113+
// ('node','is', i)
114+
// ('group', j, 'seen') - for all j less than i
115+
let expected1 = compressed_3_3_from_0_to_13_with_state();
116+
117+
// Check that the database still gives correct states for each group in room1
118+
assert!(database_collapsed_states_match_map(&initial1));
119+
120+
// Check that the structure of the database matches the expected structure for room1
121+
assert!(database_structure_matches_map(&expected1));
122+
123+
// room 2 should have the same structure but with all numbers shifted up by 14
124+
let expected_edges: BTreeMap<i64, i64> = vec![
125+
(15, 14),
126+
(16, 15),
127+
(18, 17),
128+
(19, 18),
129+
(20, 17),
130+
(21, 20),
131+
(22, 21),
132+
(23, 20),
133+
(24, 23),
134+
(25, 24),
135+
(27, 26),
136+
]
137+
.into_iter()
138+
.collect();
139+
140+
let expected2 = structure_from_edges_with_state(expected_edges, 14, 27);
141+
142+
// Check that the database still gives correct states for each group in room2
143+
assert!(database_collapsed_states_match_map(&initial2));
144+
145+
// Check that the structure of the database matches the expected structure for room2
146+
assert!(database_structure_matches_map(&expected2));
147+
}
148+
149+
#[test]
150+
#[serial(db)]
151+
fn compress_chunks_of_database_continues_where_it_left_off() {
152+
setup_logger();
153+
// This creates 2 rooms with the following structure
154+
//
155+
// 0-1-2 3-4-5 6-7-8 9-10-11 12-13
156+
// (with room2's numbers shifted up 14)
157+
//
158+
// Each group i has state:
159+
// ('node','is', i)
160+
// ('group', j, 'seen') - for all j less than i in that room
161+
let initial1 = line_segments_with_state(0, 13);
162+
let initial2 = line_segments_with_state(14, 27);
163+
164+
empty_database();
165+
add_contents_to_database("room1", &initial1);
166+
add_contents_to_database("room2", &initial2);
167+
168+
let mut client = connect_to_database(DB_URL).unwrap();
169+
create_tables_if_needed(&mut client).unwrap();
170+
clear_compressor_state();
171+
172+
// compress in 3,3 level sizes by default
173+
let default_levels = vec![Level::new(3), Level::new(3)];
174+
175+
// Compress chunks of various sizes:
176+
//
177+
// These two should compress room1
178+
compress_chunks_of_database(DB_URL, 8, &default_levels, 1).unwrap();
179+
compress_chunks_of_database(DB_URL, 100, &default_levels, 1).unwrap();
180+
// These three should compress room2
181+
compress_chunks_of_database(DB_URL, 1, &default_levels, 2).unwrap();
182+
compress_chunks_of_database(DB_URL, 5, &default_levels, 1).unwrap();
183+
compress_chunks_of_database(DB_URL, 5, &default_levels, 1).unwrap();
184+
185+
// We are aiming for the following structure in the database for room1
186+
// i.e. groups 6 and 9 should have changed from initial map
187+
// N.B. this saves 11 rows
188+
//
189+
// 0 3\ 12
190+
// 1 4 6\ 13
191+
// 2 5 7 9
192+
// 8 10
193+
// 11
194+
//
195+
// Where each group i has state:
196+
// ('node','is', i)
197+
// ('group', j, 'seen') - for all j less than i
198+
let expected1 = compressed_3_3_from_0_to_13_with_state();
199+
200+
// Check that the database still gives correct states for each group in room1
201+
assert!(database_collapsed_states_match_map(&initial1));
202+
203+
// Check that the structure of the database matches the expected structure for room1
204+
assert!(database_structure_matches_map(&expected1));
205+
206+
// room 2 should have the same structure but with all numbers shifted up by 14
207+
let expected_edges: BTreeMap<i64, i64> = vec![
208+
(15, 14),
209+
(16, 15),
210+
(18, 17),
211+
(19, 18),
212+
(20, 17),
213+
(21, 20),
214+
(22, 21),
215+
(23, 20),
216+
(24, 23),
217+
(25, 24),
218+
(27, 26),
219+
]
220+
.into_iter()
221+
.collect();
222+
223+
let expected2 = structure_from_edges_with_state(expected_edges, 14, 27);
224+
225+
// Check that the database still gives correct states for each group in room2
226+
assert!(database_collapsed_states_match_map(&initial2));
227+
228+
// Check that the structure of the database matches the expected structure for room2
229+
assert!(database_structure_matches_map(&expected2));
230+
}

0 commit comments

Comments
 (0)