Skip to content

Commit 125354b

Browse files
committed
Further fixes
1 parent ab1ac15 commit 125354b

File tree

9 files changed

+80
-126
lines changed

9 files changed

+80
-126
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/query_processing/src/expressions.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ use polars::prelude::{
1616
use representation::cats::{maybe_decode_expr, Cats};
1717
use representation::multitype::all_multi_main_cols;
1818
use representation::query_context::Context;
19-
use representation::rdf_to_polars::{
20-
rdf_literal_to_polars_expr,
21-
};
19+
use representation::rdf_to_polars::rdf_literal_to_polars_expr;
2220
use representation::solution_mapping::{BaseCatState, SolutionMappings};
2321
use representation::{BaseRDFNodeType, RDFNodeState, LANG_STRING_VALUE_FIELD};
2422
use spargebra::algebra::Expression;

lib/query_processing/src/graph_patterns/join.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ pub fn join(
4040
&right_datatypes,
4141
global_cats,
4242
);
43-
4443
let (left_mappings, left_datatypes, right_mappings, right_datatypes) =
4544
create_join_compatible_solution_mappings(
4645
left_mappings,
@@ -75,10 +74,41 @@ pub fn join_workaround(
7574
) -> SolutionMappings {
7675
assert!(matches!(join_type, JoinType::Left | JoinType::Inner));
7776

78-
let (mut left_mappings, left_exploded) = unnest_multicols(left_mappings, &left_datatypes);
77+
let (mut left_mappings, mut left_exploded) = unnest_multicols(left_mappings, &left_datatypes);
78+
79+
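// Fixes "unnesting" when a column is multi-typed only on the right: the single-typed left column is renamed to `{c}{field_col_name}` and registered in `left_exploded`.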
80+
for (c, s) in &right_datatypes {
81+
if s.is_multi() {
82+
if let Some(sl) = left_datatypes.get(c) {
83+
if !sl.is_multi() {
84+
let lbt = sl.get_base_type().unwrap();
85+
let fcn = lbt.field_col_name();
86+
let colname = format!("{c}{}", &fcn);
87+
left_mappings = left_mappings.rename([c], [&colname], true);
88+
left_exploded.insert(c.clone(), (vec![fcn], vec![colname]));
89+
}
90+
}
91+
}
92+
}
93+
7994
let (mut right_mappings, mut right_exploded) =
8095
unnest_multicols(right_mappings, &right_datatypes);
8196

97+
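// Fixes "unnesting" when a column is multi-typed only on the left: the single-typed (and not none-typed) right column is renamed to `{c}{field_col_name}` and registered in `right_exploded`.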
98+
for (c, s) in &left_datatypes {
99+
if s.is_multi() {
100+
if let Some(sr) = right_datatypes.get(c) {
101+
if !sr.is_multi() && !sr.is_none() {
102+
let rbt = sr.get_base_type().unwrap();
103+
let fcn = rbt.field_col_name();
104+
let colname = format!("{c}{}", &fcn);
105+
right_mappings = right_mappings.rename([c], [&colname], true);
106+
right_exploded.insert(c.clone(), (vec![fcn], vec![colname]));
107+
}
108+
}
109+
}
110+
}
111+
82112
let mut on = vec![];
83113
let mut no_join = false;
84114
for (c, left_type) in &left_datatypes {
@@ -483,16 +513,7 @@ pub fn create_join_compatible_solution_mappings(
483513
}
484514
JoinType::Left => {
485515
if left_dt.map.contains_key(base_right) {
486-
right_mappings = right_mappings.with_column(
487-
convert_lf_col_to_multitype(col(v), right_dt).alias(v),
488-
);
489-
new_right_states.insert(
490-
v.clone(),
491-
RDFNodeState::from_bases(
492-
base_right.clone(),
493-
right_dt.map.get(base_right).unwrap().clone(),
494-
),
495-
);
516+
// Intentionally empty: nothing is changed on the right side when `left_dt` already contains `base_right`.
496517
} else {
497518
right_mappings = right_mappings.filter(lit(false)).with_column(
498519
lit(LiteralValue::untyped_null())

lib/query_processing/src/graph_patterns/union.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ use crate::errors::QueryProcessingError;
33
use polars::prelude::{as_struct, col, concat_lf_diagonal, lit, LiteralValue, UnionArgs};
44
use representation::cats::Cats;
55
use representation::solution_mapping::SolutionMappings;
6-
use representation::{
7-
RDFNodeState, LANG_STRING_LANG_FIELD, LANG_STRING_VALUE_FIELD,
8-
};
9-
use std::collections::{HashMap};
6+
use representation::{RDFNodeState, LANG_STRING_LANG_FIELD, LANG_STRING_VALUE_FIELD};
7+
use std::collections::HashMap;
108
use std::sync::Arc;
119

1210
pub fn union(
@@ -108,8 +106,8 @@ pub fn union(
108106
} in sms
109107
{
110108
to_concat.push(mappings);
111-
for (k,v) in rdf_node_types {
112-
target_state.insert(k,v);
109+
for (k, v) in rdf_node_types {
110+
target_state.insert(k, v);
113111
}
114112
}
115113
let output_mappings = concat_lf_diagonal(

lib/representation/src/cats.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use std::sync::Arc;
2020
const SUBJECT_PREFIX_COL_NAME: &str = "subject_prefix";
2121
const OBJECT_PREFIX_COL_NAME: &str = "object_prefix";
2222

23-
pub const OBJECT_ARG_SORT_COL_NAME: &str = "object_arg_sort";
24-
pub const SUBJECT_ARG_SORT_COL_NAME: &str = "subject_arg_sort";
23+
pub const OBJECT_RANK_COL_NAME: &str = "object_rank";
24+
pub const SUBJECT_RANK_COL_NAME: &str = "subject_rank";
2525

2626
pub struct CatTriples {
2727
pub encoded_triples: Vec<EncodedTriples>,

lib/triplestore/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ rayon.workspace = true
1919
sprs = { workspace = true, features = ["multi_thread"] }
2020
oxrdf.workspace = true
2121
polars = { workspace = true, features = [
22-
"ipc", "range",
22+
"ipc", "range", "rank",
2323
"is_unique","merge_sorted", "new_streaming", "zip_with", "nightly", "performant", "cse", "semi_anti_join", "abs", "round_series", "lazy", "concat_str", "is_in",
2424
"dtype-full", "strings", "rows", "timezones", "polars-time", "temporal", "list_eval", "partition_by", "parquet",
2525
"diagonal_concat", "cum_agg"], default-features = false }

lib/triplestore/src/dblf.rs

Lines changed: 2 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ use crate::storage::Triples;
44
use oxrdf::{NamedNode, Subject, Term};
55
use polars::prelude::{as_struct, by_name, col, concat, lit, IntoLazy, LazyFrame, UnionArgs};
66
use polars_core::prelude::{Column, DataFrame};
7-
use query_processing::expressions::{
8-
blank_node_enc, maybe_literal_enc, named_node_enc,
9-
};
7+
use query_processing::expressions::{blank_node_enc, maybe_literal_enc, named_node_enc};
108
use query_processing::type_constraints::PossibleTypes;
119
use representation::cats::{named_node_split_prefix, Cats};
1210
use representation::multitype::all_multi_cols;
@@ -155,12 +153,8 @@ impl Triplestore {
155153
include_transient: bool,
156154
) -> Result<SolutionMappings, SparqlError> {
157155
let predicate_uris = predicate_uris.unwrap_or(self.all_predicates());
156+
158157
let mut sms = vec![];
159-
if let Some(s) = &subject_keep_rename {
160-
if s == "b" {
161-
//panic!("Panik b multi");
162-
}
163-
}
164158
if !(objects.is_some() && objects.as_ref().unwrap().is_empty())
165159
|| !(subjects.is_some() && subjects.as_ref().unwrap().is_empty())
166160
{
@@ -191,78 +185,6 @@ impl Triplestore {
191185
let mut object_types = HashMap::new();
192186
let mut accumulated_heights = 0usize;
193187

194-
// // This part is to work around a performance bug in Polars.
195-
// // Still present..
196-
// if predicate_uris_len > 1 && (subjects.is_some() || objects.is_some()) {
197-
// sms = sms
198-
// .into_par_iter()
199-
// .map(|sm| {
200-
// let HalfBakedSolutionMappings {
201-
// mut mappings,
202-
// verb,
203-
// subject_type,
204-
// object_type,
205-
// height_upper_bound: _,
206-
// } = sm;
207-
// if subject_type.is_some() {
208-
// mappings =
209-
// mappings.with_column(col(SUBJECT_COL_NAME).cast(DataType::String));
210-
// }
211-
// if let Some(object_type) = &object_type {
212-
// if object_type.is_lang_string() {
213-
// mappings = mappings.with_column(
214-
// as_struct(vec![
215-
// col(OBJECT_COL_NAME)
216-
// .struct_()
217-
// .field_by_name(LANG_STRING_VALUE_FIELD)
218-
// .cast(DataType::String)
219-
// .alias(LANG_STRING_VALUE_FIELD),
220-
// col(OBJECT_COL_NAME)
221-
// .struct_()
222-
// .field_by_name(LANG_STRING_LANG_FIELD)
223-
// .cast(DataType::String)
224-
// .alias(LANG_STRING_LANG_FIELD),
225-
// ])
226-
// .alias(OBJECT_COL_NAME),
227-
// )
228-
// } else if object_type.polars_data_type() == DataType::String {
229-
// mappings = mappings
230-
// .with_column(col(OBJECT_COL_NAME).cast(DataType::String));
231-
// }
232-
// }
233-
//
234-
// let df = mappings.collect().unwrap();
235-
// let height_upper_bound = df.height();
236-
// mappings = df.lazy();
237-
// let mut rdf_node_types = HashMap::new();
238-
// if let Some(subject_type) = &subject_type {
239-
// rdf_node_types.insert(
240-
// SUBJECT_COL_NAME.to_string(),
241-
// subject_type.clone().into_rdf_node_type(),
242-
// );
243-
// }
244-
// if let Some(object_type) = &object_type {
245-
// rdf_node_types.insert(
246-
// OBJECT_COL_NAME.to_string(),
247-
// object_type.clone().into_rdf_node_type(),
248-
// );
249-
// }
250-
// mappings = lf_columns_to_polars_categorical(
251-
// mappings,
252-
// &rdf_node_types,
253-
// CategoricalOrdering::Physical,
254-
// );
255-
//
256-
// HalfBakedSolutionMappings {
257-
// mappings,
258-
// verb,
259-
// subject_type,
260-
// object_type,
261-
// height_upper_bound,
262-
// }
263-
// })
264-
// .collect();
265-
// }
266188
for HalfBakedSolutionMappings {
267189
mut mappings,
268190
verb,

lib/triplestore/src/lib.rs

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ use fts::FtsIndex;
1818
use log::trace;
1919
use oxrdf::vocab::{rdf, rdfs, xsd};
2020
use oxrdf::NamedNode;
21-
use polars::prelude::{arg_sort_by, col, AnyValue, DataFrame, IntoLazy};
21+
use polars::prelude::{arg_sort_by, col, AnyValue, DataFrame, IntoLazy, RankMethod, RankOptions};
2222
use polars_core::prelude::SortMultipleOptions;
2323
use query_processing::type_constraints::ConstraintBaseRDFNodeType;
2424
use rayon::iter::ParallelDrainRange;
2525
use rayon::iter::{IntoParallelIterator, ParallelIterator};
2626
use representation::cats::{
27-
cat_encode_triples, decode_expr, CatTriples, CatType, Cats, OBJECT_ARG_SORT_COL_NAME,
27+
cat_encode_triples, decode_expr, CatTriples, CatType, Cats, OBJECT_RANK_COL_NAME,
28+
SUBJECT_RANK_COL_NAME,
2829
};
2930
use representation::multitype::set_struct_all_null_to_null_row;
3031
use representation::solution_mapping::{BaseCatState, EagerSolutionMappings};
@@ -526,19 +527,29 @@ pub fn prepare_add_triples_par(
526527
let object_index = can_and_should_index_object(&t.object_type, &t.predicate, indexing);
527528
if object_index {
528529
// Only create O,S - but no need to deduplicate
529-
lf = lf.with_column(
530-
arg_sort_by(
531-
vec![obj_col_expr, subj_col_expr],
532-
SortMultipleOptions {
533-
descending: vec![false, false],
534-
nulls_last: vec![false, false],
535-
multithreaded: true,
536-
maintain_order: false,
537-
limit: None,
538-
},
530+
lf = lf
531+
.with_column(
532+
obj_col_expr
533+
.rank(
534+
RankOptions {
535+
method: RankMethod::Min,
536+
descending: false,
537+
},
538+
None,
539+
)
540+
.alias(OBJECT_RANK_COL_NAME),
539541
)
540-
.alias(OBJECT_ARG_SORT_COL_NAME),
541-
);
542+
.with_column(
543+
subj_col_expr
544+
.rank(
545+
RankOptions {
546+
method: RankMethod::Min,
547+
descending: false,
548+
},
549+
None,
550+
)
551+
.alias(SUBJECT_RANK_COL_NAME),
552+
);
542553
}
543554
t.df = lf.collect().unwrap();
544555
t

lib/triplestore/src/storage.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use polars::prelude::{
99
};
1010
use polars_core::datatypes::AnyValue;
1111
use polars_core::frame::DataFrame;
12-
use polars_core::prelude::{
13-
IntoColumn, Series, SortMultipleOptions, UInt32Chunked,
14-
};
12+
use polars_core::prelude::{IntoColumn, Series, SortMultipleOptions, UInt32Chunked};
1513
use polars_core::series::SeriesIter;
1614
use rayon::iter::{IntoParallelIterator, ParallelIterator};
17-
use representation::cats::{rdf_split_iri_str, CatEncs, Cats, OBJECT_ARG_SORT_COL_NAME};
15+
use representation::cats::{
16+
rdf_split_iri_str, CatEncs, Cats, OBJECT_RANK_COL_NAME, SUBJECT_RANK_COL_NAME,
17+
};
1818
use representation::{
1919
BaseRDFNodeType, LANG_STRING_LANG_FIELD, LANG_STRING_VALUE_FIELD, OBJECT_COL_NAME,
2020
SUBJECT_COL_NAME,
@@ -28,7 +28,7 @@ use std::time::Instant;
2828
const OFFSET_STEP: usize = 100;
2929
const MIN_SIZE_CACHING: usize = 100_000_000; //100MB
3030

31-
#[derive(Clone)]
31+
#[derive(Clone, Debug)]
3232
struct SparseIndex {
3333
map: BTreeMap<String, usize>,
3434
}
@@ -347,6 +347,7 @@ impl TriplesSegment {
347347
} else {
348348
None
349349
};
350+
350351
return sorted.get_lazy_frames(offsets);
351352
}
352353
}
@@ -448,16 +449,18 @@ fn create_indices(
448449
if should_index_by_objects {
449450
let object_now = Instant::now();
450451
df = df
451-
.sort(
452-
vec![PlSmallStr::from_str(OBJECT_ARG_SORT_COL_NAME)],
452+
.lazy()
453+
.sort_by_exprs(
454+
[col(OBJECT_RANK_COL_NAME), col(SUBJECT_RANK_COL_NAME)],
453455
SortMultipleOptions {
454456
descending: vec![false, false],
455457
nulls_last: vec![false, false],
456-
multithreaded: false,
458+
multithreaded: true,
457459
maintain_order: false,
458460
limit: None,
459461
},
460462
)
463+
.collect()
461464
.unwrap();
462465
let object_encs = get_encs(cats, obj_type);
463466
let sparse = create_sparse_index(

0 commit comments

Comments
 (0)