
(asyncio) PubSub does not automatically reconnect #2089

Closed
kristjanvalur opened this issue Apr 6, 2022 · 8 comments

Comments

@kristjanvalur
Contributor

kristjanvalur commented Apr 6, 2022

Version: 4.2.2

Platform: Python 3.8 on Windows

Description: Running a pub-sub task in a loop requires me to manually call pubsub.connection.connect().

So:

I'm new to the Python redis client, having previously used hiredis and written clients for Unreal Engine with it.

So I was pleasantly surprised at how the client usually handles reconnection for you when the connection drops.
However, this does not happen with the PubSub client.

Please note that I am using the redis.asyncio module here.

There is a retry mechanism in PubSub._execute, which calls PubSub._disconnect_raise_connect on failure.
However, that function only calls connect() in case of a TimeoutError.
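To make the described control flow concrete, here is a minimal, self-contained model of it. It is a sketch, not the redis-py source: `FakeConnection` and `handle_failure_as_described` are hypothetical names, and Python's built-in `TimeoutError`/`ConnectionError` stand in for the `redis.exceptions` classes. It shows that a timeout ends with the connection re-established, while a dropped connection re-raises and stays closed.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a redis connection; tracks only is_connected."""

    def __init__(self) -> None:
        self.is_connected = True

    async def connect(self) -> None:
        self.is_connected = True

    async def disconnect(self) -> None:
        self.is_connected = False


async def handle_failure_as_described(conn: FakeConnection, error: Exception) -> None:
    # Models the behaviour described in the issue: drop the connection,
    # and only reconnect when the failure was a timeout.
    await conn.disconnect()
    if not isinstance(error, TimeoutError):
        raise error
    await conn.connect()


async def demo() -> tuple:
    timed_out = FakeConnection()
    await handle_failure_as_described(timed_out, TimeoutError())

    dropped = FakeConnection()
    try:
        await handle_failure_as_described(
            dropped, ConnectionError("Connection closed by server.")
        )
    except ConnectionError:
        pass  # the error propagates and nothing reconnects
    return timed_out.is_connected, dropped.is_connected


print(asyncio.run(demo()))  # (True, False)
```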

If there is a disconnect, for example because the connection to the server disappears for a few seconds or for some other reason,
the connection stays in a disconnected state.

In my tests, I killed and restarted the server, and PubSub.connection stayed in an is_connected == False state, with every read operation resulting in ConnectionError("Connection closed by server.").

Previously the loop looked something like:

async def loop(self):
    await self.perform_any_subscriptions()
    while True:
        try:
            await self.loop_step()
        except redis.ConnectionError:
            pass

async def loop_step(self):
    async for message in self.pubsub.listen():
        await self.dispatch_message(message)
        break

If the connection becomes disconnected, listen() simply keeps raising ConnectionError("Connection closed by server.") on every iteration.
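A small simulation of that naive loop, assuming a hypothetical `FakePubSub` whose connection has already been dropped and is never reconnected (the built-in `ConnectionError` stands in for `redis.ConnectionError`). The loop is bounded to a fixed number of attempts so it terminates; every attempt fails the same way, which is the spin described above.

```python
import asyncio


class FakePubSub:
    """Hypothetical PubSub stand-in whose connection has already dropped."""

    def __init__(self) -> None:
        self.is_connected = False  # the server went away and nothing reconnected

    async def listen(self):
        if not self.is_connected:
            raise ConnectionError("Connection closed by server.")
        yield {"type": "message", "data": b"hello"}


async def naive_loop(pubsub: FakePubSub, attempts: int) -> int:
    """Run the catch-and-ignore loop a bounded number of times; count failures."""
    failures = 0
    for _ in range(attempts):
        try:
            async for message in pubsub.listen():
                break  # dispatch_message(message) would go here
        except ConnectionError:
            failures += 1  # swallowed, but the connection is still closed
    return failures


print(asyncio.run(naive_loop(FakePubSub(), attempts=5)))  # 5
```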

What I had to do, as a workaround, was to do something like this, for my listening loop:

async def loop(self):
    async def reconnect(error):
        # Reconnect only if the error left the pubsub connection closed
        conn = self.pubsub.connection
        if isinstance(error, redis.ConnectionError) and conn and not conn.is_connected:
            await conn.connect()

    while True:
        await self.pubsub.connection.call_with_retry(self.loop_step, reconnect)

(using call_with_retry as a handy catch-all mechanism)

I'm not sure whether this is expected or a bug, but from the documentation I'm reading, it seems that PubSub ought to be reusable across disconnects, automatically reconnecting and re-establishing any previous subscriptions, etc.
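The idea behind this workaround, retrying a step and reconnecting from the failure callback, can be sketched without a server. `retry_with_reconnect` and `FakeConnection` are hypothetical helpers written for illustration; they are not redis-py's `call_with_retry` or connection classes, and the built-in `ConnectionError` stands in for `redis.ConnectionError`.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_reconnect(
    do: Callable[[], Awaitable[T]],
    fail: Callable[[Exception], Awaitable[None]],
    retries: int = 3,
) -> T:
    """Hypothetical helper: await do(); on ConnectionError await fail(error) and retry."""
    attempt = 0
    while True:
        try:
            return await do()
        except ConnectionError as error:
            attempt += 1
            if attempt > retries:
                raise  # give up and re-raise the last error
            await fail(error)  # e.g. reconnect before the next attempt


class FakeConnection:
    """Hypothetical connection that starts out dropped."""

    def __init__(self) -> None:
        self.is_connected = False
        self.connect_calls = 0

    async def connect(self) -> None:
        self.connect_calls += 1
        self.is_connected = True


async def demo() -> tuple:
    conn = FakeConnection()

    async def read_message() -> str:
        if not conn.is_connected:
            raise ConnectionError("Connection closed by server.")
        return "message"

    async def reconnect(error: Exception) -> None:
        # Only reconnect when the failure actually left the connection closed
        if isinstance(error, ConnectionError) and not conn.is_connected:
            await conn.connect()

    result = await retry_with_reconnect(read_message, reconnect)
    return result, conn.connect_calls


print(asyncio.run(demo()))  # ('message', 1)
```

The failure callback is an `async def` rather than a lambda because it has to `await` the reconnect, which a lambda cannot do.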

UPDATE:

After PR #2148, the async PubSub class has a connect() method that can be used to remedy this. The top loop becomes:

async def loop(self):
    await self.perform_any_subscriptions()
    while True:
        try:
            await self.pubsub.connect()
            await self.loop_step()
        except redis.ConnectionError:
            pass

Previously, it was not possible to reconnect a PubSub object after a ConnectionError without issuing subscribe() or unsubscribe().
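The updated loop relies on connect() being cheap when the connection is already up and repairing it when it is not. Assuming that behaviour, the pattern can be simulated with a hypothetical `FakePubSub` that drops its connection once (the `None` entry) and a loop bounded to a fixed number of received messages; none of these names are redis-py API.

```python
import asyncio


class FakePubSub:
    """Hypothetical PubSub stand-in: connect() is a no-op while connected."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.is_connected = True
        self.reconnects = 0

    async def connect(self) -> None:
        if not self.is_connected:  # assumed cheap when already connected
            self.is_connected = True
            self.reconnects += 1

    async def get_next(self):
        if not self.is_connected:
            raise ConnectionError("Connection closed by server.")
        item = self.messages.pop(0)
        if item is None:  # None simulates the server dropping the connection
            self.is_connected = False
            raise ConnectionError("Connection closed by server.")
        return item


async def loop(pubsub: FakePubSub, wanted: int) -> list:
    received = []
    while len(received) < wanted:
        try:
            await pubsub.connect()  # reconnect first if the last step failed
            received.append(await pubsub.get_next())
        except ConnectionError:
            pass  # the next iteration's connect() repairs the connection
    return received


ps = FakePubSub(["a", None, "b"])
print(asyncio.run(loop(ps, wanted=2)), ps.reconnects)  # ['a', 'b'] 1
```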

@agronholm
Contributor

Does the synchronous version auto-reconnect?

@kristjanvalur
Contributor Author

Good question. I'd guess not, because the "connect" feature is part of PubSub.execute(), which is only invoked as part of subscribe/unsubscribe, not as part of get_message(). But I'll see if I can create a similar unit test for the synchronous version (although that is probably more tedious).

@kristjanvalur
Contributor Author

Finally got round to doing this.
Writing the unit tests for automatic PubSub reconnection in synchronous PubSub (#2256) confirms that the synchronous version does indeed auto-reconnect.
It would appear to be the intention that things behave this way, so in my opinion the asyncio PubSub should do so as well.

@kristjanvalur
Contributor Author

So, what should I do? Should I attempt to provide a PR (or extend #2256) to make the corresponding fix to the async version? @agronholm ? Are there any maintainers here who have an opinion on these things? @chayim ?

@agronholm
Contributor

The best course of action would probably be to make a new PR that adapts the tests from #2256 to async and implements that functionality on the async side.

@dvora-h
Collaborator

dvora-h commented Jul 11, 2022

@kristjanvalur Agree with @agronholm, if you can provide a new PR that will be great!

@kristjanvalur
Contributor Author

Will do. Working on getting the tests clear and indicative of the problem.

@kristjanvalur
Contributor Author

So, I spent some time making these unit tests reliable. In the process, I discovered that the issue was only present when using pubsub.listen(), not pubsub.get_message(), and that it affected both the sync and async versions of the library. PR #2281 addresses this issue.

While researching this, I noticed that the async library is very much a straight copy of the synchronous library. There are many inefficiencies, such as the use of can_read, which is superfluous in the async library. I expect I'll make a separate PR to simplify the async code a little while maintaining API compatibility.
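One way to read the remark about can_read: with asyncio, a reader can await the data directly with a timeout instead of first polling whether a read would succeed. The sketch below shows that general pattern on a plain `asyncio.StreamReader`; `read_line_or_none` is a hypothetical helper, and this is not how redis-py's parser is actually structured.

```python
import asyncio
from typing import Optional


async def read_line_or_none(reader: asyncio.StreamReader, timeout: float) -> Optional[bytes]:
    """Await a line directly, returning None on timeout; no separate can_read() poll."""
    try:
        return await asyncio.wait_for(reader.readline(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> tuple:
    reader = asyncio.StreamReader()
    empty = await read_line_or_none(reader, timeout=0.05)  # nothing buffered yet
    reader.feed_data(b"+OK\r\n")
    line = await read_line_or_none(reader, timeout=0.05)
    return empty, line


print(asyncio.run(demo()))  # (None, b'+OK\r\n')
```

The timeout does the job a readiness check would otherwise do: if no data arrives in time the read is cancelled and the caller gets None, and nothing is consumed from the buffer.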
