Skip to content

Commit 4644765

Browse files
persistent_state: Bound the memory used during index creation
During index creation, we parallelize the work in case we need to create multiple indexes. We do this in a way where we create a mpsc channel to pass the deserialized row data to the index creation threads. For btree indexes, we define our own comparator function, aka make_compare_keys!. This function needs to perform extra work such deserialization of keys, comparison of keys, ... In comparison to hashmap indexes, btree indexes are slower to create by the inner threads. In such situation, where the inner threads are falling behind, we end up storing an excessive amount of data in the channel, sometimes even loading almost all table into memory. This commit changes the channel to a sync channel, where we limit the amount of data (rows) that can be in the buffer. Fixes: REA-5767 Closes: #1528 Release-Note-Core: Fixed an issue where a CREATE CACHE using a range condition (Btree index) could consume excessive memory. Change-Id: I7c70d4b4a398ee6c4c5891bdce23806e5c5e6db6 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9674 Reviewed-by: Sidney Cammeresi <sac@readyset.io> Tested-by: Buildkite CI
1 parent 6af3fdc commit 4644765

File tree

1 file changed

+4
-3
lines changed
  • dataflow-state/src/persistent_state

1 file changed

+4
-3
lines changed

dataflow-state/src/persistent_state/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use std::cmp::Ordering;
6969
use std::io::{self, Read};
7070
use std::path::PathBuf;
7171
use std::str::FromStr;
72-
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
72+
use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender};
7373
use std::sync::{mpsc, Arc};
7474
use std::thread::{self, JoinHandle};
7575
use std::time::{Duration, Instant};
@@ -1836,11 +1836,11 @@ impl PersistentState {
18361836
}
18371837

18381838
#[allow(clippy::type_complexity)]
1839-
fn make_channels<T>(num: usize) -> (Vec<Sender<Arc<T>>>, Vec<Receiver<Arc<T>>>) {
1839+
fn make_channels<T>(num: usize) -> (Vec<SyncSender<Arc<T>>>, Vec<Receiver<Arc<T>>>) {
18401840
let mut txs = Vec::new();
18411841
let mut rxs = Vec::new();
18421842
for _ in 0..num {
1843-
let (tx, rx) = mpsc::channel();
1843+
let (tx, rx) = mpsc::sync_channel(INDEX_BATCH_SIZE * 4);
18441844
txs.push(tx);
18451845
rxs.push(rx);
18461846
}
@@ -1916,6 +1916,7 @@ impl PersistentState {
19161916

19171917
while let (Some(pk), Some(value)) = (iter.key(), iter.value()) {
19181918
let row = deserialize_row(value);
1919+
// TODO: only pass data that will be used by any index.
19191920
let kv = Arc::new(IndexKeyValue::new(pk.to_vec(), row));
19201921
for tx in &txs {
19211922
tx.send(Arc::clone(&kv))

0 commit comments

Comments
 (0)