Skip to content

Subscription: intro poll and prefetch v2 for tsfile topic #15790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,24 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
}

@Override
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
setProperty(
"subscription_prefetch_ts_file_batch_max_delay_in_ms",
String.valueOf(subscriptionPrefetchTsFileBatchMaxDelayInMs));
return this;
}

@Override
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
setProperty(
"subscription_prefetch_ts_file_batch_max_size_in_bytes",
String.valueOf(subscriptionPrefetchTsFileBatchMaxSizeInBytes));
return this;
}

@Override
public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
setProperty("default_storage_group_level", String.valueOf(defaultStorageGroupLevel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,26 @@ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
}

@Override
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
dnConfig.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
subscriptionPrefetchTsFileBatchMaxDelayInMs);
cnConfig.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
subscriptionPrefetchTsFileBatchMaxDelayInMs);
return this;
}

@Override
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
dnConfig.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
subscriptionPrefetchTsFileBatchMaxSizeInBytes);
cnConfig.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
subscriptionPrefetchTsFileBatchMaxSizeInBytes);
return this;
}

@Override
public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
dnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,16 @@ public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
}

@Override
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
int subscriptionPrefetchTsFileBatchMaxDelayInMs) {
return this;
}

@Override
public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
int subscriptionPrefetchTsFileBatchMaxSizeInBytes) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ CommonConfig setPipeConnectorRequestSliceThresholdBytes(

CommonConfig setQueryMemoryProportion(String queryMemoryProportion);

CommonConfig setSubscriptionPrefetchTsFileBatchMaxDelayInMs(
int subscriptionPrefetchTsFileBatchMaxDelayInMs);

CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
int subscriptionPrefetchTsFileBatchMaxSizeInBytes);

default CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ protected void setUpConfig() {
sender.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
receiver1.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
receiver2.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);

// reduce tsfile batch memory usage
sender.getConfig().getCommonConfig().setSubscriptionPrefetchTsFileBatchMaxDelayInMs(500);
sender
.getConfig()
.getCommonConfig()
.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(32 * 1024);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
Expand Down Expand Up @@ -305,11 +307,6 @@ public void consume_data(SubscriptionTreePullConsumer consumer, Session session)
}
}

public List<Integer> consume_tsfile_withFileCount(
SubscriptionTreePullConsumer consumer, String device) throws InterruptedException {
return consume_tsfile(consumer, Collections.singletonList(device));
}

public int consume_tsfile(SubscriptionTreePullConsumer consumer, String device)
throws InterruptedException {
return consume_tsfile(consumer, Collections.singletonList(device)).get(0);
Expand Down Expand Up @@ -361,15 +358,6 @@ public List<Integer> consume_tsfile(SubscriptionTreePullConsumer consumer, List<
return results;
}

public void consume_data(SubscriptionTreePullConsumer consumer)
throws TException,
IOException,
StatementExecutionException,
InterruptedException,
IoTDBConnectionException {
consume_data(consumer, session_dest);
}

public void consume_data_await(
SubscriptionTreePullConsumer consumer,
Session session,
Expand All @@ -395,18 +383,72 @@ public void consume_data_await(
}

public void consume_tsfile_await(
SubscriptionTreePullConsumer consumer,
List<String> devices,
List<Integer> expected,
List<Boolean> allowGte) {
final List<AtomicInteger> counters = new ArrayList<>(devices.size());
for (int i = 0; i < devices.size(); i++) {
counters.add(new AtomicInteger(0));
}
AWAIT.untilAsserted(
() -> {
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
if (messages.isEmpty()) {
session_src.executeNonQueryStatement("flush");
}
for (final SubscriptionMessage message : messages) {
final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler();
try (final TsFileReader tsFileReader = tsFileHandler.openReader()) {
for (int i = 0; i < devices.size(); i++) {
final Path path = new Path(devices.get(i), "s_0", true);
final QueryDataSet dataSet =
tsFileReader.query(
QueryExpression.create(Collections.singletonList(path), null));
while (dataSet.hasNext()) {
dataSet.next();
counters.get(i).addAndGet(1);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
consumer.commitSync(messages);
for (int i = 0; i < devices.size(); i++) {
if (allowGte.get(i)) {
assertGte(counters.get(i).get(), expected.get(i));
} else {
assertEquals(counters.get(i).get(), expected.get(i));
}
}
});
}

public void consume_tsfile_await(
SubscriptionTreePullConsumer consumer, List<String> devices, List<Integer> expected) {
consume_tsfile_await(
consumer,
devices,
expected,
Stream.generate(() -> false).limit(devices.size()).collect(Collectors.toList()));
}

public void consume_tsfile_with_file_count_await(
SubscriptionTreePullConsumer consumer, List<String> devices, List<Integer> expected) {
final List<AtomicInteger> counters = new ArrayList<>(devices.size());
for (int i = 0; i < devices.size(); i++) {
counters.add(new AtomicInteger(0));
}
AtomicInteger onReceived = new AtomicInteger(0);
AWAIT.untilAsserted(
() -> {
List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
if (messages.isEmpty()) {
session_src.executeNonQueryStatement("flush");
}
for (final SubscriptionMessage message : messages) {
onReceived.incrementAndGet();
final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler();
try (final TsFileReader tsFileReader = tsFileHandler.openReader()) {
for (int i = 0; i < devices.size(); i++) {
Expand All @@ -427,6 +469,7 @@ public void consume_tsfile_await(
for (int i = 0; i < devices.size(); i++) {
assertEquals(counters.get(i).get(), expected.get(i));
}
assertEquals(onReceived.get(), expected.get(devices.size()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/***
Expand Down Expand Up @@ -135,23 +136,13 @@ public void do_test()
List<String> devices = new ArrayList<>(2);
devices.add(device);
devices.add(device2);
List<Integer> results = consume_tsfile(consumer, devices);
assertEquals(results.get(0), 10);
assertEquals(results.get(1), 10);
consume_tsfile_await(consumer, devices, Arrays.asList(10, 10));
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions(topicName).size(), 0, "unsubscribe:show subscriptions");
consumer.subscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 1, "subscribe again:show subscriptions");
insert_data(1707782400000L, device); // 2024-02-13 08:00:00+08:00
insert_data(1707782400000L, device2); // 2024-02-13 08:00:00+08:00
results = consume_tsfile(consumer, devices);
assertEquals(
results.get(0),
15,
"Unsubscribing and then re-subscribing will not retain progress. Full synchronization.");
assertEquals(
results.get(1),
15,
"Unsubscribing and then re-subscribing will not retain progress. Full synchronization.");
consume_tsfile_await(consumer, devices, Arrays.asList(15, 15));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/***
Expand Down Expand Up @@ -130,9 +132,8 @@ public void do_test()
// insert_data(1706659200000L); //2024-01-31 08:00:00+08:00
insert_data(System.currentTimeMillis());
// Consumption data
List<Integer> results = consume_tsfile_withFileCount(consumer, device);
assertEquals(results.get(0), 10);
assertEquals(results.get(1), 2, "number of received files");
consume_tsfile_with_file_count_await(
consumer, Collections.singletonList(device), Arrays.asList(10, 2));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after unsubscription");
Expand All @@ -142,14 +143,7 @@ public void do_test()
insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained when re-subscribing after cancellation. Full
// synchronization.
results = consume_tsfile_withFileCount(consumer, device);
assertEquals(
results.get(0),
15,
"Unsubscribe and resubscribe, progress is not retained. Full synchronization.");
assertEquals(
results.get(1),
3,
"Number of received files: After unsubscribing and resubscribing, progress is not retained. Full synchronization.");
consume_tsfile_with_file_count_await(
consumer, Collections.singletonList(device), Arrays.asList(15, 3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

Expand Down Expand Up @@ -171,10 +172,8 @@ public void do_test()
paths.add(device);
paths.add(device2);
paths.add(database2 + ".d_2");
List<Integer> rowCountList = consume_tsfile(consumer, paths);
assertGte(rowCountList.get(0), 8);
assertEquals(rowCountList.get(1), 0);
assertEquals(rowCountList.get(2), 0);
consume_tsfile_await(
consumer, paths, Arrays.asList(8, 0, 0), Arrays.asList(true, false, false));

// Unsubscribe
consumer.unsubscribe(topicName);
Expand All @@ -188,12 +187,7 @@ public void do_test()

// Consumption data: Progress is not retained after unsubscribing and resubscribing. Full
// synchronization.
rowCountList = consume_tsfile(consumer, paths);
assertGte(
rowCountList.get(0),
13,
"Unsubscribe and then resubscribe, progress is not retained. Full synchronization.");
assertEquals(rowCountList.get(1), 0, "Unsubscribe and then resubscribe," + device2);
assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," + database2 + ".d_2");
consume_tsfile_await(
consumer, paths, Arrays.asList(13, 0, 0), Arrays.asList(true, false, false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

Expand Down Expand Up @@ -172,11 +173,8 @@ public void do_test()
paths.add(device);
paths.add(device2);
paths.add(database2 + ".d_2");
List<Integer> rowCountList = consume_tsfile(consumer, paths);
// Subscribe and write without consuming
assertEquals(rowCountList.get(0), 5, "Write without consume after subscription");
assertEquals(rowCountList.get(1), 0);
assertEquals(rowCountList.get(2), 0);
consume_tsfile_await(consumer, paths, Arrays.asList(5, 0, 0));

// Unsubscribe
consumer.unsubscribe(topicName);
Expand All @@ -190,12 +188,6 @@ public void do_test()

// Consumption data: Progress is not retained after unsubscribing and re-subscribing. Full
// synchronization.
rowCountList = consume_tsfile(consumer, paths);
assertEquals(
rowCountList.get(0),
10,
"Unsubscribe and then resubscribe, progress is not retained. Full synchronization.");
assertEquals(rowCountList.get(1), 0, "Unsubscribe and then resubscribe," + device2);
assertEquals(rowCountList.get(2), 0, "Unsubscribe and then resubscribe," + database2 + ".d_2");
consume_tsfile_await(consumer, paths, Arrays.asList(10, 0, 0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/***
Expand Down Expand Up @@ -161,12 +162,7 @@ public void do_test()
devices.add(device);
devices.add(device2);
devices.add(database2 + ".d_2");
List<Integer> results = consume_tsfile(consumer, devices);

assertEquals(results.get(0), 10);
assertEquals(results.get(1), 0);
assertEquals(results.get(2), 0);
assertEquals(results.get(3), 2, "number of received files");
consume_tsfile_with_file_count_await(consumer, devices, Arrays.asList(10, 0, 0, 2));
// Unsubscribe
consumer.unsubscribe(topicName);
assertEquals(subs.getSubscriptions().size(), 0, "show subscriptions after cancellation");
Expand All @@ -177,13 +173,6 @@ public void do_test()
insert_data(1707782400000L, device2); // 2024-02-13 08:00:00+08:00
// Consumption data: Progress is not retained after unsubscribing and re-subscribing. Full
// synchronization.
results = consume_tsfile(consumer, devices);
assertEquals(
results.get(0),
15,
"Unsubscribe and resubscribe, progress is not retained. Full synchronization.");
assertEquals(results.get(1), 0, "Unsubscribe and then subscribe again," + device2);
assertEquals(results.get(2), 0, "Subscribe again after cancellation," + database2 + ".d_2");
assertEquals(results.get(3), 3, "Number of received files: resubscribe after unsubscribe");
consume_tsfile_with_file_count_await(consumer, devices, Arrays.asList(15, 0, 0, 3));
}
}
Loading
Loading