Skip to content

Commit be5872c

Browse files
authored
feat!: streaming geoparquet writes (#841)
1 parent 643b93a commit be5872c

File tree

2 files changed

+150
-19
lines changed

2 files changed

+150
-19
lines changed

crates/core/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
### Changed
1010

1111
- Use `Encoder` instead of `TableBuilder` for `geoarrow` (breaking) ([#840](https://github.com/stac-utils/rustac/pull/840))
12+
- Break apart geoparquet writing into a `WriterBuilder` and a `Writer` (breaking) ([#841](https://github.com/stac-utils/rustac/pull/841))
1213

1314
## [0.13.3] - 2025-11-13
1415

crates/core/src/geoparquet.rs

Lines changed: 149 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use crate::{
44
Catalog, Collection, Error, Item, ItemCollection, Result, Value,
5-
geoarrow::{VERSION, VERSION_KEY},
5+
geoarrow::{Encoder, Options, VERSION, VERSION_KEY},
66
};
77
use bytes::Bytes;
88
use geoparquet::{
@@ -70,7 +70,9 @@ pub fn into_writer<W>(writer: W, item_collection: impl Into<ItemCollection>) ->
7070
where
7171
W: Write + Send,
7272
{
73-
WriterBuilder::new(writer, item_collection).write()
73+
WriterBuilder::new(writer)
74+
.build(item_collection.into().items)?
75+
.finish()
7476
}
7577

7678
/// Writes a [ItemCollection] to a [std::io::Write] as
@@ -88,58 +90,186 @@ where
8890
/// ```
8991
pub fn into_writer_with_compression<W>(
9092
writer: W,
93+
// TODO: should we switch to taking a vector of items in the signature instead?
9194
item_collection: impl Into<ItemCollection>,
9295
compression: Compression,
9396
) -> Result<()>
9497
where
9598
W: Write + Send,
9699
{
97-
WriterBuilder::new(writer, item_collection)
100+
WriterBuilder::new(writer)
98101
.compression(compression)
99-
.write()
102+
.build(item_collection.into().items)
103+
.and_then(|writer| writer.finish())
100104
}
101105

102-
struct WriterBuilder<W: Write + Send> {
106+
/// Builder for a stac-geoparquet writer.
107+
#[derive(Debug)]
108+
pub struct WriterBuilder<W: Write + Send> {
103109
writer: W,
104-
item_collection: ItemCollection,
110+
options: Options,
105111
compression: Option<Compression>,
106112
}
107113

114+
/// Writes items to stac-geoparquet.
115+
#[allow(missing_debug_implementations)]
116+
pub struct Writer<W: Write + Send> {
117+
geoarrow_encoder: Encoder,
118+
encoder: GeoParquetRecordBatchEncoder,
119+
arrow_writer: ArrowWriter<W>,
120+
}
121+
108122
impl<W: Write + Send> WriterBuilder<W> {
109-
fn new(writer: W, item_collection: impl Into<ItemCollection>) -> WriterBuilder<W> {
123+
/// Creates a new writer builder.
124+
///
125+
/// # Examples
126+
///
127+
/// ```
128+
/// use std::io::Cursor;
129+
/// use stac::{Item, geoparquet::WriterBuilder};
130+
///
131+
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
132+
/// let cursor = Cursor::new(Vec::new());
133+
/// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
134+
/// ```
135+
pub fn new(writer: W) -> WriterBuilder<W> {
110136
WriterBuilder {
111137
writer,
112-
item_collection: item_collection.into(),
138+
options: Options::default(),
113139
compression: Some(default_compression()),
114140
}
115141
}
116142

117-
fn compression(mut self, compression: impl Into<Option<Compression>>) -> WriterBuilder<W> {
143+
/// Sets the parquet compression.
144+
///
145+
/// # Examples
146+
///
147+
/// ```
148+
/// use std::io::Cursor;
149+
/// use stac::{Item, geoparquet::{WriterBuilder, Compression}};
150+
///
151+
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
152+
/// let cursor = Cursor::new(Vec::new());
153+
/// let writer = WriterBuilder::new(cursor)
154+
/// .compression(Compression::SNAPPY)
155+
/// .build(vec![item])
156+
/// .unwrap();
157+
/// ```
158+
pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> WriterBuilder<W> {
118159
self.compression = compression.into();
119160
self
120161
}
121162

122-
fn write(self) -> Result<()> {
123-
let (record_batch, schema) = crate::geoarrow::encode(self.item_collection.items)?;
163+
/// Sets the geoarrow encoding options.
164+
///
165+
/// # Examples
166+
///
167+
/// ```
168+
/// use std::io::Cursor;
169+
/// use stac::{Item, geoarrow::Options, geoparquet::WriterBuilder};
170+
///
171+
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
172+
/// let cursor = Cursor::new(Vec::new());
173+
/// let options = Options::default();
174+
/// let writer = WriterBuilder::new(cursor)
175+
/// .options(options)
176+
/// .build(vec![item])
177+
/// .unwrap();
178+
/// ```
179+
pub fn options(mut self, options: Options) -> WriterBuilder<W> {
180+
self.options = options;
181+
self
182+
}
183+
184+
/// Builds the writer.
185+
///
186+
/// # Examples
187+
///
188+
/// ```
189+
/// use std::io::Cursor;
190+
/// use stac::{Item, geoparquet::WriterBuilder};
191+
///
192+
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
193+
/// let cursor = Cursor::new(Vec::new());
194+
/// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
195+
/// writer.finish().unwrap();
196+
/// ```
197+
pub fn build(self, items: Vec<Item>) -> Result<Writer<W>> {
198+
Writer::new(self.writer, self.options, self.compression, items)
199+
}
200+
}
201+
202+
impl<W: Write + Send> Writer<W> {
203+
fn new(
204+
writer: W,
205+
options: Options,
206+
compression: Option<Compression>,
207+
items: Vec<Item>,
208+
) -> Result<Self> {
209+
let (geoarrow_encoder, record_batch) = Encoder::new(items, options)?;
124210
let options = GeoParquetWriterOptionsBuilder::default()
125211
.set_primary_column("geometry".to_string())
126212
.build();
127-
let mut encoder = GeoParquetRecordBatchEncoder::try_new(&schema, &options)?;
213+
let mut encoder = GeoParquetRecordBatchEncoder::try_new(&record_batch.schema(), &options)?;
128214
let mut builder = WriterProperties::builder();
129-
if let Some(compression) = self.compression {
215+
if let Some(compression) = compression {
130216
builder = builder.set_compression(compression);
131217
}
132218
let properties = builder.build();
133-
let mut writer =
134-
ArrowWriter::try_new(self.writer, encoder.target_schema(), Some(properties))?;
219+
let mut arrow_writer =
220+
ArrowWriter::try_new(writer, encoder.target_schema(), Some(properties))?;
135221
let record_batch = encoder.encode_record_batch(&record_batch)?;
136-
writer.write(&record_batch)?;
137-
writer.append_key_value_metadata(encoder.into_keyvalue()?);
138-
writer.append_key_value_metadata(KeyValue::new(
222+
arrow_writer.write(&record_batch)?;
223+
Ok(Writer {
224+
geoarrow_encoder,
225+
encoder,
226+
arrow_writer,
227+
})
228+
}
229+
230+
/// Writes more items to this writer.
231+
///
232+
/// # Examples
233+
///
234+
/// ```
235+
/// use std::io::Cursor;
236+
/// use stac::{Item, geoparquet::WriterBuilder};
237+
///
238+
/// let item1: Item = stac::read("examples/simple-item.json").unwrap();
239+
/// let item2 = item1.clone();
240+
/// let cursor = Cursor::new(Vec::new());
241+
/// let mut writer = WriterBuilder::new(cursor).build(vec![item1]).unwrap();
242+
/// writer.write(vec![item2]).unwrap();
243+
/// writer.finish().unwrap();
244+
/// ```
245+
pub fn write(&mut self, items: Vec<Item>) -> Result<()> {
246+
let record_batch = self.geoarrow_encoder.encode(items)?;
247+
let record_batch = self.encoder.encode_record_batch(&record_batch)?;
248+
self.arrow_writer.write(&record_batch)?;
249+
Ok(())
250+
}
251+
252+
/// Finishes writing.
253+
///
254+
/// # Examples
255+
///
256+
/// ```
257+
/// use std::io::Cursor;
258+
/// use stac::{Item, geoparquet::WriterBuilder};
259+
///
260+
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
261+
/// let cursor = Cursor::new(Vec::new());
262+
/// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
263+
/// writer.finish().unwrap();
264+
/// ```
265+
pub fn finish(mut self) -> Result<()> {
266+
self.arrow_writer
267+
.append_key_value_metadata(self.encoder.into_keyvalue()?);
268+
self.arrow_writer.append_key_value_metadata(KeyValue::new(
139269
VERSION_KEY.to_string(),
140270
Some(VERSION.to_string()),
141271
));
142-
let _ = writer.finish()?;
272+
let _ = self.arrow_writer.finish()?;
143273
Ok(())
144274
}
145275
}

0 commit comments

Comments
 (0)