Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -122,7 +121,7 @@ public void createTopics(final TestInfo testInfo) throws InterruptedException {
appId = safeUniqueTestName(testInfo);
}

private Properties props(final boolean stateUpdaterEnabled) {
private Properties props() {
final Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
Expand All @@ -134,7 +133,6 @@ private Properties props(final boolean stateUpdaterEnabled) {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
properties.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
return properties;
}

Expand All @@ -151,10 +149,9 @@ private static void produceToInputTopics(final String topic, final Collection<Ke
IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, CLUSTER.time);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldPauseAndResumeKafkaStreams(final boolean stateUpdaterEnabled) throws Exception {
kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
@Test
public void shouldPauseAndResumeKafkaStreams() throws Exception {
kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT);

Expand All @@ -176,10 +173,9 @@ public void shouldPauseAndResumeKafkaStreams(final boolean stateUpdaterEnabled)
assertTopicSize(OUTPUT_STREAM_1, 10);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
@Test
public void shouldAllowForTopologiesToStartPaused() throws Exception {
kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.pause();
kafkaStreams.start();
waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
Expand All @@ -197,11 +193,10 @@ public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnab
assertTopicSize(OUTPUT_STREAM_1, 5);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@Test
@SuppressWarnings("deprecation")
public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies() throws Exception {
streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();

Expand Down Expand Up @@ -233,11 +228,10 @@ public void shouldPauseAndResumeKafkaStreamsWithNamedTopologies(final boolean st
awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA2);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@Test
@SuppressWarnings("deprecation")
public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final boolean stateUpdaterEnabled) throws Exception {
streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies() throws Exception {
streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();

Expand Down Expand Up @@ -270,11 +264,10 @@ public void shouldPauseAndResumeAllKafkaStreamsWithNamedTopologies(final boolean
assertTopicSize(OUTPUT_STREAM_2, 5);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@Test
@SuppressWarnings("deprecation")
public void shouldAllowForNamedTopologiesToStartPaused(final boolean stateUpdaterEnabled) throws Exception {
streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props(stateUpdaterEnabled));
public void shouldAllowForNamedTopologiesToStartPaused() throws Exception {
streamsNamedTopologyWrapper = new KafkaStreamsNamedTopologyWrapper(props());
final NamedTopologyBuilder builder1 = getNamedTopologyBuilder1();
final NamedTopologyBuilder builder2 = getNamedTopologyBuilder2();

Expand All @@ -301,19 +294,18 @@ public void shouldAllowForNamedTopologiesToStartPaused(final boolean stateUpdate
awaitOutput(OUTPUT_STREAM_2, 5, COUNT_OUTPUT_DATA);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void pauseResumeShouldWorkAcrossInstances(final boolean stateUpdaterEnabled) throws Exception {
@Test
public void pauseResumeShouldWorkAcrossInstances() throws Exception {
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);

kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1);
kafkaStreams.pause();
kafkaStreams.start();

waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT);
assertTrue(kafkaStreams.isPaused());

kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2, stateUpdaterEnabled);
kafkaStreams2 = buildKafkaStreams(OUTPUT_STREAM_2);
kafkaStreams2.pause();
kafkaStreams2.start();
waitForApplicationState(singletonList(kafkaStreams2), State.REBALANCING, STARTUP_TIMEOUT);
Expand All @@ -331,12 +323,11 @@ public void pauseResumeShouldWorkAcrossInstances(final boolean stateUpdaterEnabl
awaitOutput(OUTPUT_STREAM_1, 5, COUNT_OUTPUT_DATA);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void pausedTopologyShouldNotRestoreStateStores(final boolean stateUpdaterEnabled) throws Exception {
final Properties properties1 = props(stateUpdaterEnabled);
@Test
public void pausedTopologyShouldNotRestoreStateStores() throws Exception {
final Properties properties1 = props();
properties1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
final Properties properties2 = props(stateUpdaterEnabled);
final Properties properties2 = props();
properties2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);

Expand Down Expand Up @@ -380,8 +371,8 @@ private void assertStreamsLocalStoreLagStaysConstant(final KafkaStreams streams)
assertEquals(stateStoreLag1, stateStoreLag2);
}

private KafkaStreams buildKafkaStreams(final String outputTopic, final boolean stateUpdaterEnabled) {
return buildKafkaStreams(outputTopic, props(stateUpdaterEnabled));
private KafkaStreams buildKafkaStreams(final String outputTopic) {
return buildKafkaStreams(outputTopic, props());
}

private KafkaStreams buildKafkaStreams(final String outputTopic, final Properties properties) {
Expand Down