Skip to content

Spawn websocket connections in the core pool #1522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 4, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions graphql/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,31 @@ use graphql_parser::{query as q, schema as s, Style};
use std::collections::HashMap;
use std::result::Result;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;

use graph::prelude::*;

use crate::execution::*;
use crate::query::ast as qast;
use crate::schema::ast as sast;

use lazy_static::lazy_static;

lazy_static! {
static ref SUBSCRIPTION_QUERY_SEMAPHORE: Semaphore = {
// This is duplicating the logic in main.rs to get the connection pool size, which is
// unfourtunate. But because this module has no share state otherwise, it's not simple to
// refactor so that the semaphore isn't a global.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the right place for this semaphore would be internal to the Store anyway, so we guard any attempt to get a connection with it, but this is totally fine for now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment about the 'right' way to do this to #905

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for registering this in the issue, merging the PR.

let db_conn_pool_size = std::env::var("STORE_CONNECTION_POOL_SIZE")
.unwrap_or("10".into())
.parse::<usize>()
.expect("invalid STORE_CONNECTION_POOL_SIZE");

// Limit the amount of connections that can be taken up by subscription queries.
Semaphore::new((0.7 * db_conn_pool_size as f64).ceil() as usize)
};
}

/// Options available for subscription execution.
pub struct SubscriptionExecutionOptions<R>
where
Expand Down Expand Up @@ -226,6 +244,9 @@ async fn execute_subscription_event(
.unwrap()
.clone();

// Use a semaphore to prevent subscription queries, which can be numerous and might query all at
// once, from flooding the blocking thread pool and the DB connection pool.
let _permit = SUBSCRIPTION_QUERY_SEMAPHORE.acquire();
let result = graph::spawn_blocking_allow_panic(async move {
execute_selection_set(&ctx, &selection_set, &subscription_type, &None)
})
Expand Down