Skip to content

Commit 9396188

Browse files
authored
Merge pull request #192 from brave/multiple-instances
Add support for multiple OPRF instances in a single server
2 parents 6fc06a0 + 862f8a2 commit 9396188

File tree

8 files changed

+501
-216
lines changed

8 files changed

+501
-216
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ edition = "2021"
1010
axum = "0.6.20"
1111
axum-prometheus = "0.4.0"
1212
base64 = "0.21.4"
13+
calendar-duration = "1.0.0"
1314
clap = { version = "4.4.4", features = ["derive"] }
1415
metrics-exporter-prometheus = "0.12.1"
1516
ppoprf = "0.3.1"

src/handler.rs

Lines changed: 86 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
//! STAR Randomness web service route implementation
22
3-
use axum::extract::{Json, State};
3+
use std::sync::RwLockReadGuard;
4+
5+
use axum::extract::{Json, Path, State};
46
use axum::http::StatusCode;
57
use base64::prelude::{Engine as _, BASE64_STANDARD as BASE64};
68
use serde::{Deserialize, Serialize};
7-
use tracing::debug;
9+
use tracing::{debug, instrument};
810

9-
use crate::OPRFState;
11+
use crate::state::{OPRFInstance, OPRFState};
1012
use ppoprf::ppoprf;
1113

12-
/// Request format for the randomness endpoint
14+
/// Request structure for the randomness endpoint
1315
#[derive(Deserialize, Debug)]
1416
pub struct RandomnessRequest {
1517
/// Array of points to evaluate
@@ -19,7 +21,7 @@ pub struct RandomnessRequest {
1921
epoch: Option<u8>,
2022
}
2123

22-
/// Response format for the randomness endpoint
24+
/// Response structure for the randomness endpoint
2325
#[derive(Serialize, Debug)]
2426
pub struct RandomnessResponse {
2527
/// Resulting points from the OPRF valuation
@@ -30,26 +32,34 @@ pub struct RandomnessResponse {
3032
epoch: u8,
3133
}
3234

33-
/// Response format for the info endpoint
35+
/// Response structure for the info endpoint
3436
/// Rename fields to match the earlier golang implementation.
3537
#[derive(Serialize, Debug)]
38+
#[serde(rename_all = "camelCase")]
3639
pub struct InfoResponse {
3740
/// ServerPublicKey used to verify zero-knowledge proof
38-
#[serde(rename = "publicKey")]
3941
public_key: String,
4042
/// Currently active randomness epoch
41-
#[serde(rename = "currentEpoch")]
4243
current_epoch: u8,
4344
/// Timestamp of the next epoch rotation
4445
/// This should be a string in RFC 3339 format,
4546
/// e.g. 2023-03-14T16:33:05Z.
46-
#[serde(rename = "nextEpochTime")]
4747
next_epoch_time: Option<String>,
4848
/// Maximum number of points accepted in a single request
49-
#[serde(rename = "maxPoints")]
5049
max_points: usize,
5150
}
5251

52+
/// Response structure for the "list instances" endpoint.
53+
#[derive(Serialize, Debug)]
54+
#[serde(rename_all = "camelCase")]
55+
pub struct ListInstancesResponse {
56+
/// A list of available instances on the server.
57+
instances: Vec<String>,
58+
/// The default instance on this server.
59+
/// A requests made to /info and /randomness will utilize this instance.
60+
default_instance: String,
61+
}
62+
5363
/// Response returned to report error conditions
5464
#[derive(Serialize, Debug)]
5565
struct ErrorResponse {
@@ -63,6 +73,8 @@ struct ErrorResponse {
6373
/// handling requests.
6474
#[derive(thiserror::Error, Debug)]
6575
pub enum Error {
76+
#[error("instance '{0}' not found")]
77+
InstanceNotFound(String),
6678
#[error("Couldn't lock state: RwLock poisoned")]
6779
LockFailure,
6880
#[error("Invalid point")]
@@ -93,6 +105,7 @@ impl axum::response::IntoResponse for Error {
93105
/// Construct an http response from our error type
94106
fn into_response(self) -> axum::response::Response {
95107
let code = match self {
108+
Error::InstanceNotFound(_) => StatusCode::NOT_FOUND,
96109
// This indicates internal failure.
97110
Error::LockFailure => StatusCode::INTERNAL_SERVER_ERROR,
98111
// Other cases are the client's fault.
@@ -105,13 +118,28 @@ impl axum::response::IntoResponse for Error {
105118
}
106119
}
107120

121+
type Result<T> = std::result::Result<T, Error>;
122+
123+
fn get_server_from_state<'a>(
124+
state: &'a OPRFState,
125+
instance_name: &'a str,
126+
) -> Result<RwLockReadGuard<'a, OPRFInstance>> {
127+
Ok(state
128+
.instances
129+
.get(instance_name)
130+
.ok_or_else(|| Error::InstanceNotFound(instance_name.to_string()))?
131+
.read()?)
132+
}
133+
108134
/// Process PPOPRF evaluation requests
109-
pub async fn randomness(
110-
State(state): State<OPRFState>,
111-
Json(request): Json<RandomnessRequest>,
112-
) -> Result<Json<RandomnessResponse>, Error> {
135+
#[instrument(skip(state, request))]
136+
async fn randomness(
137+
state: OPRFState,
138+
instance_name: String,
139+
request: RandomnessRequest,
140+
) -> Result<Json<RandomnessResponse>> {
113141
debug!("recv: {request:?}");
114-
let state = state.read()?;
142+
let state = get_server_from_state(&state, &instance_name)?;
115143
let epoch = request.epoch.unwrap_or(state.epoch);
116144
if epoch != state.epoch {
117145
return Err(Error::BadEpoch(epoch));
@@ -138,12 +166,29 @@ pub async fn randomness(
138166
Ok(Json(response))
139167
}
140168

141-
/// Process PPOPRF epoch and key requests
142-
pub async fn info(
169+
/// Process PPOPRF evaluation requests using default instance
170+
pub async fn default_instance_randomness(
143171
State(state): State<OPRFState>,
144-
) -> Result<Json<InfoResponse>, Error> {
172+
Json(request): Json<RandomnessRequest>,
173+
) -> Result<Json<RandomnessResponse>> {
174+
let instance_name = state.default_instance.clone();
175+
randomness(state, instance_name, request).await
176+
}
177+
178+
/// Process PPOPRF evaluation requests using specific instance
179+
pub async fn specific_instance_randomness(
180+
State(state): State<OPRFState>,
181+
Path(instance_name): Path<String>,
182+
Json(request): Json<RandomnessRequest>,
183+
) -> Result<Json<RandomnessResponse>> {
184+
randomness(state, instance_name, request).await
185+
}
186+
187+
/// Provide PPOPRF epoch and key metadata
188+
#[instrument(skip(state))]
189+
async fn info(state: OPRFState, instance_name: String) -> Result<Json<InfoResponse>> {
145190
debug!("recv: info request");
146-
let state = state.read()?;
191+
let state = get_server_from_state(&state, &instance_name)?;
147192
let public_key = state.server.get_public_key().serialize_to_bincode()?;
148193
let public_key = BASE64.encode(public_key);
149194
let response = InfoResponse {
@@ -155,3 +200,25 @@ pub async fn info(
155200
debug!("send: {response:?}");
156201
Ok(Json(response))
157202
}
203+
204+
/// Provide PPOPRF epoch and key metadata using default instance
205+
pub async fn default_instance_info(State(state): State<OPRFState>) -> Result<Json<InfoResponse>> {
206+
let instance_name = state.default_instance.clone();
207+
info(state, instance_name).await
208+
}
209+
210+
/// Provide PPOPRF epoch and key metadata using specific instance
211+
pub async fn specific_instance_info(
212+
State(state): State<OPRFState>,
213+
Path(instance_name): Path<String>,
214+
) -> Result<Json<InfoResponse>> {
215+
info(state, instance_name).await
216+
}
217+
218+
// Lists all available instances, as well as the default instance
219+
pub async fn list_instances(State(state): State<OPRFState>) -> Result<Json<ListInstancesResponse>> {
220+
Ok(Json(ListInstancesResponse {
221+
instances: state.instances.keys().cloned().collect(),
222+
default_instance: state.default_instance.clone(),
223+
}))
224+
}

src/main.rs

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,23 @@
22
33
use axum::{routing::get, routing::post, Router};
44
use axum_prometheus::PrometheusMetricLayer;
5+
use calendar_duration::CalendarDuration;
56
use clap::Parser;
67
use metrics_exporter_prometheus::PrometheusHandle;
78
use rlimit::Resource;
8-
use std::sync::{Arc, RwLock};
9+
use state::{OPRFServer, OPRFState};
910
use tikv_jemallocator::Jemalloc;
10-
use time::format_description::well_known::Rfc3339;
1111
use time::OffsetDateTime;
1212
use tracing::{debug, info, metadata::LevelFilter};
1313
use tracing_subscriber::EnvFilter;
14+
use util::{assert_unique_names, parse_timestamp};
1415

1516
#[global_allocator]
1617
static GLOBAL: Jemalloc = Jemalloc;
1718

1819
mod handler;
1920
mod state;
20-
21-
pub use state::OPRFState;
21+
mod util;
2222

2323
#[cfg(test)]
2424
mod tests;
@@ -27,15 +27,22 @@ mod tests;
2727
const MAX_POINTS: usize = 1024;
2828

2929
/// Command line switches
30-
#[derive(Parser, Debug)]
30+
#[derive(Parser, Debug, Clone)]
3131
#[command(author, version, about, long_about = None)]
3232
pub struct Config {
3333
/// Host and port to listen for http connections
3434
#[arg(long, default_value = "127.0.0.1:8080")]
3535
listen: String,
36-
/// Duration of each randomness epoch
37-
#[arg(long, default_value_t = 5)]
38-
epoch_seconds: u32,
36+
/// Name of OPRF instance contained in server. Multiple instances may be defined
37+
/// by defining this switch multiple times. The first defined instance will
38+
/// become the default instance.
39+
#[arg(long = "instance-name", default_value = "main")]
40+
instance_names: Vec<String>,
41+
/// Duration of each randomness epoch. This switch may be defined multiple times
42+
/// to set the epoch length for each respective instance, if multiple instances
43+
/// are defined.
44+
#[arg(long = "epoch-duration", value_name = "Duration string i.e. 1mon5h2s", default_values = ["5s"])]
45+
epoch_durations: Vec<CalendarDuration>,
3946
/// First epoch tag to make available
4047
#[arg(long, default_value_t = 0)]
4148
first_epoch: u8,
@@ -56,20 +63,25 @@ pub struct Config {
5663
prometheus_listen: Option<String>,
5764
}
5865

59-
/// Parse a timestamp given as a config option
60-
fn parse_timestamp(stamp: &str) -> Result<OffsetDateTime, &'static str> {
61-
OffsetDateTime::parse(stamp, &Rfc3339).map_err(|_| "Try something like '2023-05-15T04:30:00Z'.")
62-
}
63-
6466
/// Initialize an axum::Router for our web service
6567
/// Having this as a separate function makes testing easier.
6668
fn app(oprf_state: OPRFState) -> Router {
6769
Router::new()
6870
// Friendly default route to identify the site
6971
.route("/", get(|| async { "STAR randomness server\n" }))
70-
// Main endpoints
71-
.route("/randomness", post(handler::randomness))
72-
.route("/info", get(handler::info))
72+
// Endpoints for all instances
73+
.route(
74+
"/instances/:instance/randomness",
75+
post(handler::specific_instance_randomness),
76+
)
77+
.route(
78+
"/instances/:instance/info",
79+
get(handler::specific_instance_info),
80+
)
81+
.route("/instances", get(handler::list_instances))
82+
// Endpoints for default instance
83+
.route("/randomness", post(handler::default_instance_randomness))
84+
.route("/info", get(handler::default_instance_info))
7385
// Attach shared state
7486
.with_state(oprf_state)
7587
// Logging must come after active routes
@@ -126,22 +138,28 @@ async fn main() {
126138
increase_nofile_limit();
127139
}
128140

129-
// Oblivious function state
130-
info!("initializing OPRF state...");
131-
let server = state::OPRFServer::new(&config).expect("Could not initialize PPOPRF state");
132-
info!("epoch now {}", server.epoch);
133-
let oprf_state = Arc::new(RwLock::new(server));
141+
assert_unique_names(&config.instance_names);
142+
assert!(
143+
!config.epoch_durations.iter().any(|d| d.is_zero()),
144+
"all epoch lengths must be non-zero"
145+
);
146+
assert!(
147+
!config.instance_names.is_empty(),
148+
"at least one instance name must be defined"
149+
);
150+
assert!(
151+
config.instance_names.len() == config.epoch_durations.len(),
152+
"instance-name switch count must match epoch-seconds switch count"
153+
);
134154

135155
let metric_layer = config.prometheus_listen.as_ref().map(|listen| {
136156
let (layer, handle) = PrometheusMetricLayer::pair();
137157
start_prometheus_server(handle, listen.clone());
138158
layer
139159
});
140160

141-
// Spawn a background process to advance the epoch
142-
info!("Spawning background epoch rotation task...");
143-
let background_state = oprf_state.clone();
144-
tokio::spawn(async move { state::epoch_loop(background_state, &config).await });
161+
let oprf_state = OPRFServer::new(&config);
162+
oprf_state.start_background_tasks(&config);
145163

146164
// Set up routes and middleware
147165
info!("initializing routes...");

0 commit comments

Comments
 (0)