Skip to content

Commit 92425b5

Browse files
authored
Tokio 1.0 upgrade (#152)
1 parent d2063f7 commit 92425b5

File tree

11 files changed

+561
-833
lines changed

11 files changed

+561
-833
lines changed

Cargo.lock

Lines changed: 415 additions & 740 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,41 +14,40 @@ edition = "2018"
1414
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1515

1616
[dependencies]
17-
tokio = { version = "0.2.22", features = ["full"] }
18-
serde = { version = "1.0.118", features = ["derive"] }
19-
twitch-irc = { version = "1.0.0", features = ["transport-tcp", "metrics-collection"] }
20-
futures = "0.3.12"
21-
toml = "0.5.8"
22-
structopt = "0.3.21"
23-
thiserror = "1.0.23"
24-
log = "0.4.14"
25-
metrics = "0.12.1"
26-
metrics-runtime = "0.13.0"
27-
metrics-observer-prometheus = "0.1.4"
28-
metrics-core = "0.5.2"
29-
warp = { git = "https://github.com/RAnders00/warp", branch = "v025-backports" }
30-
http = "0.2.2"
31-
hyper = "~0.13"
32-
chrono = { version = "0.4.19", features = ["serde"] }
33-
serde_json = "1.0.61"
34-
itertools = "0.10.0"
35-
humantime-serde = "1.0.1"
36-
refinery = { version = "0.4.0", features = ["tokio-postgres"] }
37-
regex = "1.4.3"
38-
lazy_static = "1.4.0"
39-
humantime = "2.1.0"
40-
derivative = "2.2.0"
41-
env_logger = "0.8.2"
42-
simple-process-stats = "=0.2.0"
43-
mobc = { version = "0.5.12", features = ["tokio"] }
44-
mobc-postgres = "0.5.0"
45-
tokio-postgres = { version = "0.5.5", features = ["with-chrono-0_4"] }
46-
rmp-serde = "0.15.1"
47-
reqwest = { version = "0.10.10", features = ["json"] }
48-
rand = "0.8.3"
17+
async-stream = "0.3.0"
18+
chrono = { version = "0.4", features = ["serde"] }
19+
derivative = "2.1"
20+
env_logger = "0.8"
21+
futures = "0.3"
22+
http = "0.2"
23+
humantime = "2.1"
24+
humantime-serde = "1.0"
25+
hyper = "0.14"
26+
itertools = "0.10"
27+
lazy_static = "1.4"
28+
log = "0.4"
29+
metrics = "0.14"
30+
metrics-exporter-prometheus = { version = "0.3", default-features = false }
31+
mobc = { version = "0.7", features = ["tokio"] }
32+
mobc-postgres = "0.7"
33+
rand = "0.8"
34+
refinery = { version = "0.5", features = ["tokio-postgres"] }
35+
regex = "1.4"
36+
reqwest = { version = "0.11", features = ["json"] }
37+
rmp-serde = "0.15"
38+
serde = { version = "1.0", features = ["derive"] }
39+
serde_json = "1.0"
40+
simple-process-stats = "1.0"
41+
structopt = "0.3"
42+
thiserror = "1.0"
43+
tokio = { version = "1.0", features = ["full"] }
44+
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
45+
toml = "0.5"
46+
twitch-irc = { version = "2.2", features = ["transport-tcp", "metrics-collection"] }
47+
warp = { git = "https://github.com/RAnders00/warp", branch = "v030-backports" }
4948

5049
[target.'cfg(unix)'.dependencies]
51-
rlimit = "0.5.3"
50+
rlimit = "0.5"
5251

5352
[profile.release]
5453
lto = "fat"

src/db.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ WHERE access_token = $1",
356356
channel_messages.pop_front();
357357
} else {
358358
let new_gauge_value = self.messages_stored.fetch_add(1, Ordering::SeqCst) + 1;
359-
metrics::gauge!("recent_messages_messages_stored", new_gauge_value as i64);
359+
metrics::gauge!("recent_messages_messages_stored", new_gauge_value as f64);
360360
}
361361
}
362362

@@ -433,7 +433,7 @@ WHERE access_token = $1",
433433
.messages_stored
434434
.fetch_sub(messages_deleted, Ordering::SeqCst)
435435
- messages_deleted;
436-
metrics::gauge!("recent_messages_messages_stored", new_gauge_value as i64);
436+
metrics::gauge!("recent_messages_messages_stored", new_gauge_value as f64);
437437

438438
// remove the mapping from the map if there are no more messages.
439439
if channel_messages.len() == 0 {
@@ -497,7 +497,7 @@ WHERE access_token = $1",
497497
.messages_stored
498498
.fetch_add(messages_added, Ordering::SeqCst)
499499
+ messages_added;
500-
metrics::gauge!("recent_messages_messages_stored", new_gauge_value as i64);
500+
metrics::gauge!("recent_messages_messages_stored", new_gauge_value as f64);
501501

502502
messages_map.insert(channel_login, Arc::new(Mutex::new(channel_messages)));
503503
}

src/irc_listener.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::config::Config;
22
use crate::db::DataStorage;
3-
use futures::prelude::*;
43
use std::borrow::Cow;
54
use tokio::sync::mpsc;
65
use twitch_irc::login::StaticLoginCredentials;
@@ -39,7 +38,7 @@ impl IrcListener {
3938
data_storage: &'static DataStorage,
4039
max_buffer_size: usize,
4140
) {
42-
while let Some(message) = incoming_messages.next().await {
41+
while let Some(message) = incoming_messages.recv().await {
4342
tokio::spawn(async move {
4443
if let Some(channel_login) = message.channel_login() {
4544
let message_source = message.source().as_raw_irc();

src/main.rs

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ mod web;
1111

1212
use crate::config::{Args, Config};
1313
use crate::db::DataStorage;
14+
#[cfg(not(unix))]
1415
use futures::prelude::*;
15-
use metrics_runtime::Receiver;
16+
use metrics_exporter_prometheus::PrometheusBuilder;
1617
use structopt::StructOpt;
1718

1819
#[tokio::main]
@@ -24,12 +25,11 @@ async fn main() {
2425
increase_nofile_rlimit();
2526

2627
// init metrics system
27-
let metrics_receiver = Receiver::builder()
28-
.build()
29-
.expect("failed to create receiver");
30-
let metrics_controller = metrics_receiver.controller();
31-
metrics_receiver.install();
28+
let prom_recorder = Box::leak(Box::new(PrometheusBuilder::new().build()));
29+
let prom_handle = prom_recorder.handle();
30+
metrics::set_recorder(prom_recorder).unwrap();
3231
system_monitoring::spawn_system_monitoring();
32+
register_application_metrics();
3333

3434
// args and config parsing
3535
let args = Args::from_args();
@@ -101,7 +101,7 @@ async fn main() {
101101
tokio::spawn(data_storage.run_task_vacuum_old_messages(config));
102102
let web_join_handle = tokio::spawn(web::run(
103103
listener,
104-
metrics_controller,
104+
prom_handle,
105105
data_storage,
106106
irc_listener,
107107
config,
@@ -111,10 +111,13 @@ async fn main() {
111111
let ctrl_c_event = async {
112112
use tokio::signal::unix::{signal, SignalKind};
113113

114-
let sigint = signal(SignalKind::interrupt()).unwrap();
115-
let sigterm = signal(SignalKind::terminate()).unwrap();
114+
let mut sigint = signal(SignalKind::interrupt()).unwrap();
115+
let mut sigterm = signal(SignalKind::terminate()).unwrap();
116116

117-
futures::stream::select(sigint, sigterm).next().await
117+
tokio::select! {
118+
_ = sigint.recv() => {},
119+
_ = sigterm.recv() => {}
120+
}
118121
};
119122
#[cfg(not(unix))]
120123
let ctrl_c_event =
@@ -172,3 +175,29 @@ fn increase_nofile_rlimit() {
172175
log::debug!("NOFILE rlimit: no need to increase (soft limit is not below hard limit)")
173176
}
174177
}
178+
179+
/// Register all created metrics to initialize them as zero and give them their description.
180+
fn register_application_metrics() {
181+
metrics::register_counter!(
182+
"recent_messages_messages_appended",
183+
"Total number of messages appended to storage"
184+
);
185+
metrics::register_gauge!(
186+
"recent_messages_messages_stored",
187+
"Number of messages currently stored in storage"
188+
);
189+
metrics::register_counter!(
190+
"recent_messages_messages_vacuumed",
191+
"Total number of messages that were removed by the automatic vacuum runner"
192+
);
193+
metrics::register_counter!(
194+
"recent_messages_message_vacuum_runs",
195+
"Total number of times the automatic vacuum runner has been started for a certain channel"
196+
);
197+
metrics::register_histogram!(
198+
"http_request_duration_nanoseconds",
199+
metrics::Unit::Nanoseconds,
200+
"Distribution of how many nanoseconds incoming web requests took to answer them"
201+
);
202+
metrics::register_counter!("http_request", "Total number of incoming HTTP requests");
203+
}

src/message_export.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ impl ContainerFrame {
3232
}
3333

3434
if options.hide_moderation_messages
35-
&& matches!(self.original_message, ServerMessage::ClearChat(_) | ServerMessage::ClearMsg(_))
35+
&& matches!(
36+
self.original_message,
37+
ServerMessage::ClearChat(_) | ServerMessage::ClearMsg(_)
38+
)
3639
{
3740
return None;
3841
}
@@ -131,8 +134,15 @@ impl MessageContainer {
131134
ServerMessage::try_from(IRCMessage::parse(&message.message_source).unwrap()).unwrap();
132135

133136
// we export PRIVMSG, CLEARCHAT, CLEARMSG, USERNOTICE, NOTICE and ROOMSTATE
134-
if !matches!(server_message, ServerMessage::Privmsg(_) | ServerMessage::ClearChat(_) | ServerMessage::ClearMsg(_) | ServerMessage::UserNotice(_) | ServerMessage::Notice(_) | ServerMessage::RoomState(_))
135-
{
137+
if !matches!(
138+
server_message,
139+
ServerMessage::Privmsg(_)
140+
| ServerMessage::ClearChat(_)
141+
| ServerMessage::ClearMsg(_)
142+
| ServerMessage::UserNotice(_)
143+
| ServerMessage::Notice(_)
144+
| ServerMessage::RoomState(_)
145+
) {
136146
return;
137147
}
138148

src/system_monitoring.rs

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,37 @@ use tokio::time::Duration;
44

55
/// Provides metrics for CPU and memory usage.
66
pub fn spawn_system_monitoring() {
7-
metrics::gauge!("process_start_time_seconds", Utc::now().timestamp());
7+
metrics::register_gauge!(
8+
"process_start_time_seconds",
9+
"UTC timestamp (in seconds) of when the process started."
10+
);
11+
metrics::gauge!("process_start_time_seconds", Utc::now().timestamp() as f64);
12+
13+
metrics::register_gauge!(
14+
"process_cpu_user_seconds_total",
15+
metrics::Unit::Seconds,
16+
"Cumulative number of seconds spent executing in user mode"
17+
);
18+
metrics::register_gauge!(
19+
"process_cpu_system_seconds_total",
20+
metrics::Unit::Seconds,
21+
"Cumulative number of seconds spent executing in kernel mode"
22+
);
23+
metrics::register_gauge!(
24+
"process_cpu_seconds_total",
25+
metrics::Unit::Seconds,
26+
"Cumulative number of seconds spent executing in either kernel or user mode"
27+
);
28+
metrics::register_gauge!(
29+
"process_resident_memory_bytes",
30+
metrics::Unit::Bytes,
31+
"Resident memory usage size as reported by the kernel, in bytes"
32+
);
833
tokio::spawn(run_system_monitoring());
934
}
1035

1136
async fn run_system_monitoring() {
1237
let mut interval = tokio::time::interval(Duration::from_secs(10));
13-
let mut last_seconds_user: u64 = 0;
14-
let mut last_seconds_kernel: u64 = 0;
15-
let mut last_seconds_total: u64 = 0;
1638
loop {
1739
interval.tick().await;
1840

@@ -25,23 +47,16 @@ async fn run_system_monitoring() {
2547
}
2648
};
2749

28-
// we do this retarded delta calculation because the `metrics` crate only has functionality
29-
// to _increment_ a counter. Additionally, it only supports whole numbers (i64).
30-
// For this reason, we simply calculate how many seconds to add since the last run.
31-
let user_seconds = system_stats.cpu_time_user.as_secs() - last_seconds_user;
32-
let kernel_seconds = system_stats.cpu_time_kernel.as_secs() - last_seconds_kernel;
33-
let total_seconds = (system_stats.cpu_time_user + system_stats.cpu_time_kernel).as_secs()
34-
- last_seconds_total;
35-
metrics::counter!("process_cpu_user_seconds_total", user_seconds);
36-
metrics::counter!("process_cpu_system_seconds_total", kernel_seconds);
37-
metrics::counter!("process_cpu_seconds_total", total_seconds);
38-
last_seconds_user += user_seconds;
39-
last_seconds_kernel += kernel_seconds;
40-
last_seconds_total += total_seconds;
41-
50+
let user_seconds = system_stats.cpu_time_user.as_secs_f64();
51+
let kernel_seconds = system_stats.cpu_time_kernel.as_secs_f64();
52+
let total_seconds =
53+
(system_stats.cpu_time_user + system_stats.cpu_time_kernel).as_secs_f64();
54+
metrics::gauge!("process_cpu_user_seconds_total", user_seconds);
55+
metrics::gauge!("process_cpu_system_seconds_total", kernel_seconds);
56+
metrics::gauge!("process_cpu_seconds_total", total_seconds);
4257
metrics::gauge!(
4358
"process_resident_memory_bytes",
44-
system_stats.memory_usage_bytes as i64
59+
system_stats.memory_usage_bytes as f64
4560
);
4661
}
4762
}

src/web/get_metrics.rs

Lines changed: 0 additions & 12 deletions
This file was deleted.

src/web/get_recent_messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ pub async fn get_recent_messages(
8383

8484
if !is_confirmed_joined {
8585
// wait 5 seconds then check again
86-
tokio::time::delay_for(Duration::from_secs(5)).await;
86+
tokio::time::sleep(Duration::from_secs(5)).await;
8787
is_confirmed_joined = irc_listener.is_join_confirmed(channel_login.clone()).await;
8888
}
8989

src/web/ignored.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub async fn set_ignored(
5353

5454
data_storage.purge_messages(&authorization.user_login).await;
5555
tokio::spawn(async move {
56-
tokio::time::delay_for(Duration::from_secs(3)).await;
56+
tokio::time::sleep(Duration::from_secs(3)).await;
5757
data_storage.purge_messages(&authorization.user_login).await;
5858
});
5959
} else {

0 commit comments

Comments
 (0)