Skip to content

Commit a2b9956

Browse files
authored
Merge pull request #698 from pangjunrong/feature/mssql-param-certs
feat: Added Params for CA Certs in MSSQL Config
2 parents 3106e7d + e6b197d commit a2b9956

File tree

4 files changed

+56
-117
lines changed

4 files changed

+56
-117
lines changed

Cargo.lock

Lines changed: 13 additions & 98 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

connectorx/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ chrono = "0.4"
2525
arrow = {workspace = true, optional = true}
2626
arrow2 = {workspace = true, default-features = false, optional = true}
2727
bb8 = {version = "0.7", optional = true}
28-
bb8-tiberius = {version = "0.5", optional = true}
28+
bb8-tiberius = {version = "0.8", optional = true}
2929
csv = {version = "1", optional = true}
3030
fallible-streaming-iterator = {version = "0.1", optional = true}
3131
futures = {version = "0.3", optional = true}
@@ -50,7 +50,7 @@ regex = {version = "1", optional = true}
5050
rusqlite = {version = "0.30.0", features = ["column_decltype", "chrono", "bundled"], optional = true}
5151
rust_decimal = {version = "1", features = ["db-postgres"], optional = true}
5252
rust_decimal_macros = {version = "1", optional = true}
53-
tiberius = {version = "0.5", features = ["rust_decimal", "chrono", "integrated-auth-gssapi"], optional = true}
53+
tiberius = {version = "0.7.3", features = ["rust_decimal", "chrono", "integrated-auth-gssapi"], optional = true}
5454
tokio = {version = "1", features = ["rt", "rt-multi-thread", "net"], optional = true}
5555
tokio-util = {version = "0.6", optional = true}
5656
urlencoding = {version = "2.1", optional = true}

connectorx/src/sources/mssql/mod.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use rust_decimal::Decimal;
2626
use sqlparser::dialect::MsSqlDialect;
2727
use std::collections::HashMap;
2828
use std::sync::Arc;
29-
use tiberius::{AuthMethod, Config, EncryptionLevel, QueryResult, Row};
29+
use tiberius::{AuthMethod, Config, EncryptionLevel, QueryItem, QueryStream, Row};
3030
use tokio::runtime::{Handle, Runtime};
3131
use url::Url;
3232
use urlencoding::decode;
@@ -84,6 +84,16 @@ pub fn mssql_config(url: &Url) -> Config {
8484
decode(url.password().unwrap_or(""))?.to_owned(),
8585
));
8686

87+
match params.get("trust_server_certificate") {
88+
Some(v) if v.to_lowercase() == "true" => config.trust_cert(),
89+
_ => {}
90+
};
91+
92+
match params.get("trust_server_certificate_ca") {
93+
Some(v) => config.trust_cert_ca(v),
94+
_ => {}
95+
};
96+
8797
match params.get("encrypt") {
8898
Some(v) if v.to_lowercase() == "true" => config.encryption(EncryptionLevel::Required),
8999
_ => config.encryption(EncryptionLevel::NotSupported),
@@ -147,22 +157,27 @@ where
147157
let mut conn = self.rt.block_on(self.pool.get())?;
148158
let first_query = &self.queries[0];
149159
let (names, types) = match self.rt.block_on(conn.query(first_query.as_str(), &[])) {
150-
Ok(stream) => {
151-
let columns = stream.columns().ok_or_else(|| {
152-
anyhow!("MsSQL failed to get the columns of query: {}", first_query)
153-
})?;
154-
columns
160+
Ok(mut stream) => match self.rt.block_on(async { stream.columns().await }) {
161+
Ok(Some(columns)) => columns
155162
.iter()
156163
.map(|col| {
157164
(
158165
col.name().to_string(),
159166
MsSQLTypeSystem::from(&col.column_type()),
160167
)
161168
})
162-
.unzip()
163-
}
169+
.unzip(),
170+
Ok(None) => {
171+
throw!(anyhow!(
172+
"MsSQL returned no columns for query: {}",
173+
first_query
174+
));
175+
}
176+
Err(e) => {
177+
throw!(anyhow!("Error fetching columns: {}", e));
178+
}
179+
},
164180
Err(e) => {
165-
// tried the last query but still get an error
166181
debug!(
167182
"cannot get metadata for '{}', try next query: {}",
168183
first_query, e
@@ -269,7 +284,7 @@ impl SourcePartition for MsSQLSourcePartition {
269284
#[throws(MsSQLSourceError)]
270285
fn parser<'a>(&'a mut self) -> Self::Parser<'a> {
271286
let conn = self.rt.block_on(self.pool.get())?;
272-
let rows: OwningHandle<Box<Conn<'a>>, DummyBox<QueryResult<'a>>> =
287+
let rows: OwningHandle<Box<Conn<'a>>, DummyBox<QueryStream<'a>>> =
273288
OwningHandle::new_with_fn(Box::new(conn), |conn: *const Conn<'a>| unsafe {
274289
let conn = &mut *(conn as *mut Conn<'a>);
275290

@@ -294,7 +309,7 @@ impl SourcePartition for MsSQLSourcePartition {
294309

295310
pub struct MsSQLSourceParser<'a> {
296311
rt: &'a Handle,
297-
iter: OwningHandle<Box<Conn<'a>>, DummyBox<QueryResult<'a>>>,
312+
iter: OwningHandle<Box<Conn<'a>>, DummyBox<QueryStream<'a>>>,
298313
rowbuf: Vec<Row>,
299314
ncols: usize,
300315
current_col: usize,
@@ -305,7 +320,7 @@ pub struct MsSQLSourceParser<'a> {
305320
impl<'a> MsSQLSourceParser<'a> {
306321
fn new(
307322
rt: &'a Handle,
308-
iter: OwningHandle<Box<Conn<'a>>, DummyBox<QueryResult<'a>>>,
323+
iter: OwningHandle<Box<Conn<'a>>, DummyBox<QueryStream<'a>>>,
309324
schema: &[MsSQLTypeSystem],
310325
) -> Self {
311326
Self {
@@ -348,7 +363,10 @@ impl<'a> PartitionParser<'a> for MsSQLSourceParser<'a> {
348363

349364
for _ in 0..DB_BUFFER_SIZE {
350365
if let Some(item) = self.rt.block_on(self.iter.next()) {
351-
self.rowbuf.push(item?);
366+
match item.map_err(MsSQLSourceError::MsSQLError)? {
367+
QueryItem::Row(row) => self.rowbuf.push(row),
368+
_ => continue,
369+
}
352370
} else {
353371
self.is_finished = true;
354372
break;

docs/databases/mssql.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@ SQLServer does not need to specify protocol.
66

77
### MsSQL Connection
88
```{hint}
9-
By adding `trusted_connection=true` to connection uri parameter, windows authentication will be enabled. Example: `mssql://host:port/db?trusted_connection=true`
10-
By adding `encrypt=true` to connection uri parameter, SQLServer will use SSL encryption. Example: `mssql://host:port/db?encrypt=true&trusted_connection=true`
11-
```
12-
```{hint}
139
if the user password has special characters, they need to be sanitized. example: `from urllib import parse; password = parse.quote_plus(password)`
1410
```
1511

@@ -20,6 +16,16 @@ query = 'SELECT * FROM table' # query string
2016
cx.read_sql(conn, query) # read data from MsSQL
2117
```
2218

19+
### Connection Parameters
20+
* By adding `trusted_connection=true` to connection uri parameter, windows authentication will be enabled.
21+
* Example: `mssql://host:port/db?trusted_connection=true`
22+
* By adding `encrypt=true` to connection uri parameter, SQLServer will use SSL encryption.
23+
* Example: `mssql://host:port/db?encrypt=true&trusted_connection=true`
24+
* By adding `trust_server_certificate=true` to connection uri parameter, the SQLServer certificate will not be validated and it is accepted as-is.
25+
* Example: `mssql://host:port/db?trust_server_certificate=true&encrypt=true`
26+
* By adding `trust_server_certificate_ca=/path/to/ca-cert.crt` to connection uri parameter, the SQLServer certificate will be validated against the given CA certificate in addition to the system-truststore.
27+
* Example: `mssql://host:port/db?encrypt=true&trust_server_certificate_ca=/path/to/ca-cert.crt`
28+
2329
### SQLServer-Pandas Type Mapping
2430
| SQLServer Type | Pandas Type | Comment |
2531
|:---------------:|:---------------------------:|:----------------------------------:|

0 commit comments

Comments
 (0)