Skip to content

Commit 44b0702

Browse files
authored
[ENH]: Added SegmentWriter and Flusher abstractions over shards (#6814)
## Description of changes This change adds a wrapper over all sharded writer and flusher types. This writer wrapper maintains a vector of sharded writers, one for each segment shard that its segment is concerned with. A new helper `get_shards()` over `Segment` is created to obtain a vector of `SegmentShards`. This function always returns at least one shard. If the given segment is empty it returns an empty shard with no filepaths. All segments are assumed to have one shard for this change. - Improvements & Bug fixes - ... - New functionality - ... ## Test plan Current testing is sufficient to test this change. - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the_ [_docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent c30928b commit 44b0702

8 files changed

Lines changed: 614 additions & 265 deletions

File tree

rust/segment/src/blockfile_metadata.rs

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ use chroma_types::SPARSE_MAX;
3030
use chroma_types::SPARSE_OFFSET_VALUE;
3131
use chroma_types::STRING_METADATA;
3232
use chroma_types::U32_METADATA;
33-
use chroma_types::{MaterializedLogOperation, MetadataValue, Segment, SegmentShard, SegmentUuid};
33+
use chroma_types::{
34+
MaterializedLogOperation, MetadataValue, Segment, SegmentShard, SegmentShardError, SegmentUuid,
35+
};
3436
use core::panic;
3537
use roaring::RoaringBitmap;
3638
use std::collections::HashMap;
@@ -57,6 +59,104 @@ impl Debug for MetadataSegmentWriterShard<'_> {
5759
}
5860
}
5961

62+
#[derive(Error, Debug)]
63+
#[error(transparent)]
64+
pub struct MetadataSegmentWriterError(#[from] MetadataSegmentError);
65+
66+
impl chroma_error::ChromaError for MetadataSegmentWriterError {
67+
fn code(&self) -> chroma_error::ErrorCodes {
68+
self.0.code()
69+
}
70+
}
71+
72+
#[derive(Clone, Debug)]
73+
pub struct MetadataSegmentWriter<'me> {
74+
shards: Vec<MetadataSegmentWriterShard<'me>>,
75+
pub id: SegmentUuid,
76+
}
77+
78+
impl<'me> MetadataSegmentWriter<'me> {
79+
pub async fn from_segment(
80+
tenant: &str,
81+
database_id: &DatabaseUuid,
82+
segment: &Segment,
83+
blockfile_provider: &BlockfileProvider,
84+
cmek: Option<Cmek>,
85+
) -> Result<Self, MetadataSegmentWriterError> {
86+
let segment_shards = segment
87+
.get_shards()
88+
.map_err(MetadataSegmentError::SegmentShard)?;
89+
90+
if segment_shards.is_empty() {
91+
return Err(MetadataSegmentWriterError(
92+
MetadataSegmentError::SegmentShard(SegmentShardError::EmptyShards),
93+
));
94+
}
95+
96+
// Create futures for all shards
97+
let futures: Vec<_> = segment_shards
98+
.iter()
99+
.map(|shard| {
100+
MetadataSegmentWriterShard::from_segment(
101+
tenant,
102+
database_id,
103+
shard,
104+
blockfile_provider,
105+
cmek.clone(),
106+
)
107+
})
108+
.collect();
109+
110+
// Await all futures concurrently
111+
let writer_shards = futures::future::try_join_all(futures).await?;
112+
113+
Ok(Self {
114+
shards: writer_shards,
115+
id: segment.id,
116+
})
117+
}
118+
119+
pub async fn apply_materialized_log_chunk(
120+
&self,
121+
record_segment_reader: &Option<RecordSegmentReaderShard<'_>>,
122+
materialized: &MaterializeLogsResult,
123+
schema: Option<Schema>,
124+
) -> Result<Option<Schema>, ApplyMaterializedLogError> {
125+
// Apply to all shards concurrently
126+
// TODO(tanujnay112): This ONLY WORKS if we have one shard.
127+
let futures = self.shards.iter().map(|shard| {
128+
shard.apply_materialized_log_chunk(record_segment_reader, materialized, schema.clone())
129+
});
130+
131+
let results = futures::future::try_join_all(futures).await?;
132+
133+
// Return the first non-None schema from any shard
134+
Ok(results.into_iter().find(|s| s.is_some()).flatten())
135+
}
136+
137+
pub async fn finish(&mut self) -> Result<(), Box<dyn ChromaError>> {
138+
// Call finish on all shards concurrently
139+
let futures = self.shards.iter_mut().map(|shard| shard.finish());
140+
141+
futures::future::try_join_all(futures).await?;
142+
Ok(())
143+
}
144+
145+
pub async fn commit(self) -> Result<MetadataSegmentFlusher, Box<dyn ChromaError>> {
146+
let futures = self
147+
.shards
148+
.into_iter()
149+
.map(|shard| Box::pin(shard.commit()));
150+
151+
let flusher_shards = futures::future::try_join_all(futures).await?;
152+
153+
Ok(MetadataSegmentFlusher {
154+
shards: flusher_shards,
155+
id: self.id,
156+
})
157+
}
158+
}
159+
60160
#[derive(Debug, Error)]
61161
pub enum MetadataSegmentError {
62162
#[error("Invalid segment type")]
@@ -75,6 +175,10 @@ pub enum MetadataSegmentError {
75175
#[error("Missing file {0}")]
76176
MissingFile(String),
77177
#[error("Count not parse UUID {0}")]
178+
UUID(String),
179+
#[error("Segment shard error: {0}")]
180+
SegmentShard(#[from] chroma_types::SegmentShardError),
181+
#[error("UUID parse error: {0}")]
78182
UuidParseError(String),
79183
#[error("No writer found")]
80184
NoWriter,
@@ -84,7 +188,7 @@ pub enum MetadataSegmentError {
84188
BlockfileWriteError,
85189
#[error("Limit and offset are not currently supported")]
86190
LimitOffsetNotSupported,
87-
#[error("Could not query metadata index {0}")]
191+
#[error("Metadata index error: {0}")]
88192
MetadataIndexQueryError(#[from] MetadataIndexError),
89193
}
90194

@@ -98,6 +202,8 @@ impl ChromaError for MetadataSegmentError {
98202
MetadataSegmentError::FullTextIndexFilesIntegrityError => ErrorCodes::Internal,
99203
MetadataSegmentError::IncorrectNumberOfFiles => ErrorCodes::Internal,
100204
MetadataSegmentError::MissingFile(_) => ErrorCodes::Internal,
205+
MetadataSegmentError::UUID(_) => ErrorCodes::Internal,
206+
MetadataSegmentError::SegmentShard(e) => e.code(),
101207
MetadataSegmentError::UuidParseError(_) => ErrorCodes::Internal,
102208
MetadataSegmentError::NoWriter => ErrorCodes::Internal,
103209
MetadataSegmentError::EmptyPathVector => ErrorCodes::Internal,
@@ -808,6 +914,10 @@ impl<'me> MetadataSegmentWriterShard<'me> {
808914
let mut count = 0u64;
809915
let mut schema = schema;
810916
let mut schema_modified = false;
917+
tracing::info!(
918+
"Applying metadata materialized log chunk with {} records",
919+
materialized.len()
920+
);
811921

812922
self.apply_fts_logs(record_segment_reader, materialized, &schema)
813923
.await?;
@@ -1165,6 +1275,31 @@ impl<'me> MetadataSegmentWriterShard<'me> {
11651275
}
11661276
}
11671277

1278+
#[derive(Debug)]
1279+
pub struct MetadataSegmentFlusher {
1280+
shards: Vec<MetadataSegmentFlusherShard>,
1281+
pub id: SegmentUuid,
1282+
}
1283+
1284+
impl MetadataSegmentFlusher {
1285+
pub async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
1286+
// Flush all shards and collect file paths
1287+
let mut all_file_paths = HashMap::new();
1288+
1289+
for shard in self.shards {
1290+
let shard_paths = Box::pin(shard.flush()).await?;
1291+
for (key, mut paths) in shard_paths {
1292+
all_file_paths
1293+
.entry(key)
1294+
.or_insert_with(Vec::new)
1295+
.append(&mut paths);
1296+
}
1297+
}
1298+
1299+
Ok(all_file_paths)
1300+
}
1301+
}
1302+
11681303
pub struct MetadataSegmentFlusherShard {
11691304
pub id: SegmentUuid,
11701305
pub(crate) full_text_index_flusher: FullTextIndexFlusher,

rust/segment/src/blockfile_record.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,100 @@ use tracing::{Instrument, Span};
2828

2929
const DEFAULT_BLOOM_FILTER_CAPACITY: u64 = 100_000;
3030

31+
#[derive(Error, Debug)]
32+
#[error(transparent)]
33+
pub struct RecordSegmentWriterError(#[from] RecordSegmentWriterShardCreationError);
34+
35+
impl chroma_error::ChromaError for RecordSegmentWriterError {
36+
fn code(&self) -> chroma_error::ErrorCodes {
37+
self.0.code()
38+
}
39+
}
40+
41+
#[derive(Error, Debug)]
42+
#[error(transparent)]
43+
pub struct RecordSegmentWriterCreationError(#[from] RecordSegmentWriterShardCreationError);
44+
45+
impl chroma_error::ChromaError for RecordSegmentWriterCreationError {
46+
fn code(&self) -> chroma_error::ErrorCodes {
47+
self.0.code()
48+
}
49+
}
50+
51+
#[derive(Clone, Debug)]
52+
pub struct RecordSegmentWriter {
53+
shards: Vec<RecordSegmentWriterShard>,
54+
pub id: SegmentUuid,
55+
}
56+
57+
impl RecordSegmentWriter {
58+
pub async fn from_segment(
59+
tenant: &str,
60+
database_id: &DatabaseUuid,
61+
segment: &Segment,
62+
blockfile_provider: &BlockfileProvider,
63+
cmek: Option<Cmek>,
64+
bloom_filter_manager: Option<BloomFilterManager>,
65+
) -> Result<Self, RecordSegmentWriterCreationError> {
66+
let segment_shards = segment
67+
.get_shards()
68+
.map_err(RecordSegmentWriterShardCreationError::SegmentShard)?;
69+
70+
// Create futures for all shards
71+
let futures: Vec<_> = segment_shards
72+
.iter()
73+
.map(|shard| {
74+
RecordSegmentWriterShard::from_segment(
75+
tenant,
76+
database_id,
77+
shard,
78+
blockfile_provider,
79+
cmek.clone(),
80+
bloom_filter_manager.clone(),
81+
)
82+
})
83+
.collect();
84+
85+
// Await all futures concurrently
86+
let writer_shards = futures::future::try_join_all(futures).await?;
87+
88+
Ok(Self {
89+
shards: writer_shards,
90+
id: segment.id,
91+
})
92+
}
93+
94+
pub async fn apply_materialized_log_chunk(
95+
&self,
96+
record_segment_reader: &Option<RecordSegmentReaderShard<'_>>,
97+
materialized: &MaterializeLogsResult,
98+
) -> Result<(), ApplyMaterializedLogError> {
99+
// Apply to all shards concurrently
100+
let futures = self
101+
.shards
102+
.iter()
103+
.map(|shard| shard.apply_materialized_log_chunk(record_segment_reader, materialized));
104+
105+
// TODO: Limit concurrency?
106+
futures::future::try_join_all(futures).await?;
107+
Ok(())
108+
}
109+
110+
pub async fn commit(self) -> Result<RecordSegmentFlusher, Box<dyn ChromaError>> {
111+
let futures = self
112+
.shards
113+
.into_iter()
114+
.map(|shard| Box::pin(shard.commit()));
115+
116+
let flusher_shards = futures::future::try_join_all(futures).await?;
117+
118+
Ok(RecordSegmentFlusher {
119+
shards: flusher_shards,
120+
id: self.id,
121+
})
122+
}
123+
}
124+
31125
#[derive(Clone)]
32126
pub struct RecordSegmentWriterShard {
33127
// These are Option<> so that we can take() them when we commit
@@ -73,6 +167,8 @@ pub enum RecordSegmentWriterShardCreationError {
73167
BloomFilterError(#[from] BloomFilterError),
74168
#[error("Record segment reader error: {0}")]
75169
RecordSegmentReaderShardError(#[from] RecordSegmentReaderShardCreationError),
170+
#[error("Segment shard error: {0}")]
171+
SegmentShard(#[from] chroma_types::SegmentShardError),
76172
#[error("Bloom filter rebuild error: {0}")]
77173
BloomFilterRebuildError(Box<dyn ChromaError>),
78174
}
@@ -708,6 +804,36 @@ impl ChromaError for ApplyMaterializedLogError {
708804
}
709805
}
710806

807+
#[derive(Debug)]
808+
pub struct RecordSegmentFlusher {
809+
shards: Vec<RecordSegmentFlusherShard>,
810+
pub id: SegmentUuid,
811+
}
812+
813+
impl RecordSegmentFlusher {
814+
pub async fn flush(self) -> Result<HashMap<String, Vec<String>>, Box<dyn ChromaError>> {
815+
// Flush all shards and collect file paths
816+
let mut all_file_paths = HashMap::new();
817+
818+
for shard in self.shards {
819+
let shard_paths = Box::pin(shard.flush()).await?;
820+
for (key, mut paths) in shard_paths {
821+
all_file_paths
822+
.entry(key)
823+
.or_insert_with(Vec::new)
824+
.append(&mut paths);
825+
}
826+
}
827+
828+
Ok(all_file_paths)
829+
}
830+
831+
pub fn count(&self) -> u64 {
832+
// Sum counts from all shards
833+
self.shards.iter().map(|shard| shard.count()).sum()
834+
}
835+
}
836+
711837
pub struct RecordSegmentFlusherShard {
712838
pub id: SegmentUuid,
713839
user_id_to_id_flusher: BlockfileFlusher,

0 commit comments

Comments
 (0)