Skip to content

Fix ProducerBusy or ConsumerBusy error when configuring multiple brokers per connection #337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

BewareMyPower
Copy link
Contributor

Motivation

This is a catch up for apache/pulsar#21144

When a producer or consumer reconnects, a random number will be generated as the key suffix in ConnectionPool to create or get the ClientConnection object from the pool.

ss << logicalAddress << '-' << randomDistribution_(randomEngine_);

If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a ProducerBusy or ConsumerBusy error so that the reconnection will never succeed.

Modifications

  • Add an overload of ConnectionPool::getConnectionAsync that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by key suffix % size.
  • Generate the random number and save it when creating the HandlerBase object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same ClientConnection object.

Verifying this change

ProducerTest.testReconnectMultiConnectionsPerBroker is added to protected the change.

…ers per connection

### Motivation

This is a catch up for apache/pulsar#21144

When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool.

https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75

If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a  `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed.

### Modifications

- Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`.
- Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object.

### Verifying this change

`ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change.
@BewareMyPower BewareMyPower force-pushed the bewaremypower/fix-same-producer branch from f9621ef to 42f56c1 Compare November 6, 2023 09:20
@BewareMyPower BewareMyPower self-assigned this Nov 6, 2023
@BewareMyPower BewareMyPower added the bug Something isn't working label Nov 6, 2023
@BewareMyPower BewareMyPower added this to the 3.4.0 milestone Nov 6, 2023
@BewareMyPower BewareMyPower merged commit 6f115e7 into apache:main Nov 7, 2023
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-same-producer branch November 7, 2023 02:42
BewareMyPower added a commit that referenced this pull request Nov 7, 2023
…ers per connection (#337)

### Motivation

This is a catch up for apache/pulsar#21144

When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool.

https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75

If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a  `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed.

### Modifications

- Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`.
- Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object.

### Verifying this change

`ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change.

(cherry picked from commit 6f115e7)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants