Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 7 additions & 35 deletions src/storage/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("Missing timestamp in write batch"))]
BatchMissingTimestamp { backtrace: Backtrace },

#[snafu(display("Failed to write columns, source: {}", source))]
FlushIo {
source: object_store::Error,
Expand Down Expand Up @@ -184,13 +181,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("IO failed while reading Parquet file: {}, source: {}", file, source))]
ReadParquetIo {
file: String,
source: std::io::Error,
backtrace: Backtrace,
},

#[snafu(display("Region is under {} state, cannot proceed operation", state))]
InvalidRegionState {
state: &'static str,
Expand Down Expand Up @@ -222,12 +212,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("Region version not found in manifest, the region: {}", region_name))]
VersionNotFound {
region_name: String,
backtrace: Backtrace,
},

#[snafu(display(
"Sequence of region should increase monotonically (should be {} < {})",
prev,
Expand Down Expand Up @@ -317,18 +301,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("Timestamp column type illegal, data type: {:?}", data_type))]
IllegalTimestampColumnType { data_type: ConcreteDataType },

#[snafu(display(
"Failed to convert between ColumnSchema and ColumnMetadata, source: {}",
source
))]
ConvertColumnSchema {
#[snafu(backtrace)]
source: MetadataError,
},

#[snafu(display("Incompatible schema to read, reason: {}", reason))]
CompatRead {
reason: String,
Expand Down Expand Up @@ -437,6 +409,9 @@ pub enum Error {

#[snafu(display("More columns than expected in the request"))]
MoreColumnThanExpected { backtrace: Backtrace },

#[snafu(display("Failed to decode parquet file time range, msg: {}", msg))]
DecodeParquetTimeRange { msg: String, backtrace: Backtrace },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -448,12 +423,10 @@ impl ErrorExt for Error {
match self {
InvalidScanIndex { .. }
| BatchMissingColumn { .. }
| BatchMissingTimestamp { .. }
| InvalidProjection { .. }
| BuildBatch { .. }
| NotInSchemaToCompat { .. }
| WriteToOldVersion { .. }
| IllegalTimestampColumnType { .. }
| CreateRecordBatch { .. }
| RequestTooLarge { .. }
| TypeMismatch { .. }
Expand All @@ -469,7 +442,6 @@ impl ErrorExt for Error {
| DecodeMetaActionList { .. }
| Readline { .. }
| WalDataCorrupted { .. }
| VersionNotFound { .. }
| SequenceNotMonotonic { .. }
| ConvertStoreSchema { .. }
| InvalidRawRegion { .. }
Expand All @@ -496,19 +468,19 @@ impl ErrorExt for Error {
| ManifestProtocolForbidRead { .. }
| ManifestProtocolForbidWrite { .. }
| ReadParquet { .. }
| ReadParquetIo { .. }
| InvalidRegionState { .. }
| ReadWal { .. } => StatusCode::StorageUnavailable,

UnknownColumn { .. } => StatusCode::TableColumnNotFound,

InvalidAlterRequest { source, .. }
| InvalidRegionDesc { source, .. }
| ConvertColumnSchema { source, .. } => source.status_code(),
InvalidAlterRequest { source, .. } | InvalidRegionDesc { source, .. } => {
source.status_code()
}
PushBatch { source, .. } => source.status_code(),
CreateDefault { source, .. } => source.status_code(),
ConvertChunk { source, .. } => source.status_code(),
MarkWalObsolete { source, .. } => source.status_code(),
DecodeParquetTimeRange { .. } => StatusCode::Unexpected,
}
}

Expand Down
10 changes: 8 additions & 2 deletions src/storage/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
use crate::memtable::{IterContext, MemtableId, MemtableRef};
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::sst::{AccessLayerRef, FileMeta, WriteOptions};
use crate::sst::{AccessLayerRef, FileMeta, SstInfo, WriteOptions};
use crate::wal::Wal;

/// Default write buffer size (32M).
Expand Down Expand Up @@ -185,12 +185,18 @@ impl<S: LogStore> FlushJob<S> {
// TODO(hl): Check if random file name already exists in meta.
let iter = m.iter(&iter_ctx)?;
futures.push(async move {
self.sst_layer
let SstInfo {
start_timestamp,
end_timestamp,
} = self
.sst_layer
.write_sst(&file_name, iter, &WriteOptions::default())
.await?;

Ok(FileMeta {
file_name,
start_timestamp,
end_timestamp,
level: 0,
})
});
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/manifest/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,17 @@ pub fn build_region_edit(
.iter()
.map(|f| FileMeta {
file_name: f.to_string(),
start_timestamp: None,
end_timestamp: None,
level: 0,
})
.collect(),
files_to_remove: files_to_remove
.iter()
.map(|f| FileMeta {
file_name: f.to_string(),
start_timestamp: None,
end_timestamp: None,
level: 0,
})
.collect(),
Expand Down
17 changes: 12 additions & 5 deletions src/storage/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod parquet;
use std::sync::Arc;

use async_trait::async_trait;
use common_time::Timestamp;
use object_store::{util, ObjectStore};
use serde::{Deserialize, Serialize};
use table::predicate::Predicate;
Expand Down Expand Up @@ -176,6 +177,8 @@ impl FileHandleInner {
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileMeta {
/// Name of the SST file.
pub file_name: String,
/// Start timestamp reported for this file, or `None` when it is not known.
/// NOTE(review): presumably the lower bound of the file's time range — confirm
/// against the SST writer.
pub start_timestamp: Option<Timestamp>,
/// End timestamp reported for this file, or `None` when it is not known.
/// NOTE(review): presumably the upper bound of the file's time range; whether it
/// is inclusive is not visible here — confirm against the SST writer.
pub end_timestamp: Option<Timestamp>,
/// SST level of the file.
pub level: u8,
}
Expand All @@ -195,6 +198,12 @@ pub struct ReadOptions {
pub predicate: Predicate,
}

/// Information about a written SST file, returned by [`AccessLayer::write_sst`].
///
/// Callers can copy these values into the corresponding [`FileMeta`] fields.
#[derive(Debug)]
pub struct SstInfo {
/// Start timestamp of the written file, or `None` when it is not available.
/// NOTE(review): presumably the minimum timestamp of the written rows — confirm
/// against the writer implementation.
pub start_timestamp: Option<Timestamp>,
/// End timestamp of the written file, or `None` when it is not available.
/// NOTE(review): presumably the maximum timestamp of the written rows — confirm
/// against the writer implementation.
pub end_timestamp: Option<Timestamp>,
}

/// SST access layer.
#[async_trait]
pub trait AccessLayer: Send + Sync + std::fmt::Debug {
Expand All @@ -204,7 +213,7 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
file_name: &str,
iter: BoxedBatchIterator,
opts: &WriteOptions,
) -> Result<()>;
) -> Result<SstInfo>;

/// Read SST file with given `file_name` and schema.
async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader>;
Expand Down Expand Up @@ -240,14 +249,12 @@ impl AccessLayer for FsAccessLayer {
file_name: &str,
iter: BoxedBatchIterator,
opts: &WriteOptions,
) -> Result<()> {
) -> Result<SstInfo> {
// Currently only the parquet format is supported. We may allow the caller to specify
// the SST format in WriteOptions in the future.
let file_path = self.sst_file_path(file_name);
let writer = ParquetWriter::new(&file_path, iter, self.object_store.clone());

writer.write_sst(opts).await?;
Ok(())
writer.write_sst(opts).await
}

async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader> {
Expand Down
Loading