Skip to content

Commit ba95ec5

Browse files
authored
fix: don't consume the geoparquet writer when finishing (#842)
1 parent be5872c commit ba95ec5

File tree

2 files changed

+65
-46
lines changed

2 files changed

+65
-46
lines changed

crates/core/src/error.rs

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,6 @@ use thiserror::Error;
55
#[derive(Error, Debug)]
66
#[non_exhaustive]
77
pub enum Error {
8-
/// [arrow_schema::ArrowError]
9-
#[error(transparent)]
10-
#[cfg(feature = "geoarrow")]
11-
Arrow(#[from] arrow_schema::ArrowError),
12-
13-
/// The schema of two sets of items don't match.
14-
#[cfg(feature = "geoarrow")]
15-
#[error("Arrow schema mismatch")]
16-
ArrowSchemaMismatch,
17-
18-
/// The arrow table is empty
19-
#[cfg(feature = "geoarrow")]
20-
#[error("Empty arrow table")]
21-
EmptyArrowTable,
22-
238
/// [chrono::ParseError]
249
#[error(transparent)]
2510
ChronoParse(#[from] chrono::ParseError),
@@ -28,19 +13,10 @@ pub enum Error {
2813
#[error("{0} is not enabled")]
2914
FeatureNotEnabled(&'static str),
3015

31-
/// [geoarrow_schema::error::GeoArrowError]
32-
#[error(transparent)]
33-
#[cfg(feature = "geoarrow")]
34-
GeoArrow(#[from] geoarrow_schema::error::GeoArrowError),
35-
3616
/// [geojson::Error]
3717
#[error(transparent)]
3818
Geojson(#[from] Box<geojson::Error>),
3919

40-
/// [std::io::Error]
41-
#[error(transparent)]
42-
Io(#[from] std::io::Error),
43-
4420
/// Returned when a STAC object has the wrong type field.
4521
#[error("incorrect type: expected={expected}, actual={actual}")]
4622
IncorrectType {
@@ -64,32 +40,26 @@ pub enum Error {
6440
#[error("invalid datetime: {0}")]
6541
InvalidDatetime(String),
6642

43+
/// [std::io::Error]
44+
#[error(transparent)]
45+
Io(#[from] std::io::Error),
46+
6747
/// Returned when there is not a required field on a STAC object
6848
#[error("no \"{0}\" field in the JSON object")]
6949
MissingField(&'static str),
7050

71-
/// No geoparquet metadata in a stac-geoparquet file.
72-
#[error("no geoparquet metadata")]
73-
#[cfg(feature = "geoparquet")]
74-
MissingGeoparquetMetadata,
51+
/// There is not an href, when an href is required.
52+
#[error("no href")]
53+
NoHref,
7554

7655
/// There are no items, when items are required.
7756
#[error("no items")]
7857
NoItems,
7958

80-
/// There is not an href, when an href is required.
81-
#[error("no href")]
82-
NoHref,
83-
8459
/// This is not a JSON object.
8560
#[error("json value is not an object")]
8661
NotAnObject(serde_json::Value),
8762

88-
/// [parquet::errors::ParquetError]
89-
#[error(transparent)]
90-
#[cfg(feature = "geoparquet")]
91-
Parquet(#[from] parquet::errors::ParquetError),
92-
9363
/// [serde_json::Error]
9464
#[error(transparent)]
9565
SerdeJson(#[from] serde_json::Error),
@@ -114,8 +84,43 @@ pub enum Error {
11484
#[error(transparent)]
11585
UrlParse(#[from] url::ParseError),
11686

87+
/// [arrow_schema::ArrowError]
88+
#[error(transparent)]
89+
#[cfg(feature = "geoarrow")]
90+
Arrow(#[from] arrow_schema::ArrowError),
91+
92+
/// The schema of two sets of items don't match.
93+
#[cfg(feature = "geoarrow")]
94+
#[error("Arrow schema mismatch")]
95+
ArrowSchemaMismatch,
96+
97+
/// The arrow table is empty
98+
#[cfg(feature = "geoarrow")]
99+
#[error("Empty arrow table")]
100+
EmptyArrowTable,
101+
102+
/// [geoarrow_schema::error::GeoArrowError]
103+
#[error(transparent)]
104+
#[cfg(feature = "geoarrow")]
105+
GeoArrow(#[from] geoarrow_schema::error::GeoArrowError),
106+
117107
/// [wkb::error::WkbError]
118108
#[error(transparent)]
119109
#[cfg(feature = "geoarrow")]
120110
Wkb(#[from] wkb::error::WkbError),
111+
112+
/// The geoparquet writer has been closed.
113+
#[error("The geoparquet writer has already been closed")]
114+
#[cfg(feature = "geoparquet")]
115+
ClosedGeoparquetWriter,
116+
117+
/// No geoparquet metadata in a stac-geoparquet file.
118+
#[error("no geoparquet metadata")]
119+
#[cfg(feature = "geoparquet")]
120+
MissingGeoparquetMetadata,
121+
122+
/// [parquet::errors::ParquetError]
123+
#[error(transparent)]
124+
#[cfg(feature = "geoparquet")]
125+
Parquet(#[from] parquet::errors::ParquetError),
121126
}

crates/core/src/geoparquet.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ where
100100
WriterBuilder::new(writer)
101101
.compression(compression)
102102
.build(item_collection.into().items)
103-
.and_then(|writer| writer.finish())
103+
.and_then(|mut writer| writer.finish())
104104
}
105105

106106
/// Builder for a stac-geoparquet writer.
@@ -115,7 +115,9 @@ pub struct WriterBuilder<W: Write + Send> {
115115
#[allow(missing_debug_implementations)]
116116
pub struct Writer<W: Write + Send> {
117117
geoarrow_encoder: Encoder,
118-
encoder: GeoParquetRecordBatchEncoder,
118+
// We make this an option so we can consume it during write but keep write
119+
// as only requiring a mutable reference.
120+
encoder: Option<GeoParquetRecordBatchEncoder>,
119121
arrow_writer: ArrowWriter<W>,
120122
}
121123

@@ -191,7 +193,7 @@ impl<W: Write + Send> WriterBuilder<W> {
191193
///
192194
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
193195
/// let cursor = Cursor::new(Vec::new());
194-
/// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
196+
/// let mut writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
195197
/// writer.finish().unwrap();
196198
/// ```
197199
pub fn build(self, items: Vec<Item>) -> Result<Writer<W>> {
@@ -222,13 +224,15 @@ impl<W: Write + Send> Writer<W> {
222224
arrow_writer.write(&record_batch)?;
223225
Ok(Writer {
224226
geoarrow_encoder,
225-
encoder,
227+
encoder: Some(encoder),
226228
arrow_writer,
227229
})
228230
}
229231

230232
/// Writes more items to this writer.
231233
///
234+
/// It's an error to write after `finish` has been called.
235+
///
232236
/// # Examples
233237
///
234238
/// ```
@@ -244,13 +248,19 @@ impl<W: Write + Send> Writer<W> {
244248
/// ```
245249
pub fn write(&mut self, items: Vec<Item>) -> Result<()> {
246250
let record_batch = self.geoarrow_encoder.encode(items)?;
247-
let record_batch = self.encoder.encode_record_batch(&record_batch)?;
251+
let record_batch = if let Some(encoder) = self.encoder.as_mut() {
252+
encoder.encode_record_batch(&record_batch)?
253+
} else {
254+
return Err(Error::ClosedGeoparquetWriter);
255+
};
248256
self.arrow_writer.write(&record_batch)?;
249257
Ok(())
250258
}
251259

252260
/// Finishes writing.
253261
///
262+
/// It's an error to call finish twice.
263+
///
254264
/// # Examples
255265
///
256266
/// ```
@@ -259,12 +269,16 @@ impl<W: Write + Send> Writer<W> {
259269
///
260270
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
261271
/// let cursor = Cursor::new(Vec::new());
262-
/// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
272+
/// let mut writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
263273
/// writer.finish().unwrap();
264274
/// ```
265-
pub fn finish(mut self) -> Result<()> {
266-
self.arrow_writer
267-
.append_key_value_metadata(self.encoder.into_keyvalue()?);
275+
pub fn finish(&mut self) -> Result<()> {
276+
if let Some(encoder) = self.encoder.take() {
277+
self.arrow_writer
278+
.append_key_value_metadata(encoder.into_keyvalue()?);
279+
} else {
280+
return Err(Error::ClosedGeoparquetWriter);
281+
}
268282
self.arrow_writer.append_key_value_metadata(KeyValue::new(
269283
VERSION_KEY.to_string(),
270284
Some(VERSION.to_string()),

0 commit comments

Comments
 (0)