Skip to content

Commit 97edea0

Browse files
committed
[#noissue] Fixed redis pubsub test
1 parent b419e90 commit 97edea0

File tree

4 files changed

+116
-53
lines changed

4 files changed

+116
-53
lines changed

redis/pom.xml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,16 @@
6262
</dependency>
6363

6464
<dependency>
65-
<groupId>junit</groupId>
66-
<artifactId>junit</artifactId>
65+
<groupId>org.junit.jupiter</groupId>
66+
<artifactId>junit-jupiter-api</artifactId>
6767
<scope>test</scope>
6868
</dependency>
69+
<dependency>
70+
<groupId>org.testcontainers</groupId>
71+
<artifactId>junit-jupiter</artifactId>
72+
<version>1.17.6</version>
73+
</dependency>
74+
6975
<dependency>
7076
<groupId>org.testcontainers</groupId>
7177
<artifactId>testcontainers</artifactId>

redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,8 @@ public Flux<S> request(D demand) {
6060

6161
static class LongTermSubConsumer<S> implements SubConsumer<SupplyMessage<S>> {
6262

63-
private static final Comparator<SupplyMessage<?>> supplyComparator = new Comparator<>() {
64-
@Override
65-
public int compare(SupplyMessage<?> o1, SupplyMessage<?> o2) {
66-
return o1.getSequence() - o2.getSequence();
67-
}
68-
};
63+
private static final Comparator<SupplyMessage<?>> supplyComparator =
64+
Comparator.comparing(el -> el.getSequence());
6965

7066
final Sinks.Many<S> sink;
7167
final Identifier demandId;
@@ -94,15 +90,15 @@ public boolean consume(SupplyMessage<S> supply) {
9490
if (supply.getSequence() == nextSequence) {
9591
consume0(supply);
9692
nextSequence += 1;
97-
} else {
98-
supplies.add(supply);
9993
while (supplies.peek() != null && supplies.peek().getSequence() == nextSequence) {
10094
final SupplyMessage<S> pended = supplies.poll();
10195
if (pended != null) {
10296
consume0(pended);
10397
nextSequence += 1;
10498
}
10599
}
100+
} else {
101+
supplies.add(supply);
106102
}
107103
}
108104
return true;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2023 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.navercorp.pinpoint.redis.pubsub;
17+
18+
import com.navercorp.pinpoint.pubsub.endpoint.PubSubClientFactory;
19+
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory;
20+
import org.junit.jupiter.api.BeforeAll;
21+
import org.junit.jupiter.api.DisplayName;
22+
import org.junit.jupiter.api.Test;
23+
import org.junit.jupiter.api.extension.ExtendWith;
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.test.context.ContextConfiguration;
26+
import org.springframework.test.context.junit.jupiter.SpringExtension;
27+
import org.testcontainers.containers.GenericContainer;
28+
import org.testcontainers.junit.jupiter.Container;
29+
import org.testcontainers.junit.jupiter.Testcontainers;
30+
import org.testcontainers.utility.DockerImageName;
31+
32+
import static com.navercorp.pinpoint.redis.pubsub.RedisStreamReqResTest.testPubSubServerClient;
33+
34+
/**
35+
* @author youngjin.kim2
36+
*/
37+
@DisplayName("req/res based on redis pubsub")
38+
@ExtendWith(SpringExtension.class)
39+
@ContextConfiguration(classes = {RedisPubSubConfig.class})
40+
@Testcontainers
41+
public class RedisPubSubReqResTest {
42+
43+
@Container
44+
@SuppressWarnings("resource")
45+
private static final GenericContainer<?> redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
46+
.withExposedPorts(6379)
47+
.withReuse(true);
48+
49+
@Autowired
50+
private PubSubServerFactory serverFactory;
51+
52+
@Autowired
53+
private PubSubClientFactory clientFactory;
54+
55+
@BeforeAll
56+
public static void beforeAll() {
57+
System.setProperty("spring.data.redis.host", redisContainer.getHost());
58+
System.setProperty("spring.redis.host", redisContainer.getHost());
59+
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
60+
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());
61+
}
62+
63+
@DisplayName("req/res based on redis pubsub")
64+
@Test
65+
public void testRedisPubSub() {
66+
testPubSubServerClient(this.serverFactory, this.clientFactory);
67+
}
68+
69+
}

redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java renamed to redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisStreamReqResTest.java

Lines changed: 35 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,16 @@
2121
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory;
2222
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServiceDescriptor;
2323
import com.navercorp.pinpoint.redis.stream.RedisStreamConfig;
24+
import org.junit.jupiter.api.BeforeAll;
2425
import org.junit.jupiter.api.DisplayName;
2526
import org.junit.jupiter.api.Test;
26-
import org.springframework.context.ApplicationContext;
27-
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
27+
import org.junit.jupiter.api.extension.ExtendWith;
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.test.context.ContextConfiguration;
30+
import org.springframework.test.context.junit.jupiter.SpringExtension;
2831
import org.testcontainers.containers.GenericContainer;
32+
import org.testcontainers.junit.jupiter.Container;
33+
import org.testcontainers.junit.jupiter.Testcontainers;
2934
import org.testcontainers.utility.DockerImageName;
3035
import reactor.core.publisher.Flux;
3136
import reactor.core.publisher.Mono;
@@ -38,53 +43,39 @@
3843
/**
3944
* @author youngjin.kim2
4045
*/
41-
@DisplayName("req/res based on redis")
42-
public class RedisReqResTest {
46+
@DisplayName("req/res based on redis stream")
47+
@ExtendWith(SpringExtension.class)
48+
@ContextConfiguration(classes = {RedisStreamConfig.class})
49+
@Testcontainers
50+
public class RedisStreamReqResTest {
4351

44-
@DisplayName("req/res based on redis pubsub")
45-
@Test
46-
public void testRedisPubSub() {
47-
testConfigClass(RedisPubSubConfig.class);
52+
@Container
53+
@SuppressWarnings("resource")
54+
private static final GenericContainer<?> redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
55+
.withExposedPorts(6379)
56+
.withReuse(true);
57+
58+
@Autowired
59+
private PubSubServerFactory serverFactory;
60+
61+
@Autowired
62+
private PubSubClientFactory clientFactory;
63+
64+
@BeforeAll
65+
public static void beforeAll() {
66+
System.setProperty("spring.data.redis.host", redisContainer.getHost());
67+
System.setProperty("spring.redis.host", redisContainer.getHost());
68+
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
69+
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());
4870
}
4971

5072
@DisplayName("req/res based on redis stream")
5173
@Test
5274
public void testRedisStreamPubSub() {
53-
testConfigClass(RedisStreamConfig.class);
54-
}
55-
56-
private void testConfigClass(Class<?> configClass) {
57-
runWithRedisContainer(() -> {
58-
final ApplicationContext context = new AnnotationConfigApplicationContext(configClass);
59-
testServerClientFactory(
60-
context.getBean(PubSubServerFactory.class),
61-
context.getBean(PubSubClientFactory.class)
62-
);
63-
});
64-
}
65-
66-
@SuppressWarnings("resource")
67-
private void runWithRedisContainer(Runnable r) {
68-
try (final GenericContainer<?> redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
69-
.withExposedPorts(6379)
70-
.withReuse(true)
71-
) {
72-
redisContainer.start();
73-
System.setProperty("spring.data.redis.host", redisContainer.getHost());
74-
System.setProperty("spring.redis.host", redisContainer.getHost());
75-
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
76-
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());
77-
78-
r.run();
79-
80-
redisContainer.stop();
81-
}
75+
testPubSubServerClient(this.serverFactory, this.clientFactory);
8276
}
8377

84-
private void testServerClientFactory(
85-
PubSubServerFactory serverFactory,
86-
PubSubClientFactory clientFactory
87-
) {
78+
static void testPubSubServerClient(PubSubServerFactory serverFactory, PubSubClientFactory clientFactory) {
8879
final PubSubMonoServiceDescriptor<String, String> greeterService =
8980
PubSubServiceDescriptor.mono("greeter", String.class, String.class);
9081
serverFactory.build(name -> Mono.just("Hello, " + name), greeterService).afterPropertiesSet();
@@ -99,9 +90,10 @@ private void testServerClientFactory(
9990
PubSubServiceDescriptor.flux("range", Integer.class, Integer.class);
10091
serverFactory.build(el -> Flux.range(0, el), rangeService).afterPropertiesSet();
10192
assertThat(syncRequestFlux(clientFactory, rangeService, 5)).isEqualTo(List.of(0, 1, 2, 3, 4));
93+
assertThat(syncRequestFlux(clientFactory, rangeService, 3)).isEqualTo(List.of(0, 1, 2));
10294
}
10395

104-
private <D, S> S syncRequestMono(
96+
static <D, S> S syncRequestMono(
10597
PubSubClientFactory clientFactory,
10698
PubSubMonoServiceDescriptor<D, S> descriptor,
10799
D demand
@@ -111,7 +103,7 @@ private <D, S> S syncRequestMono(
111103
.block();
112104
}
113105

114-
private <D, S> List<S> syncRequestFlux(
106+
static <D, S> List<S> syncRequestFlux(
115107
PubSubClientFactory clientFactory,
116108
PubSubFluxServiceDescriptor<D, S> descriptor,
117109
D demand

0 commit comments

Comments
 (0)