Skip to content

Commit feacc61

Browse files
authored
Merge pull request #405 from splitgraph/pg-json-to-arrow-transport
Add PG JSON to Arrow transport rules
2 parents 76971bf + 2672ea9 commit feacc61

File tree

5 files changed

+52
-32
lines changed

5 files changed

+52
-32
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

connectorx/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ r2d2 = {version = "0.8", optional = true}
4444
r2d2-oracle = {version = "0.5.0", features = ["chrono"], optional = true}
4545
r2d2_mysql = {version = "21.0", optional = true}
4646
r2d2_postgres = {version = "0.18.1", optional = true}
47-
r2d2_sqlite = {version = "0.18", optional = true}
47+
r2d2_sqlite = {version = "0.20.0", optional = true}
4848
regex = {version = "1", optional = true}
49-
rusqlite = {version = "0.25", features = ["column_decltype", "chrono", "bundled"], optional = true}
49+
rusqlite = {version = "0.27.0", features = ["column_decltype", "chrono", "bundled"], optional = true}
5050
rust_decimal = {version = "1", features = ["db-postgres"], optional = true}
5151
rust_decimal_macros = {version = "1", optional = true}
5252
tiberius = {version = "0.5", features = ["rust_decimal", "chrono"], optional = true}

connectorx/src/destinations/arrow/arrow_assoc.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::constants::SECONDS_IN_DAY;
33
use arrow::array::{
44
ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Float32Builder, Float64Builder,
55
Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, Time64NanosecondBuilder,
6-
UInt32Builder, UInt64Builder,
6+
TimestampNanosecondBuilder, UInt32Builder, UInt64Builder,
77
};
88
use arrow::datatypes::Field;
99
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
@@ -140,34 +140,44 @@ impl ArrowAssoc for Option<String> {
140140
}
141141

142142
/// Arrow association for non-optional `DateTime<Utc>` values.
///
/// In this diff the previous `Float64Builder`-typed stubs (every method was
/// `unimplemented!()`) are replaced by a working implementation backed by
/// `TimestampNanosecondBuilder`.
impl ArrowAssoc for DateTime<Utc> {
143-
type Builder = Float64Builder;
143+
type Builder = TimestampNanosecondBuilder;
144144

145-
fn builder(_nrows: usize) -> Float64Builder {
146-
unimplemented!()
// Creates the column builder, passing `nrows` as the requested capacity.
145+
fn builder(nrows: usize) -> Self::Builder {
146+
TimestampNanosecondBuilder::with_capacity(nrows)
147147
}
148148

149-
fn append(_builder: &mut Self::Builder, _value: DateTime<Utc>) -> Result<()> {
150-
unimplemented!()
// The new version drops the explicit `-> Result<()>` return type in favour of
// the `#[throws(ArrowDestinationError)]` attribute.
149+
#[throws(ArrowDestinationError)]
150+
fn append(builder: &mut Self::Builder, value: DateTime<Utc>) {
// Appends the result of `value.timestamp_nanos()` to the builder.
// NOTE(review): presumably this is chrono's `timestamp_nanos`, whose i64
// nanosecond range only covers roughly the years 1677-2262 — confirm how
// out-of-range timestamps should be handled.
151+
builder.append_value(value.timestamp_nanos())
151152
}
152153

153-
fn field(_header: &str) -> Field {
154-
unimplemented!()
// Describes the column as a nanosecond-precision timestamp with no timezone
// (`None`) attached.
154+
fn field(header: &str) -> Field {
155+
Field::new(
156+
header,
157+
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
// NOTE(review): the third argument is `true` for this non-optional type, while
// the `Option<DateTime<Utc>>` impl below passes `false`. If this argument is
// the Arrow nullable flag, the two values look swapped — confirm.
158+
true,
159+
)
155160
}
156161
}
157162

158163
/// Arrow association for optional `DateTime<Utc>` values.
///
/// In this diff the previous `Float64Builder`-typed stubs (every method was
/// `unimplemented!()`) are replaced by a working implementation backed by
/// `TimestampNanosecondBuilder`.
impl ArrowAssoc for Option<DateTime<Utc>> {
159-
type Builder = Float64Builder;
164+
type Builder = TimestampNanosecondBuilder;
160165

161-
fn builder(_nrows: usize) -> Float64Builder {
162-
unimplemented!()
// Creates the column builder, passing `nrows` as the requested capacity.
166+
fn builder(nrows: usize) -> Self::Builder {
167+
TimestampNanosecondBuilder::with_capacity(nrows)
163168
}
164169

165-
fn append(_builder: &mut Self::Builder, _value: Option<DateTime<Utc>>) -> Result<()> {
166-
unimplemented!()
// The new version drops the explicit `-> Result<()>` return type in favour of
// the `#[throws(ArrowDestinationError)]` attribute.
170+
#[throws(ArrowDestinationError)]
171+
fn append(builder: &mut Self::Builder, value: Option<DateTime<Utc>>) {
// Maps `Some(dt)` to `Some(dt.timestamp_nanos())`, leaves `None` as `None`,
// and hands the resulting option to `append_option`.
// NOTE(review): presumably this is chrono's `timestamp_nanos`, whose i64
// nanosecond range only covers roughly the years 1677-2262 — confirm how
// out-of-range timestamps should be handled.
172+
builder.append_option(value.map(|x| x.timestamp_nanos()))
167173
}
168174

169-
fn field(_header: &str) -> Field {
170-
unimplemented!()
// Describes the column as a nanosecond-precision timestamp with no timezone
// (`None`) attached.
175+
fn field(header: &str) -> Field {
176+
Field::new(
177+
header,
178+
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
// NOTE(review): the third argument is `false` for this optional type, while
// the non-optional `DateTime<Utc>` impl above passes `true`. If this argument
// is the Arrow nullable flag, the two values look swapped — confirm.
179+
false,
180+
)
171181
}
172182
}
173183

connectorx/src/sources/sqlite/mod.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ where
9090
let l1query = limit1_query(query, &SQLiteDialect {})?;
9191

9292
let is_sucess = conn.query_row(l1query.as_str(), [], |row| {
93-
for (j, col) in row.columns().iter().enumerate() {
93+
for (j, col) in row.as_ref().columns().iter().enumerate() {
9494
if j >= names.len() {
9595
names.push(col.name().to_string());
9696
}
@@ -142,14 +142,15 @@ where
142142
}
143143

144144
// tried all queries, but all of them returned an empty result set
145-
let mut stmt = conn.prepare(self.queries[0].as_str())?;
146-
let rows = stmt.query([])?;
147-
148-
if let Some(cnames) = rows.column_names() {
149-
self.names = cnames.into_iter().map(|s| s.to_string()).collect();
150-
// set all columns as string (align with pandas)
151-
self.schema = vec![SQLiteTypeSystem::Text(false); self.names.len()];
152-
}
145+
let stmt = conn.prepare(self.queries[0].as_str())?;
146+
147+
self.names = stmt
148+
.column_names()
149+
.into_iter()
150+
.map(|s| s.to_string())
151+
.collect();
152+
// set all columns as string (align with pandas)
153+
self.schema = vec![SQLiteTypeSystem::Text(false); self.names.len()];
153154
}
154155

155156
#[throws(SQLiteSourceError)]

connectorx/src/transports/postgres_arrow.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use num_traits::ToPrimitive;
1313
use postgres::NoTls;
1414
use postgres_openssl::MakeTlsConnector;
1515
use rust_decimal::Decimal;
16+
use serde_json::Value;
1617
use std::marker::PhantomData;
1718
use thiserror::Error;
1819
use uuid::Uuid;
@@ -57,6 +58,8 @@ macro_rules! impl_postgres_transport {
5758
{ UUID[Uuid] => LargeUtf8[String] | conversion option }
5859
{ Char[&'r str] => LargeUtf8[String] | conversion none }
5960
{ ByteA[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
61+
{ JSON[Value] => LargeUtf8[String] | conversion option }
62+
{ JSONB[Value] => LargeUtf8[String] | conversion none }
6063
}
6164
);
6265
}
@@ -83,3 +86,9 @@ impl<P, C> TypeConversion<Decimal, f64> for PostgresArrowTransport<P, C> {
8386
.unwrap_or_else(|| panic!("cannot convert decimal {:?} to float64", val))
8487
}
8588
}
89+
90+
/// Conversion from a `serde_json::Value` to `String`, added in this diff.
///
/// It accompanies the `JSON[Value] => LargeUtf8[String]` and
/// `JSONB[Value] => LargeUtf8[String]` rules added to `impl_postgres_transport!`
/// in the same commit.
impl<P, C> TypeConversion<Value, String> for PostgresArrowTransport<P, C> {
91+
fn convert(val: Value) -> String {
// Uses the value's `to_string()` output as the converted string.
// NOTE(review): presumably this yields serde_json's compact JSON text — confirm.
92+
val.to_string()
93+
}
94+
}
}

0 commit comments

Comments
 (0)