Skip to content

Commit 63e1d6e

Browse files
authored
Added option to only run the compressor on a range of state groups in a room (#44)
1 parent 3290726 commit 63e1d6e

File tree

4 files changed

+281
-88
lines changed

4 files changed

+281
-88
lines changed

src/compressor.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,23 @@ impl<'a> Compressor<'a> {
141141
pb.enable_steady_tick(100);
142142

143143
for (&state_group, entry) in self.original_state_map {
144+
// Check whether this entry is in_range or is just present in the map due to being
145+
// a predecessor of a group that IS in_range for compression
146+
if !entry.in_range {
147+
let new_entry = StateGroupEntry {
148+
// in_range is kept the same so that the new entry is equal to the old entry
149+
// otherwise it might trigger a useless database transaction
150+
in_range: entry.in_range,
151+
prev_state_group: entry.prev_state_group,
152+
state_map: entry.state_map.clone(),
153+
};
154+
// Paranoidly assert that no changes are being made to this entry
155+
// (this assertion could probably be removed...)
156+
assert!(new_entry == *entry);
157+
self.new_state_group_map.insert(state_group, new_entry);
158+
159+
continue;
160+
}
144161
let mut prev_state_group = None;
145162
for level in &mut self.levels {
146163
if level.has_space() {
@@ -162,6 +179,7 @@ impl<'a> Compressor<'a> {
162179
self.new_state_group_map.insert(
163180
state_group,
164181
StateGroupEntry {
182+
in_range: true,
165183
prev_state_group,
166184
state_map: delta,
167185
},
@@ -239,6 +257,7 @@ fn test_new_map() {
239257
initial.insert(
240258
i,
241259
StateGroupEntry {
260+
in_range: true,
242261
prev_state_group: prev,
243262
state_map: StateMap::new(),
244263
},

src/database.rs

Lines changed: 152 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -25,40 +25,62 @@ use super::StateGroupEntry;
2525
/// specific room.
2626
///
2727
/// - Connects to the database
28-
/// - Fetches rows with group id lower than max
28+
/// - Fetches the first [groups_to_compress] rows with group id after [min_state_group]
2929
/// - Recursively searches for missing predecessors and adds those
3030
///
31+
/// Returns the state_group map and the id of the last group that was used
32+
///
3133
/// # Arguments
3234
///
33-
/// * `room_id` - The ID of the room in the database
34-
/// * `db_url` - The URL of a Postgres database. This should be of the
35-
/// form: "postgresql://user:pass@domain:port/database"
36-
/// * `max_state_group` - If specified, then only fetch the entries for state
37-
/// groups lower than or equal to this number. (N.B. all
38-
/// predecessors are also fetched)
35+
/// * `room_id` - The ID of the room in the database
36+
/// * `db_url` - The URL of a Postgres database. This should be of the
37+
/// form: "postgresql://user:pass@domain:port/database"
38+
/// * `min_state_group` - If specified, then only fetch the entries for state
39+
/// groups greater than (but not equal to) this number. It
40+
/// also requires groups_to_compress to be specified
41+
/// * `groups_to_compress` - The number of groups to get from the database before stopping
42+
/// * `max_state_group` - If specified, then only fetch the entries for state
43+
/// groups lower than or equal to this number.
3944
pub fn get_data_from_db(
4045
db_url: &str,
4146
room_id: &str,
47+
min_state_group: Option<i64>,
48+
groups_to_compress: Option<i64>,
4249
max_state_group: Option<i64>,
43-
) -> BTreeMap<i64, StateGroupEntry> {
50+
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
51+
// connect to the database
4452
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
4553
builder.set_verify(SslVerifyMode::NONE);
4654
let connector = MakeTlsConnector::new(builder.build());
4755

4856
let mut client = Client::connect(db_url, connector).unwrap();
4957

50-
let mut state_group_map = get_initial_data_from_db(&mut client, room_id, max_state_group);
58+
// Search for the group id of the groups_to_compress'th group after min_state_group
59+
// If this is saved, then the compressor can continue by having min_state_group being
60+
// set to this maximum
61+
let max_group_found = find_max_group(
62+
&mut client,
63+
room_id,
64+
min_state_group,
65+
groups_to_compress,
66+
max_state_group,
67+
);
68+
69+
let mut state_group_map =
70+
get_initial_data_from_db(&mut client, room_id, min_state_group, max_group_found);
5171

5272
println!("Got initial state from database. Checking for any missing state groups...");
5373

5474
// Due to reasons some of the state groups appear in the edges table, but
55-
// not in the state_groups_state table. This means they don't get included
56-
// in our DB queries, so we have to fetch any missing groups explicitly.
75+
// not in the state_groups_state table.
76+
//
77+
// Also it is likely that the predecessor of a node will not be within the
78+
// chunk that was specified by min_state_group and groups_to_compress.
79+
// This means they don't get included in our DB queries, so we have to fetch
80+
// any missing groups explicitly.
81+
//
5782
// Since the returned groups may themselves reference groups we don't have,
5883
// we need to do this recursively until we don't find any more missing.
59-
//
60-
// N.B. This does NOT currently fetch the deltas for the missing groups!
61-
// By carefully chosen max_state_group this might cause issues...?
6284
loop {
6385
let mut missing_sgs: Vec<_> = state_group_map
6486
.iter()
@@ -76,41 +98,92 @@ pub fn get_data_from_db(
7698
.collect();
7799

78100
if missing_sgs.is_empty() {
79-
println!("No missing state groups");
101+
// println!("No missing state groups");
80102
break;
81103
}
82104

83105
missing_sgs.sort_unstable();
84106
missing_sgs.dedup();
85107

86-
println!("Missing {} state groups", missing_sgs.len());
108+
// println!("Missing {} state groups", missing_sgs.len());
87109

88-
let map = get_missing_from_db(&mut client, &missing_sgs);
89-
state_group_map.extend(map.into_iter());
110+
// find state groups not picked up already and add them to the map
111+
let map = get_missing_from_db(&mut client, &missing_sgs, min_state_group, max_group_found);
112+
for (k, v) in map {
113+
state_group_map.entry(k).or_insert(v);
114+
}
90115
}
91116

92-
state_group_map
117+
(state_group_map, max_group_found)
118+
}
119+
120+
/// Returns the group ID of the last group to be compressed
121+
///
122+
/// This can be saved so that future runs of the compressor only
123+
/// continue from after this point
124+
///
125+
/// # Arguments
126+
///
127+
/// * `client` - A Postgres client to make requests with
128+
/// * `room_id` - The ID of the room in the database
129+
/// * `min_state_group` - The lower limit (non inclusive) of group id's to compress
130+
/// * 'groups_to_compress' - How many groups to compress
131+
/// * `max_state_group` - The upper bound on what this method can return
132+
fn find_max_group(
133+
client: &mut Client,
134+
room_id: &str,
135+
min_state_group: Option<i64>,
136+
groups_to_compress: Option<i64>,
137+
max_state_group: Option<i64>,
138+
) -> i64 {
139+
// Get list of state_id's in a certain room
140+
let mut query_chunk_of_ids = "SELECT id FROM state_groups WHERE room_id = $1".to_string();
141+
let params: Vec<&(dyn ToSql + Sync)>;
142+
143+
if let Some(max) = max_state_group {
144+
query_chunk_of_ids = format!("{} AND id <= {}", query_chunk_of_ids, max)
145+
}
146+
147+
// Adds additional constraint if a groups_to_compress has been specified
148+
if min_state_group.is_some() && groups_to_compress.is_some() {
149+
params = vec![&room_id, &min_state_group, &groups_to_compress];
150+
query_chunk_of_ids = format!(r"{} AND id > $2 LIMIT $3", query_chunk_of_ids);
151+
} else {
152+
params = vec![&room_id];
153+
query_chunk_of_ids = format!(r"{} ORDER BY id DESC LIMIT 1", query_chunk_of_ids);
154+
}
155+
156+
let sql_query = format!(
157+
"SELECT id FROM ({}) AS ids ORDER BY ids.id DESC LIMIT 1",
158+
query_chunk_of_ids
159+
);
160+
let final_row = client.query(sql_query.as_str(), &params).unwrap();
161+
162+
final_row.last().unwrap().get(0)
93163
}
94164

95165
/// Fetch the entries in state_groups_state and immediate predecessors for
96166
/// a specific room.
97167
///
98-
/// - Fetches rows with group id lower than max
168+
/// - Fetches first [groups_to_compress] rows with group id higher than min
99169
/// - Stores the group id, predecessor id and deltas into a map
170+
/// - Returns the map of fetched state groups
100171
///
101172
/// # Arguments
102173
///
103174
/// * `client` - A Postgres client to make requests with
104175
/// * `room_id` - The ID of the room in the database
105-
/// * `max_state_group` - If specified, then only fetch the entries for state
106-
/// groups lower than or equal to this number. (N.B. doesn't
107-
/// fetch IMMEDIATE predecessors if ID is above this number)
176+
/// * `min_state_group` - If specified, then only fetch the entries for state
177+
/// groups greater than (but not equal to) this number. It
178+
/// also requires groups_to_compress to be specified
179+
/// * `max_group_found` - The upper limit on state_groups ids to get from the database
108180
fn get_initial_data_from_db(
109181
client: &mut Client,
110182
room_id: &str,
111-
max_state_group: Option<i64>,
183+
min_state_group: Option<i64>,
184+
max_group_found: i64,
112185
) -> BTreeMap<i64, StateGroupEntry> {
113-
// Query to get id, predecessor and delta for each state group
186+
// Query to get id, predecessor and deltas for each state group
114187
let sql = r#"
115188
SELECT m.id, prev_state_group, type, state_key, s.event_id
116189
FROM state_groups AS m
@@ -119,18 +192,21 @@ fn get_initial_data_from_db(
119192
WHERE m.room_id = $1
120193
"#;
121194

122-
// Adds additional constraint if a max_state_group has been specified
123-
// Then sends query to the datatbase
124-
let mut rows = if let Some(s) = max_state_group {
125-
let params: Vec<&dyn ToSql> = vec![&room_id, &s];
126-
client.query_raw(format!(r"{} AND m.id <= $2", sql).as_str(), params)
195+
// Adds additional constraint if minimum state_group has been specified.
196+
// note that the maximum group only affects queries if there is also a minimum
197+
// otherwise it is assumed that ALL groups should be fetched
198+
let mut rows = if let Some(min) = min_state_group {
199+
let params: Vec<&dyn ToSql> = vec![&room_id, &min, &max_group_found];
200+
client.query_raw(
201+
format!(r"{} AND m.id > $2 AND m.id <= $3", sql).as_str(),
202+
params,
203+
)
127204
} else {
128205
client.query_raw(sql, &[room_id])
129206
}
130207
.unwrap();
131208

132209
// Copy the data from the database into a map
133-
134210
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
135211

136212
let pb = ProgressBar::new_spinner();
@@ -143,8 +219,10 @@ fn get_initial_data_from_db(
143219
// The row in the map to copy the data to
144220
let entry = state_group_map.entry(row.get(0)).or_default();
145221

146-
// Save the predecessor (this may already be there)
222+
// Save the predecessor and mark for compression (this may already be there)
223+
// TODO: slightly fewer redundant rewrites
147224
entry.prev_state_group = row.get(1);
225+
entry.in_range = true;
148226

149227
// Copy the single delta from the predecessor stored in this row
150228
if let Some(etype) = row.get::<_, Option<String>>(2) {
@@ -172,34 +250,57 @@ fn get_initial_data_from_db(
172250
///
173251
/// * `client` - A Postgres client to make requests with
174252
/// * `missing_sgs` - An array of missing state_group ids
175-
fn get_missing_from_db(client: &mut Client, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
176-
let mut rows = client
177-
.query_raw(
178-
r#"
179-
SELECT state_group, prev_state_group
180-
FROM state_group_edges
181-
WHERE state_group = ANY($1)
182-
"#,
183-
&[missing_sgs],
184-
)
185-
.unwrap();
253+
/// * `min_state_group` - Minimum state_group id to mark as in range
254+
/// * `max_group_found` - Maximum state_group id to mark as in range
255+
fn get_missing_from_db(
256+
client: &mut Client,
257+
missing_sgs: &[i64],
258+
min_state_group: Option<i64>,
259+
max_group_found: i64,
260+
) -> BTreeMap<i64, StateGroupEntry> {
261+
// "Due to reasons" it is possible that some states only appear in edges table and not in state_groups table
262+
// so since we know the IDs we're looking for as they are the missing predecessors, we can find them by
263+
// left joining onto the edges table (instead of the state_group table!)
264+
let sql = r#"
265+
SELECT target.prev_state_group, source.prev_state_group, state.type, state.state_key, state.event_id
266+
FROM state_group_edges AS target
267+
LEFT JOIN state_group_edges AS source ON (target.prev_state_group = source.state_group)
268+
LEFT JOIN state_groups_state AS state ON (target.prev_state_group = state.state_group)
269+
WHERE target.prev_state_group = ANY($1)
270+
"#;
271+
272+
let mut rows = client.query_raw(sql, &[missing_sgs]).unwrap();
186273

187-
// initialise the map with empty entries (the missing group may not
188-
// have a prev_state_group either)
189-
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = missing_sgs
190-
.iter()
191-
.map(|sg| (*sg, StateGroupEntry::default()))
192-
.collect();
274+
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
193275

194276
while let Some(row) = rows.next().unwrap() {
195-
let state_group = row.get(0);
196-
let entry = state_group_map.get_mut(&state_group).unwrap();
277+
let id = row.get(0);
278+
// The row in the map to copy the data to
279+
let entry = state_group_map.entry(id).or_default();
280+
281+
// Save the predecessor and mark for compression (this may already be there)
282+
// Also may well not exist!
197283
entry.prev_state_group = row.get(1);
284+
if let Some(min) = min_state_group {
285+
if min < id && id <= max_group_found {
286+
entry.in_range = true
287+
}
288+
}
289+
290+
// Copy the single delta from the predecessor stored in this row
291+
if let Some(etype) = row.get::<_, Option<String>>(2) {
292+
entry.state_map.insert(
293+
&etype,
294+
&row.get::<_, String>(3),
295+
row.get::<_, String>(4).into(),
296+
);
297+
}
198298
}
199299

200300
state_group_map
201301
}
202302

303+
// TODO: find a library that has an existing safe postgres escape function
203304
/// Helper function that escapes the wrapped text when writing SQL
204305
pub struct PGEscape<'a>(pub &'a str);
205306

src/graphing.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,15 @@ fn output_csv(groups: &Graph, edges_output: &mut File, nodes_output: &mut File)
5757
/// * `after` - A map from state group ids to StateGroupEntries
5858
/// the information from this map goes into after_edges.csv
5959
/// and after_nodes.csv
60-
pub fn make_graphs(before: Graph, after: Graph) {
60+
pub fn make_graphs(before: &Graph, after: &Graph) {
6161
// Open all the files to output to
6262
let mut before_edges_file = File::create("before_edges.csv").unwrap();
6363
let mut before_nodes_file = File::create("before_nodes.csv").unwrap();
6464
let mut after_edges_file = File::create("after_edges.csv").unwrap();
6565
let mut after_nodes_file = File::create("after_nodes.csv").unwrap();
6666

6767
// Write before's information to before_edges and before_nodes
68-
output_csv(&before, &mut before_edges_file, &mut before_nodes_file);
68+
output_csv(before, &mut before_edges_file, &mut before_nodes_file);
6969
// Write after's information to after_edges and after_nodes
70-
output_csv(&after, &mut after_edges_file, &mut after_nodes_file);
70+
output_csv(after, &mut after_edges_file, &mut after_nodes_file);
7171
}

0 commit comments

Comments
 (0)