Skip to content

Commit d8c876a

Browse files
committed
query missing logs directly
1 parent de04dad commit d8c876a

File tree

1 file changed

+62
-34
lines changed

1 file changed

+62
-34
lines changed

be/src/bin/sync_logs.rs

Lines changed: 62 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,17 @@
11
use alloy::primitives::U64;
22
use clap::Parser;
33
use shared::{jrpc, pg};
4-
use tokio::{
5-
fs::File,
6-
io::{AsyncBufReadExt, BufReader},
7-
};
84

95
#[derive(Parser)]
106
struct Args {
117
#[arg(long = "pg", env = "PG_URL", default_value = "postgres://localhost/be")]
128
pg_url: String,
139
#[arg(long = "rpc", env = "RPC_URL")]
1410
rpc_url: String,
15-
#[arg(long = "block")]
16-
block: Option<u64>,
17-
#[arg(long = "input")]
18-
input: Option<String>,
11+
#[arg(long = "range")]
12+
range: u64,
13+
#[clap(short = 'd', action = clap::ArgAction::SetTrue)]
14+
download: bool,
1915
}
2016

2117
#[tokio::main]
@@ -27,54 +23,86 @@ async fn main() {
2723
let client = jrpc::Client::new(&args.rpc_url);
2824
let chain = client.chain_id().await.expect("getting chain id");
2925

30-
let blocks = if let Some(file) = args.input {
31-
read_blocks_from_file(&file).await
32-
} else if let Some(block) = args.block {
33-
vec![block]
34-
} else {
35-
panic!("must provide either --block or --input");
36-
};
37-
38-
for block in blocks {
39-
sync_if_missing(&mut pg, &client, chain, block).await;
26+
let blocks = find_missing(&pg, chain, args.range)
27+
.await
28+
.expect("finding missing logs");
29+
if blocks.is_empty() {
30+
println!("nothing to do");
4031
}
41-
}
42-
43-
async fn read_blocks_from_file(path: &str) -> Vec<u64> {
44-
let file = File::open(path).await.expect("failed to open input file");
45-
let reader = BufReader::new(file);
46-
let mut lines = reader.lines();
47-
let mut blocks = Vec::new();
48-
49-
while let Some(line) = lines.next_line().await.expect("failed to read line") {
50-
let block = line.trim().parse().expect("invalid block number");
51-
blocks.push(block);
32+
for block in blocks {
33+
println!("missing {} txs: {}", block.num, block.txs);
34+
if args.download {
35+
sync_if_missing(&mut pg, &client, chain, block.num).await;
36+
}
5237
}
53-
54-
blocks
5538
}
5639

5740
async fn sync_if_missing(
5841
pg: &mut tokio_postgres::Client,
5942
client: &jrpc::Client,
6043
chain: u64,
61-
block: u64,
44+
block: i64,
6245
) {
6346
let count: i64 = pg
6447
.query_one(
6548
"select count(*) from logs where chain = $1 and block_num = $2",
66-
&[&U64::from(chain), &U64::from(block)],
49+
&[&U64::from(chain), &block],
6750
)
6851
.await
6952
.expect("query failed")
7053
.get(0);
7154

7255
if count == 0 {
73-
let num_logs = be::sync::sync_one(pg, client, chain, block)
56+
let num_logs = be::sync::sync_one(pg, client, chain, block as u64)
7457
.await
7558
.expect("sync failed");
7659
println!("downloaded {} logs", num_logs);
7760
} else {
7861
println!("nothing to do");
7962
}
8063
}
64+
65+
fn table_name(tbl: &str, chain: u64, range: u64) -> String {
66+
format!("{}_c{}_b{}", tbl, chain, range)
67+
}
68+
69+
struct Missing {
70+
num: i64,
71+
txs: i64,
72+
}
73+
74+
async fn find_missing(
75+
pg: &tokio_postgres::Client,
76+
chain: u64,
77+
range: u64,
78+
) -> Result<Vec<Missing>, shared::Error> {
79+
let (from, to) = ((range * 1000000), (range + 2 * 1000000));
80+
Ok(pg
81+
.query(
82+
&format!(
83+
"
84+
select b, t.tx_count
85+
from generate_series($1::int8, $2::int8) as b
86+
left join (
87+
select block_num from {}
88+
group by block_num
89+
) l on b = l.block_num
90+
left join (
91+
select block_num, count(*) as tx_count from {}
92+
group by block_num having count(*) > 1
93+
) t on b = t.block_num
94+
where l.block_num is null and t.block_num is not null
95+
",
96+
table_name("logs", chain, range),
97+
table_name("txs", chain, range)
98+
),
99+
&[&U64::from(from), &U64::from(to)],
100+
)
101+
.await?
102+
.iter()
103+
.map(|row| Missing {
104+
num: row.get("b"),
105+
txs: row.get("tx_count"),
106+
})
107+
.collect())
108+
}

0 commit comments

Comments
 (0)