Commit ec6e7de

feat: add geoparquet writer encoder and object writing (#863)
## Closes

- Closes #847

## Related to

We'll want to add reading eventually as well, but that's a follow-on.

## Checklist

- [x] Unit tests
- [x] Documentation, including doctests
- [x] Git history is linear
- [x] Commit messages are descriptive
- [x] (optional) Git commit messages follow [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/)
- [x] Code is formatted (`cargo fmt`)
- [x] `cargo test`
- [x] Changes are added to the CHANGELOG
1 parent e06de28 commit ec6e7de

File tree

- crates/core/CHANGELOG.md
- crates/core/src/geoparquet.rs
- crates/io/CHANGELOG.md
- crates/io/Cargo.toml
- crates/io/src/lib.rs
- crates/io/src/store.rs

6 files changed: +180 −22 lines changed

crates/core/CHANGELOG.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Added
 
 - Add `max_row_group_size` parameter to `geoparquet::WriterBuilder` with default value of `150_000` ([#846](https://github.com/stac-utils/rustac/pull/846))
+- An intermediate `WriterEncoder` for writing **stac-geoparquet** ([#863](https://github.com/stac-utils/rustac/pull/863))
 
 ## [0.14.0] - 2025-11-14
 
```
crates/core/src/geoparquet.rs

Lines changed: 92 additions & 20 deletions

````diff
@@ -4,6 +4,7 @@ use crate::{
     Catalog, Collection, Error, Item, ItemCollection, Result, Value,
     geoarrow::{Encoder, Options, VERSION, VERSION_KEY},
 };
+use arrow_array::RecordBatch;
 use bytes::Bytes;
 use geoparquet::{
     reader::{GeoParquetReaderBuilder, GeoParquetRecordBatchReader},
@@ -35,6 +36,13 @@ pub struct WriterOptions {
     pub max_row_group_size: usize,
 }
 
+/// An encoder for writing stac-geoparquet
+#[allow(missing_debug_implementations)]
+pub struct WriterEncoder {
+    geoarrow_encoder: Encoder,
+    encoder: GeoParquetRecordBatchEncoder,
+}
+
 impl WriterOptions {
     /// Creates a new WriterOptions with default values.
     ///
@@ -178,10 +186,8 @@ pub struct WriterBuilder<W: Write + Send> {
 /// Write items to stac-geoparquet.
 #[allow(missing_debug_implementations)]
 pub struct Writer<W: Write + Send> {
-    geoarrow_encoder: Encoder,
-    // We make this an option so we can consume it during write but keep write
-    // as only requiring a mutable reference.
-    encoder: Option<GeoParquetRecordBatchEncoder>,
+    // We make this an option so we can consume the encoder without dropping it.
+    encoder: Option<WriterEncoder>,
     arrow_writer: ArrowWriter<W>,
 }
 
@@ -268,30 +274,86 @@ impl<W: Write + Send> WriterBuilder<W> {
     }
 }
 
+impl WriterEncoder {
+    /// Creates a new writer encoder.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use stac::{Item, geoarrow::Options, geoparquet::WriterEncoder};
+    ///
+    /// let item: Item = stac::read("examples/simple-item.json").unwrap();
+    /// let options = Options::default();
+    /// let (encoder, record_batch) = WriterEncoder::new(options, vec![item]).unwrap();
+    /// assert_eq!(record_batch.num_rows(), 1);
+    /// ```
+    pub fn new(options: Options, items: Vec<Item>) -> Result<(WriterEncoder, RecordBatch)> {
+        let (geoarrow_encoder, record_batch) = Encoder::new(items, options)?;
+        let options = GeoParquetWriterOptionsBuilder::default()
+            .set_primary_column("geometry".to_string())
+            .build();
+        let mut encoder = GeoParquetRecordBatchEncoder::try_new(&record_batch.schema(), &options)?;
+        let record_batch = encoder.encode_record_batch(&record_batch)?;
+        Ok((
+            WriterEncoder {
+                geoarrow_encoder,
+                encoder,
+            },
+            record_batch,
+        ))
+    }
+
+    /// Encodes items into a record batch.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use stac::{Item, geoarrow::Options, geoparquet::WriterEncoder};
+    ///
+    /// let item1: Item = stac::read("examples/simple-item.json").unwrap();
+    /// let item2 = item1.clone();
+    /// let options = Options::default();
+    /// let (mut encoder, record_batch) = WriterEncoder::new(options, vec![item1]).unwrap();
+    /// let record_batch = encoder.encode(vec![item2]).unwrap();
+    /// assert_eq!(record_batch.num_rows(), 1);
+    /// ```
+    pub fn encode(&mut self, items: Vec<Item>) -> Result<RecordBatch> {
+        let record_batch = self.geoarrow_encoder.encode(items)?;
+        let record_batch = self.encoder.encode_record_batch(&record_batch)?;
+        Ok(record_batch)
+    }
+
+    /// Consumes this encoder and returns the keys and values.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use stac::{Item, geoarrow::Options, geoparquet::WriterEncoder};
+    ///
+    /// let item: Item = stac::read("examples/simple-item.json").unwrap();
+    /// let options = Options::default();
+    /// let (encoder, _) = WriterEncoder::new(options, vec![item]).unwrap();
+    /// let key_value = encoder.into_keyvalue().unwrap();
+    /// assert_eq!(key_value.key, "geo");
+    /// ```
+    pub fn into_keyvalue(self) -> Result<KeyValue> {
+        let keyvalue = self.encoder.into_keyvalue()?;
+        Ok(keyvalue)
+    }
+}
+
 impl<W: Write + Send> Writer<W> {
     fn new(
         writer: W,
         options: Options,
         writer_options: WriterOptions,
         items: Vec<Item>,
     ) -> Result<Self> {
-        let (geoarrow_encoder, record_batch) = Encoder::new(items, options)?;
-        let options = GeoParquetWriterOptionsBuilder::default()
-            .set_primary_column("geometry".to_string())
-            .build();
-        let mut encoder = GeoParquetRecordBatchEncoder::try_new(&record_batch.schema(), &options)?;
-        let mut builder = WriterProperties::builder();
-        if let Some(compression) = writer_options.compression {
-            builder = builder.set_compression(compression);
-        }
-        builder = builder.set_max_row_group_size(writer_options.max_row_group_size);
-        let properties = builder.build();
+        let (encoder, record_batch) = WriterEncoder::new(options, items)?;
         let mut arrow_writer =
-            ArrowWriter::try_new(writer, encoder.target_schema(), Some(properties))?;
-        let record_batch = encoder.encode_record_batch(&record_batch)?;
+            ArrowWriter::try_new(writer, record_batch.schema(), Some(writer_options.into()))?;
         arrow_writer.write(&record_batch)?;
         Ok(Writer {
-            geoarrow_encoder,
             encoder: Some(encoder),
             arrow_writer,
         })
@@ -315,9 +377,8 @@ impl<W: Write + Send> Writer<W> {
     /// writer.finish().unwrap();
     /// ```
     pub fn write(&mut self, items: Vec<Item>) -> Result<()> {
-        let record_batch = self.geoarrow_encoder.encode(items)?;
         let record_batch = if let Some(encoder) = self.encoder.as_mut() {
-            encoder.encode_record_batch(&record_batch)?
+            encoder.encode(items)?
         } else {
             return Err(Error::ClosedGeoparquetWriter);
         };
@@ -491,6 +552,17 @@ impl IntoGeoparquet for serde_json::Value {
     }
 }
 
+impl From<WriterOptions> for WriterProperties {
+    fn from(value: WriterOptions) -> Self {
+        let mut builder = WriterProperties::builder();
+        if let Some(compression) = value.compression {
+            builder = builder.set_compression(compression);
+        }
+        builder = builder.set_max_row_group_size(value.max_row_group_size);
+        builder.build()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{FromGeoparquet, Item, ItemCollection, SelfHref, Value, geoparquet::WriterBuilder};
````
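Taken together, `WriterEncoder` factors the geoarrow-plus-GeoParquet encoding pipeline out of `Writer`, so any sink that accepts Arrow record batches can reuse it. Below is a minimal sketch of driving a plain `ArrowWriter` by hand, mirroring what `Writer::new` and `Writer::write` do in the diff above; the output file name is illustrative, and it assumes `WriterOptions` implements `Default` (the store test further down passes `Default::default()` for it).

```rust
use parquet::arrow::ArrowWriter;
use stac::{
    Item,
    geoarrow::Options,
    geoparquet::{WriterEncoder, WriterOptions},
};

fn main() {
    let item: Item = stac::read("examples/simple-item.json").unwrap();

    // The constructor returns the first encoded batch so the Parquet schema
    // is known up front.
    let (mut encoder, first_batch) =
        WriterEncoder::new(Options::default(), vec![item.clone()]).unwrap();

    // `WriterOptions` converts into parquet `WriterProperties` via the new
    // `From` impl added in this commit.
    let file = std::fs::File::create("items.parquet").unwrap();
    let mut writer = ArrowWriter::try_new(
        file,
        first_batch.schema(),
        Some(WriterOptions::default().into()),
    )
    .unwrap();
    writer.write(&first_batch).unwrap();

    // Subsequent batches go through `encode`.
    let batch = encoder.encode(vec![item]).unwrap();
    writer.write(&batch).unwrap();

    // Consuming the encoder yields the "geo" key-value metadata, which must
    // land in the file footer for readers to detect GeoParquet.
    writer.append_key_value_metadata(encoder.into_keyvalue().unwrap());
    writer.close().unwrap();
}
```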
crates/io/CHANGELOG.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 ### Added
 
+- Writing **stac-geoparquet** to an object store ([#863](https://github.com/stac-utils/rustac/pull/863))
 - API client ([#864](https://github.com/stac-utils/rustac/pull/864))
 
 ## [0.1.2] - 2025-11-14
```
crates/io/Cargo.toml

Lines changed: 1 addition & 1 deletion

```diff
@@ -27,7 +27,7 @@ futures.workspace = true
 http.workspace = true
 jsonschema = { workspace = true, optional = true }
 object_store = { workspace = true, optional = true }
-parquet = { workspace = true, optional = true }
+parquet = { workspace = true, optional = true, features = ["arrow", "async", "object_store"] }
 reqwest = { workspace = true, features = ["json", "blocking"] }
 serde.workspace = true
 serde_json = { workspace = true, features = ["preserve_order"] }
```
crates/io/src/lib.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -8,7 +8,7 @@ mod ndjson;
 mod read;
 mod realized_href;
 #[cfg(feature = "store")]
-mod store;
+pub mod store;
 mod write;
 
 #[cfg(feature = "geoparquet")]
```
crates/io/src/store.rs

Lines changed: 84 additions & 0 deletions

```diff
@@ -166,9 +166,61 @@ where
     }
 }
 
+#[cfg(feature = "geoparquet")]
+pub mod geoparquet {
+    use crate::Result;
+    use object_store::{ObjectStore, path::Path};
+    use parquet::arrow::async_writer::{AsyncArrowWriter, ParquetObjectWriter};
+    use stac::Item;
+    use stac::geoarrow::Options;
+    use stac::geoparquet::{WriterEncoder, WriterOptions};
+    use std::sync::Arc;
+
+    /// Writes stac-geoparquet to an object store.
+    pub struct StacGeoparquetObjectWriter {
+        encoder: WriterEncoder,
+        writer: AsyncArrowWriter<ParquetObjectWriter>,
+    }
+
+    impl StacGeoparquetObjectWriter {
+        pub async fn new(
+            store: Arc<dyn ObjectStore>,
+            path: Path,
+            items: Vec<Item>,
+            options: Options,
+            writer_options: WriterOptions,
+        ) -> Result<StacGeoparquetObjectWriter> {
+            let (encoder, record_batch) = WriterEncoder::new(options, items)?;
+            let object_store_writer = ParquetObjectWriter::new(store.clone(), path);
+            let mut writer = AsyncArrowWriter::try_new(
+                object_store_writer,
+                record_batch.schema(),
+                Some(writer_options.into()),
+            )?;
+            writer.write(&record_batch).await?;
+            Ok(StacGeoparquetObjectWriter { encoder, writer })
+        }
+
+        pub async fn write(&mut self, items: Vec<Item>) -> Result<()> {
+            let record_batch = self.encoder.encode(items)?;
+            self.writer.write(&record_batch).await?;
+            Ok(())
+        }
+
+        pub async fn close(mut self) -> Result<()> {
+            self.writer
+                .append_key_value_metadata(self.encoder.into_keyvalue()?);
+            self.writer.close().await?;
+            Ok(())
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use object_store::{memory::InMemory, path::Path};
     use stac::{Item, SelfHref};
+    use std::sync::Arc;
 
     #[tokio::test]
     async fn get_local() {
@@ -186,4 +238,36 @@ mod tests {
         let href = format!("file:///{path}");
         let _: Item = store.get(href).await.unwrap();
     }
+
+    #[tokio::test]
+    #[cfg(feature = "geoparquet")]
+    async fn write_parquet() {
+        use object_store::ObjectStore;
+
+        use super::geoparquet::StacGeoparquetObjectWriter;
+
+        let store = Arc::new(InMemory::new());
+        let item: Item = stac::read("examples/simple-item.json").unwrap();
+        let mut writer = StacGeoparquetObjectWriter::new(
+            store.clone(),
+            Path::from("test"),
+            vec![item.clone()],
+            Default::default(),
+            Default::default(),
+        )
+        .await
+        .unwrap();
+        writer.write(vec![item]).await.unwrap();
+        writer.close().await.unwrap();
+
+        let bytes = store
+            .get(&Path::from("test"))
+            .await
+            .unwrap()
+            .bytes()
+            .await
+            .unwrap();
+        let item_collection = stac::geoparquet::from_reader(bytes).unwrap();
+        assert_eq!(item_collection.items.len(), 2);
+    }
 }
```
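For completeness, a hedged sketch of using the new object writer against a real store rather than the test's `InMemory`. It assumes the crates/io package is importable as `stac_io` with the `store` and `geoparquet` features enabled; the `/tmp` prefix and output path are illustrative, and any `object_store` backend (S3, GCS, Azure) would work the same way.

```rust
use object_store::{ObjectStore, local::LocalFileSystem, path::Path};
use stac::Item;
use stac_io::store::geoparquet::StacGeoparquetObjectWriter;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // An illustrative local store rooted at /tmp.
    let store: Arc<dyn ObjectStore> =
        Arc::new(LocalFileSystem::new_with_prefix("/tmp").unwrap());
    let item: Item = stac::read("examples/simple-item.json").unwrap();

    // `new` writes the first batch immediately; later `write` calls stream
    // additional batches, and `close` appends the "geo" key-value metadata
    // before finishing the file.
    let mut writer = StacGeoparquetObjectWriter::new(
        store,
        Path::from("items.parquet"),
        vec![item.clone()],
        Default::default(), // geoarrow `Options`
        Default::default(), // `WriterOptions`
    )
    .await
    .unwrap();
    writer.write(vec![item]).await.unwrap();
    writer.close().await.unwrap();
}
```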