Skip to content

Commit 84e0d17

Browse files
committed
Fix more bugs
1 parent d93a5be commit 84e0d17

File tree

12 files changed

+269
-196
lines changed

12 files changed

+269
-196
lines changed

lib/maplib/src/mapping/expansion/validation.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ fn infer_validate_mapping_column_type_from_ptype(
263263
))
264264
}
265265
} else if ptype_is_blank(nn.as_ref()) {
266-
if datatype.is_string() {
266+
if datatype.is_string() {
267267
Ok(MappingColumnType::Flat(
268268
BaseRDFNodeType::BlankNode.into_default_input_rdf_node_state(),
269269
))
@@ -284,9 +284,7 @@ fn infer_validate_mapping_column_type_from_ptype(
284284
} else {
285285
let ptype_rdf_node_type = BaseRDFNodeType::Literal(nn.clone());
286286
let ptype_dt = ptype_rdf_node_type.default_input_polars_data_type();
287-
if ptype_dt.is_string() && datatype.is_string()
288-
|| &ptype_dt == datatype
289-
{
287+
if ptype_dt.is_string() && datatype.is_string() || &ptype_dt == datatype {
290288
Ok(MappingColumnType::Flat(
291289
ptype_rdf_node_type.into_default_input_rdf_node_state(),
292290
))

lib/query_processing/src/expressions/functions.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1625,7 +1625,8 @@ pub fn func_expression(
16251625
col(first_context.as_str())
16261626
.struct_()
16271627
.field_by_name(&t.field_col_name())
1628-
.is_null().not(),
1628+
.is_null()
1629+
.not(),
16291630
)
16301631
.then(lit(rdf_named_node_to_polars_literal_value(l)))
16311632
.otherwise(lit(LiteralValue::untyped_null()).cast(iri_dtype.clone())),

lib/query_processing/src/graph_patterns/cats.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use polars::prelude::{by_name, col, LazyFrame};
1+
use polars::prelude::{by_name, col, JoinType, LazyFrame};
22
use representation::cats::{maybe_decode_expr, optional_maybe_decode_expr, CatReEnc, Cats};
33
use representation::solution_mapping::BaseCatState;
44
use representation::{BaseRDFNodeType, RDFNodeState};
55
use std::sync::Arc;
66

77
pub enum CatOperation {
88
Decode,
9-
ReEnc(CatReEnc, bool),
9+
ReEnc(CatReEnc),
1010
}
1111

1212
impl CatOperation {
@@ -44,14 +44,14 @@ impl CatOperation {
4444
));
4545
}
4646
}
47-
CatOperation::ReEnc(cat_re_enc, forget_others) => {
47+
CatOperation::ReEnc(cat_re_enc) => {
4848
if !t.is_multi() {
49-
mappings = cat_re_enc.re_encode(mappings, c, forget_others);
49+
mappings = cat_re_enc.re_encode(mappings, c, false);
5050
} else {
5151
let tmp = uuid::Uuid::new_v4().to_string();
5252
let n = base_t.field_col_name();
5353
mappings = mappings.with_column(col(c).struct_().field_by_name(&n).alias(&tmp));
54-
mappings = cat_re_enc.re_encode(mappings, &tmp, forget_others);
54+
mappings = cat_re_enc.re_encode(mappings, &tmp, false);
5555
mappings = mappings
5656
.with_column(col(c).struct_().with_fields(vec![col(&tmp).alias(&n)]));
5757
mappings = mappings.drop(by_name([tmp], true));
@@ -93,7 +93,7 @@ pub fn create_compatible_cats(
9393
(
9494
BaseCatState::CategoricalNative(false, Some(left_local_cats.clone())),
9595
None,
96-
Some(CatOperation::ReEnc(re_enc, true)),
96+
Some(CatOperation::ReEnc(re_enc)),
9797
)
9898
}
9999
BaseCatState::String | BaseCatState::NonString => {

lib/representation/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ thiserror.workspace = true
2020
pyo3 = {workspace = true, features = ["chrono-tz", "chrono"]}
2121
oxsdatatypes.workspace = true
2222
rayon.workspace = true
23+
nohash-hasher.workspace = true
2324
uuid.workspace = true
2425

2526
[lints.rust]

lib/representation/src/cats.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,18 @@ mod split;
88
pub use decode::*;
99
pub use encode::*;
1010
pub use globalize::*;
11+
pub use image::*;
1112
pub use re_encode::*;
1213
pub use split::*;
13-
pub use image::*;
1414

1515
use crate::BaseRDFNodeType;
16+
use nohash_hasher::NoHashHasher;
1617
use oxrdf::NamedNode;
1718
use polars::prelude::DataFrame;
1819
use std::collections::{BTreeMap, HashMap};
20+
use std::hash::BuildHasherDefault;
1921
use std::sync::Arc;
22+
use uuid::Uuid;
2023

2124
const SUBJECT_PREFIX_COL_NAME: &str = "subject_prefix";
2225
const OBJECT_PREFIX_COL_NAME: &str = "object_prefix";
@@ -62,7 +65,7 @@ pub struct EncodedTriples {
6265
pub struct CatEncs {
6366
// We use a BTree map to keep strings sorted
6467
pub map: BTreeMap<String, u32>,
65-
pub rev_map: HashMap<u32, String>,
68+
pub rev_map: HashMap<u32, String, BuildHasherDefault<NoHashHasher<u32>>>,
6669
}
6770

6871
#[derive(Debug, Clone)]
@@ -101,7 +104,7 @@ impl Cats {
101104
iri_height: 0,
102105
blank_height: 0,
103106
literal_height_map: Default::default(),
104-
uuid: uuid::Uuid::new_v4().to_string(),
107+
uuid: Uuid::new_v4().to_string(),
105108
};
106109
cats.iri_height = cats.calc_new_iri_height();
107110
cats.blank_height = cats.calc_new_blank_height();

lib/representation/src/cats/decode.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use crate::solution_mapping::BaseCatState;
33
use crate::BaseRDFNodeType;
44
use polars::datatypes::{DataType, Field, PlSmallStr};
55
use polars::frame::DataFrame;
6-
use polars::prelude::{col, Column, Expr, IntoColumn, IntoLazy};
6+
use polars::prelude::{col, Column, Expr, IntoColumn, IntoLazy, IntoSeries, StringChunked, UInt32Chunked};
77
use polars::series::Series;
88
use std::sync::Arc;
9+
use rayon::iter::{IntoParallelRefIterator, ParallelBridge, ParallelIterator};
910

1011
impl CatEncs {
1112
pub fn decode(&self, ser: &Series, cat_type: &CatType) -> Series {
@@ -94,8 +95,8 @@ impl Cats {
9495
encs.extend(local_cats.get_encs(t))
9596
}
9697
let u32s = ser.u32().unwrap();
97-
let mut strings = Vec::with_capacity(ser.len());
98-
for u in u32s {
98+
let us:Vec<_> = u32s.iter().collect();
99+
let strings: Vec<_> = us.par_iter().map(|u| {
99100
let s = if let Some(u) = u {
100101
let mut s = None;
101102
for (t, e) in &encs {
@@ -108,9 +109,9 @@ impl Cats {
108109
} else {
109110
None
110111
};
111-
strings.push(s)
112-
}
113-
Series::from_iter(strings.into_iter())
112+
s
113+
}).collect();
114+
Series::from_iter(strings)
114115
}
115116
}
116117

@@ -172,7 +173,7 @@ pub fn decode_expr(
172173
move |x| {
173174
let original_name = x.name().to_string();
174175
let mut s = global_cats.decode(
175-
&x.as_materialized_series(),
176+
x.as_materialized_series(),
176177
&base_rdf_node_type,
177178
local_cats.as_ref().map(|x| x.clone()),
178179
);

lib/representation/src/cats/encode.rs

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ use polars::frame::DataFrame;
1010
use polars::prelude::{col, lit, IntoLazy, Series};
1111
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
1212
use std::collections::HashMap;
13+
use std::hash::BuildHasherDefault;
1314
use std::sync::Arc;
1415

1516
impl CatEncs {
1617
pub fn new_empty() -> CatEncs {
1718
CatEncs {
1819
map: Default::default(),
19-
rev_map: Default::default(),
20+
rev_map: HashMap::with_capacity_and_hasher(2, BuildHasherDefault::default()),
2021
}
2122
}
2223

@@ -532,42 +533,61 @@ pub fn encode_triples(
532533
let out: Vec<_> = dfs
533534
.into_par_iter()
534535
.map(|mut df| {
535-
let subject_out =
536-
if let Some(subject_prefix_map) = prefix_maps.get(SUBJECT_COL_NAME) {
537-
let u = df
538-
.drop_in_place(SUBJECT_PREFIX_COL_NAME)
539-
.unwrap()
540-
.as_materialized_series_maintain_scalar()
541-
.u32()
542-
.unwrap()
543-
.first()
544-
.unwrap();
545-
Some(subject_prefix_map.get(&u).unwrap().clone())
546-
} else {
547-
None
548-
};
549-
let object_out = if let Some(object_prefix_map) = prefix_maps.get(OBJECT_COL_NAME) {
550-
let u = df
551-
.drop_in_place(OBJECT_PREFIX_COL_NAME)
552-
.unwrap()
553-
.as_materialized_series_maintain_scalar()
554-
.u32()
555-
.unwrap()
556-
.first()
557-
.unwrap();
558-
Some(object_prefix_map.get(&u).unwrap().clone())
559-
} else {
536+
if df.height() == 0 {
560537
None
561-
};
562-
(df, subject_out, object_out)
538+
} else {
539+
let subject_out =
540+
if let Some(subject_prefix_map) = prefix_maps.get(SUBJECT_COL_NAME) {
541+
let ser = df
542+
.drop_in_place(SUBJECT_PREFIX_COL_NAME)
543+
.unwrap()
544+
.as_materialized_series_maintain_scalar();
545+
let u32ch = ser.u32().unwrap();
546+
let i = u32ch.first_non_null();
547+
if let Some(i) = i {
548+
let u = u32ch.get(i);
549+
if let Some(u) = u {
550+
Some(subject_prefix_map.get(&u).unwrap().clone())
551+
} else {
552+
None
553+
}
554+
} else {
555+
None
556+
}
557+
} else {
558+
None
559+
};
560+
let object_out =
561+
if let Some(object_prefix_map) = prefix_maps.get(OBJECT_COL_NAME) {
562+
let ser = df
563+
.drop_in_place(OBJECT_PREFIX_COL_NAME)
564+
.unwrap()
565+
.as_materialized_series_maintain_scalar();
566+
let u32ch = ser.u32().unwrap();
567+
let i = u32ch.first_non_null();
568+
if let Some(i) = i {
569+
let u = u32ch.get(i);
570+
if let Some(u) = u {
571+
Some(object_prefix_map.get(&u).unwrap().clone())
572+
} else {
573+
None
574+
}
575+
} else {
576+
None
577+
}
578+
} else {
579+
None
580+
};
581+
Some((df, subject_out, object_out))
582+
}
563583
})
564584
.collect();
565585
out
566586
} else {
567-
vec![(mappings, None, None)]
587+
vec![Some((mappings, None, None))]
568588
};
569589
let mut new_out = vec![];
570-
for (df, subject, object) in out {
590+
for (df, subject, object) in out.into_iter().filter(|x| x.is_some()).map(|x| x.unwrap()) {
571591
let subject = if matches!(subject_cat_state, BaseCatState::CategoricalNative(..)) {
572592
if let Some(subject) = subject {
573593
Some(CatType::Prefix(NamedNode::new_unchecked(subject)))

lib/representation/src/cats/image.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use super::{reencode_solution_mappings, CatEncs};
22
use super::{CatReEnc, Cats};
33
use crate::solution_mapping::{BaseCatState, EagerSolutionMappings, SolutionMappings};
44
use crate::{BaseRDFNodeType, RDFNodeState};
5+
use nohash_hasher::NoHashHasher;
56
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
67
use std::collections::{BTreeMap, HashMap, HashSet};
8+
use std::hash::BuildHasherDefault;
79
use std::sync::Arc;
810

911
impl Cats {
@@ -139,7 +141,7 @@ impl Cats {
139141

140142
impl CatEncs {
141143
pub fn image(&self, s: &HashSet<u32>) -> Option<CatEncs> {
142-
let rev_map: HashMap<_, _> = s
144+
let rev_map: HashMap<_, _, BuildHasherDefault<NoHashHasher<u32>>> = s
143145
.par_iter()
144146
.map(|x| {
145147
if let Some(s) = self.rev_map.get(x) {
@@ -160,23 +162,30 @@ impl CatEncs {
160162
}
161163
}
162164

163-
pub fn new_solution_mapping_cats(sms: Vec<EagerSolutionMappings>, global_cats:&Cats) -> (Vec<EagerSolutionMappings>, Cats) {
165+
pub fn new_solution_mapping_cats(
166+
sms: Vec<EagerSolutionMappings>,
167+
global_cats: &Cats,
168+
) -> (Vec<EagerSolutionMappings>, Cats) {
164169
let smsref: Vec<_> = sms.iter().collect();
165170
let mut cats = global_cats.mappings_cat_image(&smsref);
166171

167172
let reenc = cats.merge_solution_mappings_locals(&smsref);
168-
let new_sms:Vec<_> = sms.into_par_iter().map(|sm| {
169-
reencode_solution_mappings(sm, &reenc)
170-
}).collect();
173+
let new_sms: Vec<_> = sms
174+
.into_par_iter()
175+
.map(|sm| reencode_solution_mappings(sm, &reenc))
176+
.collect();
171177
(new_sms, cats)
172178
}
173179

174-
pub fn set_global_cats_as_local(rdf_node_types:&mut HashMap<String, RDFNodeState>, cats:Arc<Cats>) {
180+
pub fn set_global_cats_as_local(
181+
rdf_node_types: &mut HashMap<String, RDFNodeState>,
182+
cats: Arc<Cats>,
183+
) {
175184
for (_, s) in rdf_node_types {
176185
for v in s.map.values_mut() {
177186
if matches!(v, BaseCatState::CategoricalNative(_, _)) {
178187
*v = BaseCatState::CategoricalNative(false, Some(cats.clone()));
179188
}
180189
}
181190
}
182-
}
191+
}

lib/representation/src/rdf_to_polars.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{LANG_STRING_LANG_FIELD, LANG_STRING_VALUE_FIELD};
22
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
3+
use chrono_tz::Tz;
34
use log::warn;
45
use oxrdf::vocab::{rdf, xsd};
56
use oxrdf::{BlankNode, Literal, NamedNode, NamedNodeRef, Term};
@@ -9,7 +10,6 @@ use polars::prelude::{
910
};
1011
use std::ops::Deref;
1112
use std::str::FromStr;
12-
use chrono_tz::Tz;
1313

1414
pub fn rdf_term_to_polars_expr(term: &Term) -> Expr {
1515
match term {
@@ -399,7 +399,10 @@ pub fn polars_literal_values_to_series(literal_values: Vec<LiteralValue>, name:
399399
})
400400
.collect::<Vec<Option<i64>>>(),
401401
)
402-
.cast(&DataType::Datetime(*t, Some(tz.unwrap_or(&TimeZone::UTC).clone())))
402+
.cast(&DataType::Datetime(
403+
*t,
404+
Some(tz.unwrap_or(&TimeZone::UTC).clone()),
405+
))
403406
.unwrap(),
404407
AnyValue::DatetimeOwned(_, t, tz) => {
405408
let tz = tz.as_ref().map(|x| x.deref().clone());
@@ -417,7 +420,10 @@ pub fn polars_literal_values_to_series(literal_values: Vec<LiteralValue>, name:
417420
})
418421
.collect::<Vec<Option<i64>>>(),
419422
)
420-
.cast(&DataType::Datetime(*t, Some(tz.unwrap_or(TimeZone::UTC).clone())))
423+
.cast(&DataType::Datetime(
424+
*t,
425+
Some(tz.unwrap_or(TimeZone::UTC).clone()),
426+
))
421427
.unwrap()
422428
}
423429
AnyValue::Date(_) => Series::new(

0 commit comments

Comments
 (0)