@@ -49,11 +49,11 @@ use geo_traits::to_geo::{
4949 ToGeoGeometry , ToGeoGeometryCollection , ToGeoLineString , ToGeoMultiLineString , ToGeoMultiPoint ,
5050 ToGeoMultiPolygon , ToGeoPoint , ToGeoPolygon , ToGeoRect ,
5151} ;
52- use geoarrow_array:: ArrayAccessor ;
5352use geoarrow_array:: array:: from_arrow_array;
5453use geoarrow_array:: cast:: AsGeoArrowArray ;
54+ use geoarrow_array:: { ArrayAccessor , GeoArrowArray , GeoArrowType } ;
5555use serde_json:: { Value , json, map:: Map as JsonMap } ;
56- use std:: iter;
56+ use std:: { iter, sync :: Arc } ;
5757
5858use super :: DATETIME_COLUMNS ;
5959
@@ -428,78 +428,90 @@ fn set_column_for_json_rows(
428428 Ok ( ( ) )
429429}
430430
431- /// Creates JSON values from a record batch reader.
431+ fn set_geometry_column_for_json_rows (
432+ rows : & mut [ Option < JsonMap < String , Value > > ] ,
433+ array : Arc < dyn GeoArrowArray > ,
434+ col_name : & str ,
435+ ) -> Result < ( ) , Error > {
436+ for ( i, row) in rows
437+ . iter_mut ( )
438+ . enumerate ( )
439+ . filter_map ( |( i, maybe_row) | maybe_row. as_mut ( ) . map ( |row| ( i, row) ) )
440+ {
441+ use GeoArrowType :: * ;
442+ let value = match array. data_type ( ) {
443+ Point ( _) => geojson:: Value :: from ( & array. as_point ( ) . value ( i) ?. to_point ( ) ) ,
444+ LineString ( _) => {
445+ geojson:: Value :: from ( & array. as_line_string ( ) . value ( i) ?. to_line_string ( ) )
446+ }
447+ Polygon ( _) => geojson:: Value :: from ( & array. as_polygon ( ) . value ( i) ?. to_polygon ( ) ) ,
448+ MultiPoint ( _) => {
449+ geojson:: Value :: from ( & array. as_multi_point ( ) . value ( i) ?. to_multi_point ( ) )
450+ }
451+ MultiLineString ( _) => geojson:: Value :: from (
452+ & array
453+ . as_multi_line_string ( )
454+ . value ( i) ?
455+ . to_multi_line_string ( ) ,
456+ ) ,
457+ MultiPolygon ( _) => {
458+ geojson:: Value :: from ( & array. as_multi_polygon ( ) . value ( i) ?. to_multi_polygon ( ) )
459+ }
460+ Geometry ( _) => geojson:: Value :: from ( & array. as_geometry ( ) . value ( i) ?. to_geometry ( ) ) ,
461+ GeometryCollection ( _) => geojson:: Value :: from (
462+ & array
463+ . as_geometry_collection ( )
464+ . value ( i) ?
465+ . to_geometry_collection ( ) ,
466+ ) ,
467+ Rect ( _) => geojson:: Value :: from ( & array. as_rect ( ) . value ( i) ?. to_rect ( ) ) ,
468+ Wkb ( _) => geojson:: Value :: from ( & array. as_wkb :: < i32 > ( ) . value ( i) ?. to_geometry ( ) ) ,
469+ LargeWkb ( _) => geojson:: Value :: from ( & array. as_wkb :: < i64 > ( ) . value ( i) ?. to_geometry ( ) ) ,
470+ Wkt ( _) => geojson:: Value :: from ( & array. as_wkt :: < i32 > ( ) . value ( i) ?. to_geometry ( ) ) ,
471+ LargeWkt ( _) => geojson:: Value :: from ( & array. as_wkt :: < i64 > ( ) . value ( i) ?. to_geometry ( ) ) ,
472+ } ;
473+ let _ = row. insert (
474+ col_name. to_string ( ) ,
475+ serde_json:: to_value ( geojson:: Geometry :: new ( value) ) ?,
476+ ) ;
477+ }
478+ Ok ( ( ) )
479+ }
480+
481+ /// Creates STAC JSON values from a record batch reader.
432482pub fn from_record_batch_reader < R : RecordBatchReader > (
433483 reader : R ,
434484) -> Result < Vec < serde_json:: Map < String , Value > > , Error > {
435- use geoarrow_array:: GeoArrowType ;
436-
437- let schema = reader. schema ( ) ;
438- let geometry_index = schema. column_with_name ( "geometry" ) . map ( |( index, _) | index) ;
485+ let mut rows = Vec :: new ( ) ;
486+ for result in reader {
487+ let record_batch = result?;
488+ rows. extend ( record_batch_to_json_rows ( record_batch) ?) ;
489+ }
490+ Ok ( rows)
491+ }
439492
440- // For now we collect all batches into memory, but in the future we could iterate on the stream
441- // directly.
442- let batches = reader. collect :: < Result < Vec < _ > , _ > > ( ) ?;
443- let mut json_rows = record_batches_to_json_rows ( & batches, geometry_index) ?;
444- let mut items = Vec :: new ( ) ;
445- if let Some ( index) = geometry_index {
446- let field = schema. field ( index) ;
447- for batch in & batches {
448- let array = batch. column ( index) ;
449- let chunk = from_arrow_array ( array, field) ?;
450- for i in 0 ..chunk. len ( ) {
451- use GeoArrowType :: * ;
452- let value = match chunk. data_type ( ) {
453- Point ( _) => geojson:: Value :: from ( & chunk. as_point ( ) . value ( i) ?. to_point ( ) ) ,
454- LineString ( _) => {
455- geojson:: Value :: from ( & chunk. as_line_string ( ) . value ( i) ?. to_line_string ( ) )
456- }
457- Polygon ( _) => geojson:: Value :: from ( & chunk. as_polygon ( ) . value ( i) ?. to_polygon ( ) ) ,
458- MultiPoint ( _) => {
459- geojson:: Value :: from ( & chunk. as_multi_point ( ) . value ( i) ?. to_multi_point ( ) )
460- }
461- MultiLineString ( _) => geojson:: Value :: from (
462- & chunk
463- . as_multi_line_string ( )
464- . value ( i) ?
465- . to_multi_line_string ( ) ,
466- ) ,
467- MultiPolygon ( _) => {
468- geojson:: Value :: from ( & chunk. as_multi_polygon ( ) . value ( i) ?. to_multi_polygon ( ) )
469- }
470- Geometry ( _) => {
471- geojson:: Value :: from ( & chunk. as_geometry ( ) . value ( i) ?. to_geometry ( ) )
472- }
473- GeometryCollection ( _) => geojson:: Value :: from (
474- & chunk
475- . as_geometry_collection ( )
476- . value ( i) ?
477- . to_geometry_collection ( ) ,
478- ) ,
479- Rect ( _) => geojson:: Value :: from ( & chunk. as_rect ( ) . value ( i) ?. to_rect ( ) ) ,
480- Wkb ( _) => geojson:: Value :: from ( & chunk. as_wkb :: < i32 > ( ) . value ( i) ?. to_geometry ( ) ) ,
481- LargeWkb ( _) => {
482- geojson:: Value :: from ( & chunk. as_wkb :: < i64 > ( ) . value ( i) ?. to_geometry ( ) )
483- }
484- Wkt ( _) => geojson:: Value :: from ( & chunk. as_wkt :: < i32 > ( ) . value ( i) ?. to_geometry ( ) ) ,
485- LargeWkt ( _) => {
486- geojson:: Value :: from ( & chunk. as_wkt :: < i64 > ( ) . value ( i) ?. to_geometry ( ) )
487- }
488- } ;
489- let mut row = json_rows
490- . next ( )
491- . expect ( "we shouldn't run out of rows before we're done" ) ;
492- let _ = row. insert (
493- "geometry" . into ( ) ,
494- serde_json:: to_value ( geojson:: Geometry :: new ( value) ) ?,
495- ) ;
496- items. push ( unflatten ( row) ?) ;
497- }
493+ fn record_batch_to_json_rows (
494+ record_batch : RecordBatch ,
495+ ) -> Result < Vec < JsonMap < String , Value > > , Error > {
496+ let mut rows: Vec < Option < JsonMap < String , Value > > > =
497+ iter:: repeat_n ( Some ( JsonMap :: new ( ) ) , record_batch. num_rows ( ) ) . collect ( ) ;
498+ let schema = record_batch. schema ( ) ;
499+ for ( j, col) in record_batch. columns ( ) . iter ( ) . enumerate ( ) {
500+ let field = schema. field ( j) ;
501+ let col_name = field. name ( ) ;
502+ if field. extension_type_name ( ) . is_some ( ) & GeoArrowType :: try_from ( field) . is_ok ( ) {
503+ let array = from_arrow_array ( col, field) ?;
504+ set_geometry_column_for_json_rows ( & mut rows, array, col_name) ?;
505+ } else {
506+ set_column_for_json_rows ( & mut rows, col, col_name, false ) ?;
498507 }
499- } else {
500- items = json_rows. map ( unflatten) . collect :: < Result < _ , Error > > ( ) ?;
501508 }
502- Ok ( items)
509+ rows. into_iter ( )
510+ . map ( |row| {
511+ let row = row. unwrap ( ) ;
512+ unflatten ( row)
513+ } )
514+ . collect :: < Result < _ , _ > > ( )
503515}
504516
505517fn unflatten (
@@ -516,6 +528,9 @@ fn unflatten(
516528 }
517529 } )
518530 . collect ( ) ;
531+ if let Some ( assets) = item. get_mut ( "assets" ) . and_then ( |a| a. as_object_mut ( ) ) {
532+ assets. retain ( |_, asset| asset. is_object ( ) ) ;
533+ }
519534 for key in keys {
520535 if let Some ( value) = item. remove ( & key) {
521536 if DATETIME_COLUMNS . contains ( & key. as_str ( ) ) {
@@ -539,52 +554,6 @@ fn unflatten(
539554 Ok ( item)
540555}
541556
542- fn record_batches_to_json_rows (
543- batches : & [ RecordBatch ] ,
544- geometry_index : Option < usize > ,
545- ) -> Result < impl Iterator < Item = JsonMap < String , Value > > + use < > , ArrowError > {
546- // For backwards compatibility, default to skip nulls
547- // Skip converting the geometry index, we'll do that later.
548- record_batches_to_json_rows_internal ( batches, false , geometry_index)
549- }
550-
551- fn record_batches_to_json_rows_internal (
552- batches : & [ RecordBatch ] ,
553- explicit_nulls : bool ,
554- geometry_index : Option < usize > ,
555- ) -> Result < impl Iterator < Item = JsonMap < String , Value > > + use < > , ArrowError > {
556- let mut rows: Vec < Option < JsonMap < String , Value > > > = iter:: repeat_n (
557- Some ( JsonMap :: new ( ) ) ,
558- batches. iter ( ) . map ( |b| b. num_rows ( ) ) . sum ( ) ,
559- )
560- . collect ( ) ;
561-
562- if !rows. is_empty ( ) {
563- let schema = batches[ 0 ] . schema ( ) ;
564- let mut base = 0 ;
565- for batch in batches {
566- let row_count = batch. num_rows ( ) ;
567- let row_slice = & mut rows[ base..base + batch. num_rows ( ) ] ;
568- for ( j, col) in batch. columns ( ) . iter ( ) . enumerate ( ) {
569- if geometry_index. map ( |v| v == j) . unwrap_or_default ( ) {
570- continue ;
571- }
572- let col_name = schema. field ( j) . name ( ) ;
573- set_column_for_json_rows ( row_slice, col, col_name, explicit_nulls) ?
574- }
575- base += row_count;
576- }
577- }
578-
579- Ok ( rows. into_iter ( ) . map ( |a| {
580- let mut a = a. unwrap ( ) ;
581- if let Some ( assets) = a. get_mut ( "assets" ) . and_then ( |a| a. as_object_mut ( ) ) {
582- assets. retain ( |_, asset| asset. is_object ( ) ) ;
583- }
584- a
585- } ) )
586- }
587-
588557fn convert_bbox ( obj : serde_json:: Map < String , Value > ) -> Value {
589558 if let Some ( ( ( ( xmin, ymin) , xmax) , ymax) ) = obj
590559 . get ( "xmin" )
0 commit comments