Skip to content

Commit 9e01c84

Browse files
[Improve][Connector-V2] Support like predicate pushdown in paimon (#9653)
1 parent 5d214a7 commit 9e01c84

File tree

7 files changed

+180
-12
lines changed

7 files changed

+180
-12
lines changed

docs/en/connector-v2/source/Paimon.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ The file path of `hdfs-site.xml`
8585
### query [string]
8686

8787
The filter condition of the table read. For example: `select * from st_test where id > 100`. If not specified, all rows are read.
88-
Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null, is not null, between...and, in, not in, like(pattern matching with prefix only) ,and others are not supported.
88+
Currently, where conditions only support <, <=, >, >=, =, !=, or, and, is null, is not null, between...and, in, not in, and like (only the patterns `prefix%`, `%suffix` and `%substring%`); others are not supported.
8989
The Having, Group By, Order By clauses are currently unsupported, because these clauses are not supported by Paimon.
9090
You can also project specific columns, for example: `select id, name from st_test where id > 100`.
9191
The limit will be supported in the future.

docs/zh/connector-v2/source/Paimon.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
8686

8787
读取表格的筛选条件,例如:`select * from st_test where id > 100`。如果未指定,则将读取所有记录。
8888

89-
目前,`where` 支持`<, <=, >, >=, =, !=, or, and,is null, is not null, between...and, in , not in, like(pattern matching with prefix only)`,其他暂不支持。
89+
目前,`where` 支持`<, <=, >, >=, =, !=, or, and,is null, is not null, between...and, in , not in, like`(`like` 仅支持 `prefix%`、`%suffix`、`%substring%` 三种模式),其他暂不支持。
9090

9191
Projection 已支持,你可以选择特定的列,例如:select id, name from st_test where id > 100。
9292

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -252,16 +252,31 @@ private static Predicate parseExpressionToPredicate(
252252
Object rightVal =
253253
convertValueByPaimonDataType(rowType, column.getColumnName(), rightPredicate);
254254

255-
Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%");
256-
Matcher matcher = BEGIN_PATTERN.matcher(rightVal.toString());
257-
if (matcher.matches()) {
258-
return builder.startsWith(columnIndex, BinaryString.fromString(matcher.group(1)));
259-
} else {
260-
throw new IllegalArgumentException(
261-
"Unsupported expression type: "
262-
+ expression.getClass().getSimpleName()
263-
+ ", only support like pattern matching with prefix");
255+
Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%$");
256+
Matcher beginMatcher = BEGIN_PATTERN.matcher(rightVal.toString());
257+
if (beginMatcher.matches()) {
258+
return builder.startsWith(
259+
columnIndex, BinaryString.fromString(beginMatcher.group(1)));
260+
}
261+
262+
Pattern END_PATTERN = Pattern.compile("^%([^%]+)");
263+
Matcher endMatcher = END_PATTERN.matcher(rightVal.toString());
264+
if (endMatcher.matches()) {
265+
return builder.endsWith(columnIndex, BinaryString.fromString(endMatcher.group(1)));
264266
}
267+
268+
Pattern CONTAINS_PATTERN = Pattern.compile("^%([^%]+)%$");
269+
Matcher containsMatcher = CONTAINS_PATTERN.matcher(rightVal.toString());
270+
if (containsMatcher.matches()) {
271+
return builder.contains(
272+
columnIndex, BinaryString.fromString(containsMatcher.group(1)));
273+
}
274+
throw new IllegalArgumentException(
275+
String.format(
276+
"Invalid LIKE pattern: '%s'. Supported patterns are: 'prefix%%', '%%suffix', and '%%substring%%'. "
277+
+ "Please ensure your pattern matches one of these formats.",
278+
rightVal.toString()));
279+
265280
} else if (expression instanceof Parenthesis) {
266281
Parenthesis parenthesis = (Parenthesis) expression;
267282
return parseExpressionToPredicate(builder, rowType, parenthesis.getExpression());

seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public void testConvertSqlSelectToPaimonProjectionArrayWithStar() {
250250
}
251251

252252
@Test
253-
public void testConvertSqlWhereToPaimonPredicateWithStartWith() {
253+
public void testConvertSqlWhereToPaimonLikePredicate() {
254254
String query = "SELECT * FROM table WHERE varchar_col like 'te%'";
255255

256256
PlainSelect plainSelect = convertToPlainSelect(query);
@@ -264,5 +264,32 @@ public void testConvertSqlWhereToPaimonPredicateWithStartWith() {
264264
Predicate expectedPredicate = PredicateBuilder.or(builder.startsWith(1, "te"));
265265

266266
assertEquals(expectedPredicate.toString(), predicate.toString());
267+
268+
query = "SELECT * FROM table WHERE varchar_col like '%st'";
269+
270+
plainSelect = convertToPlainSelect(query);
271+
predicate =
272+
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
273+
rowType, plainSelect);
274+
275+
assertNotNull(predicate);
276+
277+
builder = new PredicateBuilder(rowType);
278+
expectedPredicate = PredicateBuilder.or(builder.endsWith(1, "st"));
279+
280+
assertEquals(expectedPredicate.toString(), predicate.toString());
281+
282+
query = "SELECT * FROM table WHERE varchar_col like '%es%'";
283+
plainSelect = convertToPlainSelect(query);
284+
predicate =
285+
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
286+
rowType, plainSelect);
287+
288+
assertNotNull(predicate);
289+
290+
builder = new PredicateBuilder(rowType);
291+
expectedPredicate = PredicateBuilder.or(builder.contains(1, "es"));
292+
293+
assertEquals(expectedPredicate.toString(), predicate.toString());
267294
}
268295
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,12 @@ public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer contai
477477
Container.ExecResult readResult8 =
478478
container.executeJob("/paimon_to_assert_with_filter8.conf");
479479
Assertions.assertEquals(0, readResult8.getExitCode());
480+
Container.ExecResult readResult9 =
481+
container.executeJob("/paimon_to_assert_with_filter9.conf");
482+
Assertions.assertEquals(0, readResult9.getExitCode());
483+
Container.ExecResult readResult10 =
484+
container.executeJob("/paimon_to_assert_with_filter10.conf");
485+
Assertions.assertEquals(0, readResult10.getExitCode());
480486
}
481487

482488
@TestTemplate
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
Paimon {
25+
warehouse = "/tmp/seatunnel_mnt/paimon"
26+
database = "full_type"
27+
table = "st_test"
28+
query = "select * from st_test where c_string like '%string%'"
29+
plugin_output = paimon_source
30+
}
31+
}
32+
33+
sink {
34+
Assert {
35+
plugin_input = paimon_source
36+
rules {
37+
row_rules = [
38+
{
39+
rule_type = MAX_ROW
40+
rule_value = 3
41+
}
42+
{
43+
rule_type = MIN_ROW
44+
rule_value = 3
45+
}
46+
]
47+
field_rules = [
48+
{
49+
field_name = c_string
50+
field_type = string
51+
field_value = [
52+
{
53+
rule_type = NOT_NULL
54+
}
55+
]
56+
}
57+
]
58+
}
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
Paimon {
25+
warehouse = "/tmp/seatunnel_mnt/paimon"
26+
database = "full_type"
27+
table = "st_test"
28+
query = "select * from st_test where c_string like '%string2'"
29+
plugin_output = paimon_source
30+
}
31+
}
32+
33+
sink {
34+
Assert {
35+
plugin_input = paimon_source
36+
rules {
37+
row_rules = [
38+
{
39+
rule_type = MAX_ROW
40+
rule_value = 1
41+
}
42+
{
43+
rule_type = MIN_ROW
44+
rule_value = 1
45+
}
46+
]
47+
field_rules = [
48+
{
49+
field_name = c_string
50+
field_type = string
51+
field_value = [
52+
{
53+
rule_type = NOT_NULL
54+
}
55+
]
56+
}
57+
]
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)