-
Notifications
You must be signed in to change notification settings - Fork 1.1k
WIP : Channel Adapters for Redis Streams #3227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/RedisStreamOutboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope you find my superficial review sufficient and we are good to continue a work here.
Thank you for your effort!
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/RedisStreamOutboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/RedisStreamOutboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/RedisStreamOutboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/RedisStreamOutboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/RedisStreamOutboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
@akuma8 , please, don't squash commits when you push them to the PR: it is going to be much easier for us to review when change history is present. See more info about good commits in Chris Beams' article: https://chris.beams.io/posts/git-commit/ Thanks for understanding. P.S. for now it is OK because PR is small yet, but it is going to grow I believe. |
@artembilan Thank you for the remark, I will be careful in the future. It's a personal habit. |
I'm in review process. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You know I have just discovered for myself a ReactiveStreamOperations
in Spring Data Redis.
I my experience it is better to develop Reactive component and let it to be used in the blocking scenarios.
Can we revise your solution in favor of ReactiveStreamOperations
?
See ReactiveMongoDbStoringMessageHandler
for example.
The RedisStreamInboundChannelAdapter
could also be based on the ReactiveStreamOperations.read()
. The returned Flux
could be damped into a ReactiveStreamsSubscribableChannel
if configured that way or as regular send()
otherwise.
Let me know WDYT?
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Show resolved
Hide resolved
.../src/main/java/org/springframework/integration/redis/outbound/RedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/springframework/integration/redis/outbound/RedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/springframework/integration/redis/outbound/RedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/springframework/integration/redis/outbound/RedisStreamMessageHandler.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we revise your solution in favor of
ReactiveStreamOperations
?
Sure, I saw ReactiveStreamOperations
in the doc but I thought it only be relevant to use it in a full reactive context which is not our case but I will revise my first solution if this one is better.
...n/java/org/springframework/integration/redis/outbound/RedisStreamOutboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...n/java/org/springframework/integration/redis/outbound/RedisStreamOutboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
...ain/java/org/springframework/integration/redis/inbound/RedisStreamInboundChannelAdapter.java
Outdated
Show resolved
Hide resolved
@artembilan I tried the reactive approach without success. I followed this https://docs.spring.io/spring-data/redis/docs/2.3.0.M4/reference/html/#redis.streams.receive.containers with |
Hi, @akuma8 ! Thank you for update! I'll review your solution and try to figure problems the next week. Right now I'm working on some reactive streams functionality for the |
Hi @artembilan, |
I have just raised a PR #3240 about reactive Message Producer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @artembilan |
Stay safe, @akuma8 , and get well soon! No need to be in a hurry: looks like no pressure for the feature yet. So, we are good to bring it whenever it is ready. When you come back from your sick time off, please, divide this into two PRs - inbound & outbound. At a glance I haven't looked into an inbound part because wanted to be sure that Now it is merged to master and you can start looking into the Fingers crossed for your health! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @akuma8 !
Any chances to see an update from you on the matter?
What we have so far and what we still need to do to have that Redis Stream Inbound Channel Adapter?
Thanks
Hi @artembilan, |
Hi @akuma8 ! No problem with your load! Just let me know that you can make it before September or should I pull this task from you and fix myself! |
Yes, I think so. If we keep doing it like we did for the outbound side I can make it before September. |
Just did: ebec500 Upgraded to Spring Data |
Hi @artembilan, |
Yes, if that is possible, I'd prefer to have a one inbound component with a Thanks |
8deed7a
to
9c482ec
Compare
- Changed the imperative `StreamMessageListenerContainer` listener in favor of the reactive one `StreamReceiver` - Added RedisHeaders values for Redis Stream Outbound: - Added a new constructor taking `Expression` as parameter - Changed the imperative `StreamOperations` in favor of `ReactiveStreamOperations` - Renaming class from `RedisStreamMessageHandler` to `ReactiveRedisStreamMessageHandler` Took into account other improvement remarks: copyright, setters position, methods names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good progress, but definitely there is still some work to do: tests, docs, JavaDocs etc.
Thanks
@@ -0,0 +1,171 @@ | |||
/* | |||
* Copyright 2020-2021 the original author or authors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope we can merge this still in 2020
😄
import org.springframework.util.StringUtils; | ||
|
||
/** | ||
* Read Message from a Redis Stream and publish it to the indicated output channel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@since 5.4
and your name to @author
private Message<?> convertMessage( Record<Object, Object> record ) { | ||
Map<String, Object> headers = new HashMap<>(); | ||
headers.put( RedisHeaders.STREAM_KEY, record.getStream() ); | ||
headers.put( RedisHeaders.STREAM_MESSAGE_ID, record.getId() ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need also populate a consumerGroup
header.
Map<String, Object> headers = new HashMap<>(); | ||
headers.put( RedisHeaders.STREAM_KEY, record.getStream() ); | ||
headers.put( RedisHeaders.STREAM_MESSAGE_ID, record.getId() ); | ||
return this.messageConverter.toMessage( record.getValue(), new MessageHeaders( headers ) ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. Doesn't look like we do something fancy with the converter. What is wrong with just using a plain getMessageBuilderFactory()
?
this.createGroupIfNotExist = createGroupIfNotExist; | ||
} | ||
|
||
public void setReceiver( StreamReceiver receiver ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this option?
Looks like you create it in the registerListener()
anyway.
this.readOffset = readOffset; | ||
} | ||
|
||
void setTemplate( RedisTemplate template ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to set this one?
Why can't we create it based on the provided connection factory?
this.consumerName = consumerName; | ||
} | ||
|
||
public void setCreateGroupIfNotExist( boolean createGroupIfNotExist ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would just call it setCreateGroup()
and explain a behavior in JavaDocs.
} | ||
catch ( Exception e ) { | ||
// An exception is thrown when the group already exists | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably better to just log INFO that group already exist.
Anyway it's probably better to look into a ReactiveRedisTemplate
and perform the creation operation together with subsequent receiver as a reactive stream composition.
@@ -45,4 +45,8 @@ private RedisHeaders() { | |||
|
|||
public static final String MESSAGE_SOURCE = PREFIX + "messageSource"; | |||
|
|||
public static final String STREAM_KEY = PREFIX + "streamKey"; | |||
|
|||
public static final String STREAM_MESSAGE_ID = PREFIX + "streamMessageID"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe streamMessageId
? No ID
, please.
Assert.hasText( consumerName, "'consumerName' must be set" ); | ||
Consumer consumer = Consumer.from( this.consumerGroupName, this.consumerName ); | ||
this.receiver | ||
.receiveAutoAck( consumer, offset ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we also need to think about manual acknowledgement:
* Every record must be acknowledged using
* {@link org.springframework.data.redis.connection.ReactiveStreamCommands#xAck(ByteBuffer, String, String...)}
See receive()
API for the StreamReceiver
.
This has to be populated as a IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
.
And also see SimpleAcknowledgment
abstraction to wrap the call to that xAck
. for the particular record returned.
Saying that I'd expect an autoAck(true|false)
option on this channel adapter.
Hi @artembilan, |
OK. So, I'm closing this one to avoid confusion. Thanks for confirmation! |
#3226
The PR is in WIP because I need information for improvements.