-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Outbound channel adapter for Redis Stream #3259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
bdc6c65
to
6b7fca5
Compare
Hey, @akuma8 ! Glad to see you back! Hope you feel good and ready to go 😄 So, thank you first of all for your effort! There bear with us if we don't merge it soon enough. Please, take a look into Travis report:
We are against |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any chances to see real tests that after calling this ReactiveRedisStreamMessageHandler
we get some data in the Redis?
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
Hi @artembilan ! |
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
ce00199
to
c14ae67
Compare
Hi @artembilan,
Do you think that I should add a |
Yes, sounds reasonable! |
Please, rebase your branch to the latest master. It is now |
2935f7e
to
4a26f09
Compare
@artembilan rebase done. |
@artembilan There are 2 tests that I am unable to make them pass, the first one is:
The goal is to override the default The 2nd test is similar to the 1st one but instead of a list of strings I use this model
The result is the same, data are not deserialized. If we solve this, the inboud side will be simpler. Please let me know if you find the cause. Thanks a lot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a review for the current state of code.
I'll take a look into your failing tests when you address this review.
Right now the change is a bit confusing especially in test configuration, so highly possible that something is wrong with your tests.
Please, come back to me with fixes or objection and we will figure out the rest together.
Thank you!
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Show resolved
Hide resolved
...pringframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml
Outdated
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
8929515
to
d404aa8
Compare
Hi @artembilan,
|
Hi @akuma8 ! Thank you for coming back with some update! Today I'm busy with releases, but going to take a look into your effort tomorrow in the morning.
Well, it really fails even for me. Not sure yet what is going on. Some race condition, probably... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is some review.
Also, please, consider to fix the RedisAvailableRule
. The base.evaluate();
should go after that catch (Exception e) {
with Assume.assumeTrue()
.
Thank you!
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java
Show resolved
Hide resolved
...pringframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java
Outdated
Show resolved
Hide resolved
...pringframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java
Outdated
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
...a/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java
Outdated
Show resolved
Hide resolved
@artembilan Thanks for your reviews. I made requested changes. I also change my tests, I use a
fails. |
Hi @artembilan,
Let me know if I have to make further changes. |
That is probably because Travis uses an old Redis version unlike your local machine. I recently installed Linux sub-system onto my Windows 10 and use the latest Redis 6.0.5 from there: works really well. So, let's don't bother ourselves for a while! I'll review your PR today: very likely this one is going to make it into the release this Wednesday. Thank you for keeping me busy! 😉 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
/*@Test | ||
@RedisAvailable*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, let me know if this test works well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rebased but still have the same issue, the .opsForStream().read(....)
method returns null.
java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:132)
at org.springframework.data.redis.core.ReactiveStreamOperations.lambda$read$4(ReactiveStreamOperations.java:426)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:355)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:886)
at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:279)
at io.lettuce.core.output.StreamingOutput$Subscriber.onNext(StreamingOutput.java:64)
at io.lettuce.core.output.StreamReadOutput.complete(StreamReadOutput.java:92)
at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:401)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:707)
at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:671)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:666)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:587)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:556)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockFirst(Flux.java:2452)
at org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandlerTests.explicitSerializationContextTest(ReactiveRedisStreamMessageHandlerTests.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.integration.redis.rules.RedisAvailableRule$1.evaluate(RedisAvailableRule.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
I checked the stream and it well contains data:
1) "1593030360926-0"
2) 1) "_class"
2) "java.util.Arrays$ArrayList"
3) "a.[0]._class"
4) "java.lang.String"
5) "a.[0]"
6) "Hello"
7) "a.[1]._class"
8) "java.lang.String"
9) "a.[1]"
10) "stream"
11) "a.[2]._class"
12) "java.lang.String"
13) "a.[2]"
14) "message"
15) "modCount"
16) "0"
It seems like reading List
causes issue.
Thanks for your help
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It really likely a List.class
as a return type is not possible from that read(....)
.
I think we need to disable this test for time being.
…shing message to the actual stream using ReactiveStreamOperations
Some tests with explicit RedisSerializationContext fail.
…eactiveRedisStreamMessageHandlerTests#explicitSerializationContextTest
|
||
|
||
/*@Test | ||
@RedisAvailable*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It really likely a List.class
as a return type is not possible from that read(....)
.
I think we need to disable this test for time being.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please run gradlew check
Task locally to see Checkstyle problems. Looks like we are good to merge this soon enough!
👍
* | ||
* @since 5.4 | ||
*/ | ||
@SuppressWarnings({"rawtypes", "unchecked"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May we kinda get rid of rawtypes
?
Probably simple wildcard for genercis (<?>
) should be enough for us...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing rawtypes
causes compilation issue:
error: no suitable method found for opsForStream(HashMapper<CAP#1,CAP#2,CAP#3>)
template.opsForStream(this.hashMapper);
^
method ReactiveRedisOperations.<HK#1,HV#1>opsForStream() is not applicable
(cannot infer type-variable(s) HK#1,HV#1
(actual and formal argument lists differ in length))
on
this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() :
template.opsForStream(this.hashMapper);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. Let me play with that locally!
Then I'll merge it since we need something for today's release.
You feel free to go ahead with Inbound Channel Adapter implementation and some docs for both of this.
Thanks
|
||
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); | ||
|
||
ObjectRecord<String, String> record = template.opsForStream().read(String.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, consider to not exceed 120 line length: much easier to review.
Merged as afa79d8 after some clean up. Please, review carefully what I did with your code in the end. Thank you and looking forward for more! |
@artembilan Thanks for merging. I reviewed your modifications which seem cleaner than what I did. |
This is the outbound side of that PR: #3227 related to #3226
TODO:
I have to add more tests cases.