Skip to content

Commit 1695450

Browse files
committed
_bool (postgres): update pandas_columns, transport, postgres
Implement Vec<bool> and Option<Vec<bool>> for Postgres{CSV,Simple}SourceParser. Since the values of true/false are stored like "t" and "f", they can't be produced by using `v.parse()` like the other types; instead, we write out the implementations with explicit match statements.
1 parent cb56729 commit 1695450

File tree

3 files changed

+157
-0
lines changed

3 files changed

+157
-0
lines changed

connectorx-python/src/pandas/pandas_columns/array.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,33 @@ where
9494
}
9595
}
9696

97+
impl PandasColumn<Vec<bool>> for ArrayColumn<bool> {
98+
#[throws(ConnectorXPythonError)]
99+
fn write(&mut self, val: Vec<bool>, row: usize) {
100+
self.lengths.push(val.len());
101+
self.buffer.extend_from_slice(&val[..]);
102+
self.row_idx.push(row);
103+
self.try_flush()?;
104+
}
105+
}
106+
107+
impl PandasColumn<Option<Vec<bool>>> for ArrayColumn<bool> {
108+
#[throws(ConnectorXPythonError)]
109+
fn write(&mut self, val: Option<Vec<bool>>, row: usize) {
110+
match val {
111+
Some(v) => {
112+
self.lengths.push(v.len());
113+
self.buffer.extend_from_slice(&v[..]);
114+
self.row_idx.push(row);
115+
self.try_flush()?;
116+
}
117+
None => {
118+
self.lengths.push(usize::MAX);
119+
self.row_idx.push(row);
120+
}
121+
}
122+
}
123+
}
97124
impl PandasColumn<Vec<f64>> for ArrayColumn<f64> {
98125
#[throws(ConnectorXPythonError)]
99126
fn write(&mut self, val: Vec<f64>, row: usize) {
@@ -150,6 +177,14 @@ impl PandasColumn<Option<Vec<i64>>> for ArrayColumn<i64> {
150177
}
151178
}
152179

180+
impl HasPandasColumn for Vec<bool> {
181+
type PandasColumn<'a> = ArrayColumn<bool>;
182+
}
183+
184+
impl HasPandasColumn for Option<Vec<bool>> {
185+
type PandasColumn<'a> = ArrayColumn<bool>;
186+
}
187+
153188
impl HasPandasColumn for Vec<f64> {
154189
type PandasColumn<'a> = ArrayColumn<f64>;
155190
}

connectorx-python/src/pandas/transports/postgres.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ macro_rules! impl_postgres_transport {
3333
{ Int2[i16] => I64[i64] | conversion auto }
3434
{ Int4[i32] => I64[i64] | conversion auto }
3535
{ Int8[i64] => I64[i64] | conversion auto }
36+
{ BoolArray[Vec<bool>] => BoolArray[Vec<bool>] | conversion auto_vec }
3637
{ Int2Array[Vec<i16>] => I64Array[Vec<i64>] | conversion auto_vec }
3738
{ Int4Array[Vec<i32>] => I64Array[Vec<i64>] | conversion auto_vec }
3839
{ Int8Array[Vec<i64>] => I64Array[Vec<i64>] | conversion auto }

connectorx/src/sources/postgres/mod.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ impl_produce!(
472472
Vec<f64>,
473473
Vec<Decimal>,
474474
bool,
475+
Vec<bool>,
475476
&'r str,
476477
Vec<u8>,
477478
NaiveTime,
@@ -706,6 +707,57 @@ impl<'r, 'a> Produce<'r, Option<bool>> for PostgresCSVSourceParser<'a> {
706707
}
707708
}
708709

710+
impl<'r, 'a> Produce<'r, Vec<bool>> for PostgresCSVSourceParser<'a> {
711+
type Error = PostgresSourceError;
712+
713+
#[throws(PostgresSourceError)]
714+
fn produce(&mut self) -> Vec<bool> {
715+
let (ridx, cidx) = self.next_loc()?;
716+
let s = &self.rowbuf[ridx][cidx][..];
717+
match s {
718+
"{}" => vec![],
719+
_ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
720+
s => s[1..s.len() - 1]
721+
.split(",")
722+
.map(|v|
723+
match v {
724+
"t" => Ok(true),
725+
"f" => Ok(false),
726+
_ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
727+
}
728+
)
729+
.collect::<Result<Vec<bool>, ConnectorXError>>()?,
730+
}
731+
}
732+
}
733+
734+
impl<'r, 'a> Produce<'r, Option<Vec<bool>>> for PostgresCSVSourceParser<'a> {
735+
type Error = PostgresSourceError;
736+
737+
#[throws(PostgresSourceError)]
738+
fn produce(&mut self) -> Option<Vec<bool>> {
739+
let (ridx, cidx) = self.next_loc()?;
740+
let s = &self.rowbuf[ridx][cidx][..];
741+
match s {
742+
"" => None,
743+
"{}" => Some(vec![]),
744+
_ if s.len() < 3 => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
745+
s => Some(
746+
s[1..s.len() - 1]
747+
.split(",")
748+
.map(|v|
749+
match v {
750+
"t" => Ok(true),
751+
"f" => Ok(false),
752+
_ => throw!(ConnectorXError::cannot_produce::<bool>(Some(s.into()))),
753+
}
754+
)
755+
.collect::<Result<Vec<bool>, ConnectorXError>>()?,
756+
),
757+
}
758+
}
759+
}
760+
709761
impl<'r, 'a> Produce<'r, DateTime<Utc>> for PostgresCSVSourceParser<'a> {
710762
type Error = PostgresSourceError;
711763

@@ -1007,6 +1059,7 @@ impl_produce!(
10071059
Vec<f64>,
10081060
Vec<Decimal>,
10091061
bool,
1062+
Vec<bool>,
10101063
&'r str,
10111064
Vec<u8>,
10121065
NaiveTime,
@@ -1348,6 +1401,74 @@ macro_rules! impl_simple_vec_produce {
13481401
}
13491402
impl_simple_vec_produce!(i16, i32, i64, f32, f64, Decimal, String,);
13501403

1404+
impl<'r> Produce<'r, Vec<bool>> for PostgresSimpleSourceParser {
1405+
type Error = PostgresSourceError;
1406+
1407+
#[throws(PostgresSourceError)]
1408+
fn produce(&'r mut self) -> Vec<bool> {
1409+
let (ridx, cidx) = self.next_loc()?;
1410+
let val = match &self.rows[ridx] {
1411+
SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1412+
Some(s) => match s{
1413+
"" => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1414+
"{}" => vec![],
1415+
_ => rem_first_and_last(s)
1416+
.split(",")
1417+
.map(|token| match token {
1418+
"t" => Ok(true),
1419+
"f" => Ok(false),
1420+
_ => throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(s.into())))
1421+
})
1422+
.collect::<Result<Vec<bool>, ConnectorXError>>()?,
1423+
},
1424+
None => throw!(anyhow!("Cannot parse NULL in non-NULL column.")),
1425+
},
1426+
SimpleQueryMessage::CommandComplete(c) => {
1427+
panic!("get command: {}", c);
1428+
}
1429+
_ => {
1430+
panic!("what?");
1431+
}
1432+
};
1433+
val
1434+
}
1435+
}
1436+
1437+
impl<'r> Produce<'r, Option<Vec<bool>>> for PostgresSimpleSourceParser {
1438+
type Error = PostgresSourceError;
1439+
1440+
#[throws(PostgresSourceError)]
1441+
fn produce(&'r mut self) -> Option<Vec<bool>> {
1442+
let (ridx, cidx) = self.next_loc()?;
1443+
let val = match &self.rows[ridx] {
1444+
SimpleQueryMessage::Row(row) => match row.try_get(cidx)? {
1445+
Some(s) => match s{
1446+
"" => None,
1447+
"{}" => Some(vec![]),
1448+
_ => Some(
1449+
rem_first_and_last(s)
1450+
.split(",")
1451+
.map(|token| match token {
1452+
"t" => Ok(true),
1453+
"f" => Ok(false),
1454+
_ => throw!(ConnectorXError::cannot_produce::<Vec<bool>>(Some(s.into())))
1455+
})
1456+
.collect::<Result<Vec<bool>, ConnectorXError>>()?
1457+
),
1458+
},
1459+
None => None,
1460+
},
1461+
SimpleQueryMessage::CommandComplete(c) => {
1462+
panic!("get command: {}", c);
1463+
}
1464+
_ => {
1465+
panic!("what?");
1466+
}
1467+
};
1468+
val
1469+
}
1470+
}
1471+
13511472
impl<'r> Produce<'r, NaiveDate> for PostgresSimpleSourceParser {
13521473
type Error = PostgresSourceError;
13531474

0 commit comments

Comments
 (0)