Skip to content

PostgresSubscribableChannel initial message polling dies due to exception #8770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
joshiste opened this issue Oct 20, 2023 · 3 comments · Fixed by #8777
Closed

PostgresSubscribableChannel initial message polling dies due to exception #8770

joshiste opened this issue Oct 20, 2023 · 3 comments · Fixed by #8777

Comments

@joshiste
Copy link
Contributor

joshiste commented Oct 20, 2023

Currently, on the first subscribe of a MessageHandler, notifyUpdate() is called to process messages already in the message store.
The problem with notifyUpdate() is that the try-catch block is outside the loop, so the loop will die on an exception, leaving further messages unprocessed.
When it receives notifications for inserted messages, this is not a problem because those notifications will submit a new task to poll further messages.

We could move the try-catch inside the loop to prevent it from dying. (If we don't do this, we might as well remove the try-catch, as it's good for nothing)
This would be fine for non-transactional usage. Messages are all processed and removed, and eventually, the loop stops.

This solution is a problem for transactional usage: The loop will never terminate, and the same message will be read repeatedly. For incoming notifications, we'd start further tasks that will go looping and exhaust the thread pool.
As of today, with transactional usage and without any other measures, messages producing an exception will also effectively block the channel (but not exhaust the thread pool)

Here is a reproducing test case and the mentioned solution. I'm not very lucky with that solution but currently have no better idea.

@joshiste joshiste added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Oct 20, 2023
@joshiste
Copy link
Contributor Author

joshiste commented Oct 20, 2023

Hmm maybe add to option to set a recovery callback that we pass to the retry template?

@artembilan
Copy link
Member

I think I agree that we don't need that try..catch in that notifyUpdate(): the message would be already removed from DB anyway, so we are good to move on to the next one in the queue.
I also think that you are right and we have to expose a RecoveryCallback<T> recoveryCallback option to mitigate retry exhaust.
By default the error log should be enough.
It also looks like that RecoveryCallback must always return something to make our loop to continue working.

I think I'll revert my recent change to the askForMessage() since with just status.setRollbackOnly() we don't re-throw an exception to the RetryTemplate 🤷

@artembilan artembilan added in: jdbc and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Oct 20, 2023
@artembilan artembilan added this to the 6.2.0 milestone Oct 20, 2023
@artembilan
Copy link
Member

artembilan commented Oct 20, 2023

No, I'm taking that back.
My logic is like this:

		.filter(message -> {
											if (!this.hasHandlers) {
												status.setRollbackOnly();
												return false;
											}
											return true;
										})

So, that applies as an optimization in case we don't have subscribers to dispatch message any more.

@artembilan artembilan self-assigned this Oct 25, 2023
artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 25, 2023
Fixes spring-projects#8770

The problem with the `PostgresSubscribableChannel.notifyUpdate()` is that the try-catch block is outside the loop,
so the loop will die on an exception, leaving further messages unprocessed.

* Add ``PostgresSubscribableChannel.errorHandler` option to be invoked
after a `RetryTemplate` and for every failed message.
* The `askForMessage()` new logic is to catch an exception on a message and call `errorHandler`
returning a `FALLBACK_STUB` to continue an outer loop in the `notifyUpdate()`

**Cherry-pick to `6.1.x` & `6.0.x`**
garyrussell pushed a commit that referenced this issue Oct 25, 2023
* GH-8770: Add `PostgresSubsChannel.errorHandler`

Fixes #8770

The problem with the `PostgresSubscribableChannel.notifyUpdate()` is that the try-catch block is outside the loop,
so the loop will die on an exception, leaving further messages unprocessed.

* Add ``PostgresSubscribableChannel.errorHandler` option to be invoked
after a `RetryTemplate` and for every failed message.
* The `askForMessage()` new logic is to catch an exception on a message and call `errorHandler`
returning a `FALLBACK_STUB` to continue an outer loop in the `notifyUpdate()`

**Cherry-pick to `6.1.x` & `6.0.x`**

* * Rename private `PostgresSubscribableChannel.askForMessage()` method to more specific `pollAndDispatchMessage()`
garyrussell pushed a commit that referenced this issue Oct 25, 2023
* GH-8770: Add `PostgresSubsChannel.errorHandler`

Fixes #8770

The problem with the `PostgresSubscribableChannel.notifyUpdate()` is that the try-catch block is outside the loop,
so the loop will die on an exception, leaving further messages unprocessed.

* Add ``PostgresSubscribableChannel.errorHandler` option to be invoked
after a `RetryTemplate` and for every failed message.
* The `askForMessage()` new logic is to catch an exception on a message and call `errorHandler`
returning a `FALLBACK_STUB` to continue an outer loop in the `notifyUpdate()`

**Cherry-pick to `6.1.x` & `6.0.x`**

* * Rename private `PostgresSubscribableChannel.askForMessage()` method to more specific `pollAndDispatchMessage()`
garyrussell pushed a commit that referenced this issue Oct 25, 2023
* GH-8770: Add `PostgresSubsChannel.errorHandler`

Fixes #8770

The problem with the `PostgresSubscribableChannel.notifyUpdate()` is that the try-catch block is outside the loop,
so the loop will die on an exception, leaving further messages unprocessed.

* Add ``PostgresSubscribableChannel.errorHandler` option to be invoked
after a `RetryTemplate` and for every failed message.
* The `askForMessage()` new logic is to catch an exception on a message and call `errorHandler`
returning a `FALLBACK_STUB` to continue an outer loop in the `notifyUpdate()`

**Cherry-pick to `6.1.x` & `6.0.x`**

* * Rename private `PostgresSubscribableChannel.askForMessage()` method to more specific `pollAndDispatchMessage()`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants