Spawn websocket connections in the core pool #1522
Conversation
```rust
Panic(String),
EventStreamError,
```
I wonder if these aren't too much of an implementation detail; then again... why not! They might help us debug things in the future.
Force-pushed from 6c51603 to 4df8c61.
```rust
let result = graph::spawn_blocking_allow_panic(async move {
    execute_selection_set(&ctx, &selection_set, &subscription_type, &None)
})
.await
```
This should help, since the number of blocking threads we need is no longer dependent on the number of subscriptions, but we can still exhaust the blocking pool because of the thundering-herd behavior of subscriptions. If the individual queries for each subscription take a while to run (say 1s, which is not that hard to cause), the first few subscriptions through here will exhaust the connection pool. Subsequent subscriptions will then wait for those queries to finish, eventually filling up the blocking pool. We should gate spawning the blocking thread on a semaphore that is sized so that we do not exhaust the connection pool (say, one that allows 75% of the connection pool through).
What we should really do is change the store to move the actual work to the blocking pool itself, by giving it some internal function Store.with_connection(f: Fn(Connection) -> Result) which first acquires a semaphore sized according to the max number of connections in the pool, and then executes f on the blocking pool. We'd then change all Store methods that right now just get a connection to use with_connection and do their work inside that.
But that's too much work for this fix; that's why I suggested a semaphore here, with a guess at how big it should be.
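The with_connection idea could be sketched roughly like this. Note this is a std-only illustration under stated assumptions: the Store shape and the hand-rolled Semaphore are stand-ins, and the real implementation would use an async semaphore and run f on tokio's blocking pool with an actual Connection.

```rust
use std::sync::{Condvar, Mutex};

/// Minimal counting semaphore; async code would use tokio's `Semaphore` instead.
pub struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    pub fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }

    pub fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }

    pub fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

/// Toy stand-in for the store; only the gating pattern matters here.
pub struct Store {
    // Sized to the connection pool, so callers can never exhaust it.
    gate: Semaphore,
}

impl Store {
    pub fn new(pool_size: usize) -> Self {
        Store { gate: Semaphore::new(pool_size) }
    }

    /// Acquire a permit bounded by the pool size, then run `f`; in the real
    /// store, `f` would receive a `Connection` and run on the blocking pool.
    pub fn with_connection<T>(&self, f: impl FnOnce() -> T) -> T {
        self.gate.acquire();
        let result = f();
        self.gate.release();
        result
    }
}

fn main() {
    let store = Store::new(2);
    let answer = store.with_connection(|| 40 + 2);
    println!("{}", answer); // prints 42
}
```

The point of the design is that every Store method that currently grabs a connection directly would instead go through with_connection, so one semaphore guards all connection use in a single place.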
It does make sense to throttle subscription queries so they don't delay normal queries. Ideally we'd have query nodes dedicated only to subscriptions. I'll put an async semaphore here as you suggest.
lutter left a comment:
This looks great! Thanks for adding the semaphore.
```rust
static ref SUBSCRIPTION_QUERY_SEMAPHORE: Semaphore = {
    // This is duplicating the logic in main.rs to get the connection pool size, which is
    // unfortunate. But because this module has no shared state otherwise, it's not simple to
    // refactor so that the semaphore isn't a global.
```
And the right place for this semaphore would be internal to the Store anyway, so we guard any attempt to get a connection with it, but this is totally fine for now.
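The sizing suggested in review could look something like this. The helper name and the exact rounding are assumptions for illustration, not graph-node's actual code:

```rust
/// Allow roughly 75% of the connection pool through for subscription
/// queries, leaving headroom for normal queries. Hypothetical helper.
fn subscription_permits(connection_pool_size: usize) -> usize {
    // Never round down to zero, or subscription queries would stall forever.
    std::cmp::max(1, connection_pool_size * 3 / 4)
}

fn main() {
    println!("{}", subscription_permits(20)); // prints 15
    println!("{}", subscription_permits(1)); // prints 1
}
```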
Added a comment about the 'right' way to do this to #905
Thanks for registering this in the issue, merging the PR.
Otherwise they would indefinitely take up a thread in the blocking pool, causing that pool to run out of threads under high load and freezing up the node. Now only the execution of the selection set, which is the actual blocking part, is spawned as blocking. Lesson learned: don't put long-running tasks in the blocking pool.
Also bumped the unresponsiveness timeout from 10s to 100s, since we don't expect it to trigger anymore and we might want to avoid crashing the node in case of temporary unresponsiveness for some other reason.
Tested locally that the issue no longer reproduces.
Most of the diff is from dropping the lifetime from Resolver; it needs to be 'static now so it can be put in a tokio task.
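The 'static requirement can be seen with std's thread::spawn, which imposes the same bound as tokio::spawn. This Resolver is a toy stand-in for illustration, not the trait from the diff:

```rust
use std::thread;

// Toy stand-in: it owns its data, so it is 'static and can be moved into a
// spawned task. A Resolver<'a> borrowing from the caller's stack would be
// rejected by the same bound that tokio::spawn imposes on its future.
struct Resolver {
    label: String,
}

impl Resolver {
    fn resolve(&self) -> String {
        format!("resolved by {}", self.label)
    }
}

fn main() {
    let resolver = Resolver { label: String::from("store") };
    // `move` transfers ownership into the task; nothing is borrowed, so the
    // closure satisfies the 'static bound.
    let handle = thread::spawn(move || resolver.resolve());
    let out = handle.join().unwrap();
    println!("{}", out); // prints "resolved by store"
}
```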