|
1 | 1 | package io.lettuce.core;
|
2 | 2 |
|
3 | 3 | import static io.lettuce.TestTags.UNIT_TEST;
|
| 4 | +import static java.util.concurrent.TimeUnit.MILLISECONDS; |
4 | 5 | import static org.assertj.core.api.Assertions.*;
|
5 | 6 |
|
6 | 7 | import java.nio.ByteBuffer;
|
7 | 8 | import java.util.List;
|
8 | 9 | import java.util.Map;
|
9 | 10 | import java.util.concurrent.CompletionStage;
|
10 | 11 |
|
| 12 | +import org.awaitility.Awaitility; |
11 | 13 | import org.junit.jupiter.api.Tag;
|
12 | 14 | import org.junit.jupiter.api.Test;
|
13 | 15 |
|
14 | 16 | import io.lettuce.core.output.CommandOutput;
|
15 | 17 | import io.lettuce.core.protocol.AsyncCommand;
|
16 | 18 | import io.lettuce.core.protocol.ProtocolVersion;
|
17 | 19 | import io.netty.channel.embedded.EmbeddedChannel;
|
| 20 | +import reactor.core.publisher.Mono; |
| 21 | +import reactor.core.publisher.Sinks; |
18 | 22 |
|
19 | 23 | /**
|
20 | 24 | * Unit tests for {@link RedisHandshake}.
|
@@ -106,6 +110,42 @@ void handshakeFireAndForgetPostHandshake() {
|
106 | 110 | assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse();
|
107 | 111 | }
|
108 | 112 |
|
| 113 | + @Test |
| 114 | + void handshakeDelayedCredentialProvider() { |
| 115 | + |
| 116 | + DelayedRedisCredentialsProvider cp = new DelayedRedisCredentialsProvider(); |
| 117 | + // RedisCredentialsProvider cp = () -> Mono.just(RedisCredentials.just("foo", |
| 118 | + // "bar")).delayElement(Duration.ofMillis(3)); |
| 119 | + EmbeddedChannel channel = new EmbeddedChannel(true, false); |
| 120 | + |
| 121 | + ConnectionMetadata connectionMetdata = new ConnectionMetadata(); |
| 122 | + connectionMetdata.setLibraryName("library-name"); |
| 123 | + connectionMetdata.setLibraryVersion("library-version"); |
| 124 | + |
| 125 | + ConnectionState state = new ConnectionState(); |
| 126 | + state.setCredentialsProvider(cp); |
| 127 | + state.apply(connectionMetdata); |
| 128 | + RedisHandshake handshake = new RedisHandshake(null, false, state); |
| 129 | + CompletionStage<Void> handshakeInit = handshake.initialize(channel); |
| 130 | + cp.completeCredentials(RedisCredentials.just("foo", "bar")); |
| 131 | + |
| 132 | + Awaitility.await().atMost(50, MILLISECONDS) // Wait up to 5 seconds |
| 133 | + .pollInterval(5, MILLISECONDS) // Poll every 50 milliseconds |
| 134 | + .until(() -> !channel.outboundMessages().isEmpty()); |
| 135 | + |
| 136 | + AsyncCommand<String, String, Map<String, String>> hello = channel.readOutbound(); |
| 137 | + helloResponse(hello.getOutput()); |
| 138 | + hello.complete(); |
| 139 | + |
| 140 | + List<AsyncCommand<String, String, Map<String, String>>> postHandshake = channel.readOutbound(); |
| 141 | + postHandshake.get(0).getOutput().setError(ERR_UNKNOWN_COMMAND); |
| 142 | + postHandshake.get(0).completeExceptionally(new RedisException(ERR_UNKNOWN_COMMAND)); |
| 143 | + postHandshake.get(0).complete(); |
| 144 | + |
| 145 | + assertThat(postHandshake.size()).isEqualTo(2); |
| 146 | + assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); |
| 147 | + } |
| 148 | + |
109 | 149 | @Test
|
110 | 150 | void shouldParseVersionWithCharacters() {
|
111 | 151 |
|
@@ -136,4 +176,19 @@ private static void helloResponse(CommandOutput<String, String, Map<String, Stri
|
136 | 176 | output.set(ByteBuffer.wrap("1.2.3".getBytes()));
|
137 | 177 | }
|
138 | 178 |
|
| 179 | + static class DelayedRedisCredentialsProvider implements RedisCredentialsProvider { |
| 180 | + |
| 181 | + private final Sinks.One<RedisCredentials> credentialsSink = Sinks.one(); |
| 182 | + |
| 183 | + @Override |
| 184 | + public Mono<RedisCredentials> resolveCredentials() { |
| 185 | + return credentialsSink.asMono(); |
| 186 | + } |
| 187 | + |
| 188 | + public void completeCredentials(RedisCredentials credentials) { |
| 189 | + credentialsSink.tryEmitValue(credentials); |
| 190 | + } |
| 191 | + |
| 192 | + } |
| 193 | + |
139 | 194 | }
|
0 commit comments