Feature request: mongo change stream #2788

Closed
Lewik opened this issue Mar 6, 2019 · 7 comments
@Lewik

Lewik commented Mar 6, 2019

A MongoDB change stream is a stream of changes from the database. The changes are pushed from the database side rather than pulled by "us", so this source is very helpful for building live interfaces.

So an SI inbound-change-stream-adapter would be a great thing. I'd like to build it myself, but I don't have enough knowledge to do this %).

https://docs.mongodb.com/manual/changeStreams/
https://github.com/spring-projects/spring-data-examples/tree/master/mongodb/change-streams (the container should be auto-started)

@artembilan artembilan added this to the 5.2.x milestone Mar 6, 2019
@artembilan
Member

That's a great request, @Lewik!

I think we should consider making it reactive, like the sample you are pointing to:

// Emits one ChangeStreamEvent per insert into the "person" collection
Flux<ChangeStreamEvent<Person>> changeStream = reactiveTemplate
	.changeStream(newAggregation(match(where("operationType").is("insert"))),
				Person.class, ChangeStreamOptions.empty(), "person");

When the outputChannel is a FluxMessageChannel, it is enough to subscribe to that Flux from MongoDB.

If it is not a FluxMessageChannel, we just need to subscribe immediately and push messages to the channel via a regular send().

The ChangeStreamOptions should be an optional configuration option of our MongoDbChangeStreamChannelAdapter, alongside the ReactiveMongoOperations.
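For illustration only, the "subscribe immediately and push via send()" branch could look roughly like the sketch below. It uses the JDK's `java.util.concurrent.Flow` types instead of Reactor and Spring Integration so that it runs on its own: `ChangeStreamForwarder`, `forward` and `demo` are hypothetical names, the `Consumer` stands in for the channel's `send()`, and the `SubmissionPublisher` stands in for the `Flux` from MongoDB.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ChangeStreamForwarder {

    /** Subscribes right away and hands every element to the given send() stand-in. */
    static <T> CountDownLatch forward(Flow.Publisher<T> source, Consumer<T> send) {
        CountDownLatch done = new CountDownLatch(1);
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1); // ask for one event at a time
            }

            @Override
            public void onNext(T item) {
                send.accept(item); // stand-in for channel.send(message)
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable error) {
                done.countDown(); // the stream is over; a real adapter would stop here
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });
        return done;
    }

    /** Publishes the events and returns what the "channel" (a plain list) received. */
    static List<String> demo(List<String> events) {
        List<String> channel = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        CountDownLatch done = forward(publisher, channel::add);
        events.forEach(publisher::submit);
        publisher.close(); // signals onComplete after the buffered items are delivered
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return channel;
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("insert:1", "insert:2")));
    }
}
```

Requesting one element at a time keeps the sketch simple; a real adapter would pick its own demand strategy.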

@Lewik
Author

Lewik commented Mar 6, 2019

I've just implemented forwarding change stream messages to an SI queue channel and want to share some conclusions:

  • As described in https://docs.mongodb.com/manual/changeStreams/, change streams need a MongoDB deployment with a replica set.
  • We need to provide a factory for reactiveTemplate.
  • I think it's OK to pass raw MongoDB ChangeStreamDocument messages as SI message payloads without any additional modification (without SI headers). Alternatively, add an option to enrich the headers with the ChangeStreamDocument data and leave only the Document in the payload. (Headers with the same name could have different types in different message types.)
  • There is a message of type INVALIDATE that closes the change stream cursor (a change stream is a cursor over oplog.rs): https://docs.mongodb.com/manual/reference/change-events/#invalidate-event We should provide a reconnection strategy.
  • Change streams can produce a lot of messages. I think we should provide a strategy for consuming all of them, like "how to drain all available messages from the channel and handle them as one batch".
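To make the last point concrete, here is a minimal sketch of one possible "drain as a batch" strategy on top of a plain `java.util.concurrent.BlockingQueue`. Nothing here is Spring Integration API: `BatchDrain` and `drainBatch` are hypothetical names, and the queue only stands in for the queue channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchDrain {

    /**
     * Waits up to timeoutMillis for the first message, then takes whatever else is
     * already queued (at most maxBatch in total) without waiting again.
     */
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxBatch, long timeoutMillis) {
        List<T> batch = new ArrayList<>();
        try {
            T first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch, maxBatch - 1);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5; i++) {
            queue.add("change-" + i);
        }
        System.out.println(drainBatch(queue, 3, 100));
        System.out.println(drainBatch(queue, 3, 100));
    }
}
```

Blocking only for the first element means a quiet stream returns an empty batch after the timeout, while a busy one is handled in packs of up to `maxBatch`.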

Useful links:
https://docs.mongodb.com/manual/changeStreams/
https://docs.mongodb.com/manual/administration/change-streams-production-recommendations/
https://docs.mongodb.com/manual/reference/change-events/
https://github.com/spring-projects/spring-data-examples/tree/master/mongodb/change-streams (the container should be auto-started)

@artembilan
Member

You cannot resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream.

So, our channel adapter should go to the stopped state and emit an appropriate application event, without any possibility to restart.
We should just document this case with a caution that the current channel adapter configuration won't make sense after a restart.

I don't think we need to care about the container, since the reactive variant doesn't expose it.
It looks like the returned ChangeStreamEvent is fully sufficient to map to a Message with the converted payload and appropriate headers taken from the ChangeStreamEvent getters.
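A rough sketch of that mapping, with stand-in types so it runs without Spring on the classpath: `ChangeEvent` and `SimpleMessage` are hypothetical simplifications of `ChangeStreamEvent` and `Message`, and the header names are made up for the example.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class ChangeEventMapper {

    // Header names are invented for this sketch.
    static final String OPERATION_TYPE = "changeStream_operationType";
    static final String COLLECTION_NAME = "changeStream_collectionName";
    static final String TIMESTAMP = "changeStream_timestamp";

    /** Hypothetical stand-in for the getters a change stream event exposes. */
    static final class ChangeEvent {
        final String operationType;
        final String collectionName;
        final long timestamp;
        final Object body;

        ChangeEvent(String operationType, String collectionName, long timestamp, Object body) {
            this.operationType = operationType;
            this.collectionName = collectionName;
            this.timestamp = timestamp;
            this.body = body;
        }
    }

    /** Minimal stand-in for a message: a payload plus read-only headers. */
    static final class SimpleMessage {
        final Object payload;
        final Map<String, Object> headers;

        SimpleMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        }
    }

    /** The converted body becomes the payload; event metadata goes into headers. */
    static SimpleMessage toMessage(ChangeEvent event) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put(OPERATION_TYPE, event.operationType);
        headers.put(COLLECTION_NAME, event.collectionName);
        headers.put(TIMESTAMP, event.timestamp);
        return new SimpleMessage(event.body, headers);
    }

    public static void main(String[] args) {
        SimpleMessage message = toMessage(new ChangeEvent("insert", "person", 1551830400L, "Person[name=Ann]"));
        System.out.println(message.payload + " " + message.headers);
    }
}
```

Keeping the metadata in headers leaves the payload as the plain domain object, so downstream handlers can route on the operation type without unwrapping the event.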

Does it make sense to you?

@Lewik
Author

Lewik commented Mar 6, 2019

I agree, except for this:

So, our channel adapter should go to the stopped state and emit an appropriate application event, without any possibility to restart.

An INVALIDATE caused by dropping or renaming a collection could be a normal use case in NoSQL, not only a system or maintenance event.

@artembilan
Member

OK. So, we may consider "re-streaming" with some fallback hook provided.
By default we just stop, but when a hook is provided we can react to a new collection according to that hook's result.

We may consider it as an improvement once we have a basic adapter implementation.
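Purely as a sketch of the idea (none of this is an existing API): `InvalidateHandling`, `Adapter` and the hook signature are hypothetical. The hook receives the invalidated collection name and may return the name of a collection to re-stream from; without a hook the adapter just stops.

```java
import java.util.Optional;
import java.util.function.Function;

public class InvalidateHandling {

    enum State { STREAMING, STOPPED }

    /** Hypothetical adapter skeleton: only the invalidate decision is modelled. */
    static final class Adapter {
        private final Function<String, Optional<String>> invalidateHook;
        private String collection;
        private State state = State.STREAMING;

        /** Default behaviour: no hook, so an invalidate event stops the adapter. */
        Adapter(String collection) {
            this(collection, invalidated -> Optional.empty());
        }

        Adapter(String collection, Function<String, Optional<String>> invalidateHook) {
            this.collection = collection;
            this.invalidateHook = invalidateHook;
        }

        /** Called when the stream reports an invalidate event. */
        void onInvalidate() {
            Optional<String> next = this.invalidateHook.apply(this.collection);
            if (next.isPresent()) {
                this.collection = next.get(); // a real adapter would open a new stream here
            } else {
                this.state = State.STOPPED;
            }
        }

        String collection() {
            return this.collection;
        }

        State state() {
            return this.state;
        }
    }

    public static void main(String[] args) {
        Adapter renamed = new Adapter("person", old -> Optional.of(old + "_v2"));
        renamed.onInvalidate();
        System.out.println(renamed.collection() + " " + renamed.state());
    }
}
```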

@Lewik
Author

Lewik commented Mar 7, 2019

@artembilan Thank you very much

@artembilan artembilan modified the milestones: 5.3.x, 5.4.x Mar 31, 2020
@artembilan artembilan self-assigned this Apr 3, 2020
@artembilan artembilan modified the milestones: 5.4.x, 5.3 RC1 Apr 3, 2020
artembilan added a commit to artembilan/spring-integration that referenced this issue Apr 5, 2020
Fixes spring-projects#2788

* Introduce a `MessageProducerSupport.subscribeToPublisher(Publisher<Message<?>>)`
for components which produce a `Flux` for data from their source
* Such a component is auto-stopped when subscription to that `Publisher` is canceled
* Implement a `MongoDbChangeStreamMessageProducer` based on the reactive support
in Spring Data MongoDb
* Implement a Java DSL for `MongoDbChangeStreamMessageProducer`
* Disable a test for change stream since it requires server of version 4.x started with 'replSet' option
artembilan added a commit to artembilan/spring-integration that referenced this issue Apr 5, 2020
Fixes spring-projects#2788

* Introduce a `MessageProducerSupport.subscribeToPublisher(Publisher<Message<?>>)`
for components which produce a `Flux` for data from their source
* Such a component is auto-stopped when subscription to that `Publisher` is canceled
* Implement a `MongoDbChangeStreamMessageProducer` based on the reactive support
in Spring Data MongoDb
* Implement a Java DSL for `MongoDbChangeStreamMessageProducer`
* Disable a test for change stream since it requires server of version 4.x started with 'replSet' option
* Add `MongoHeaders` for change stream events
@artembilan
Member

Please see the PR on this matter: #3240
