Skip to content

Commit 3a3ed64

Browse files
committed
blockprod: Fix test race condition
1 parent e4bc375 commit 3a3ed64

12 files changed

+110
-60
lines changed

blockprod/src/detail/mod.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -464,19 +464,17 @@ impl BlockProduction {
464464
// scratch every time a different timestamp is attempted. That is more costly
465465
// in terms of computational resources but will allow the node to include more
466466
// transactions since the passing time may release some time locks.
467-
let collected_transactions = {
468-
let accumulator = self
469-
.collect_transactions(
470-
current_tip_index.block_id(),
471-
min_constructed_block_timestamp,
472-
transactions.clone(),
473-
transaction_ids.clone(),
474-
packing_strategy,
475-
)
476-
.await?;
477-
478-
accumulator.transactions().clone()
479-
};
467+
let collected_transactions = self
468+
.collect_transactions(
469+
current_tip_index.block_id(),
470+
min_constructed_block_timestamp,
471+
transactions.clone(),
472+
transaction_ids.clone(),
473+
packing_strategy,
474+
)
475+
.await?
476+
.transactions()
477+
.clone();
480478

481479
let block_body = BlockBody::new(block_reward, collected_transactions);
482480

blockprod/src/detail/tests.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ mod produce_block {
917917
chain_config,
918918
Arc::new(test_blockprod_config()),
919919
chainstate.clone(),
920-
mempool,
920+
mempool.clone(),
921921
p2p,
922922
Default::default(),
923923
prepare_thread_pool(1),
@@ -938,7 +938,7 @@ mod produce_block {
938938
job_finished_receiver.await.expect("Job finished receiver closed");
939939

940940
assert_job_count(&block_production, 0).await;
941-
assert_process_block(&chainstate, new_block).await;
941+
assert_process_block(&chainstate, &mempool, new_block).await;
942942
}
943943
});
944944

@@ -962,7 +962,7 @@ mod produce_block {
962962
chain_config,
963963
Arc::new(test_blockprod_config()),
964964
chainstate.clone(),
965-
mempool,
965+
mempool.clone(),
966966
p2p,
967967
Default::default(),
968968
prepare_thread_pool(1),
@@ -983,7 +983,7 @@ mod produce_block {
983983
job_finished_receiver.await.expect("Job finished receiver closed");
984984

985985
assert_job_count(&block_production, 0).await;
986-
assert_process_block(&chainstate, new_block).await;
986+
assert_process_block(&chainstate, &mempool, new_block).await;
987987
}
988988
});
989989

@@ -1095,7 +1095,7 @@ mod produce_block {
10951095
chain_config,
10961096
Arc::new(test_blockprod_config()),
10971097
chainstate.clone(),
1098-
mempool,
1098+
mempool.clone(),
10991099
p2p,
11001100
Default::default(),
11011101
prepare_thread_pool(1),
@@ -1115,7 +1115,7 @@ mod produce_block {
11151115
job_finished_receiver.await.expect("Job finished receiver closed");
11161116

11171117
assert_job_count(&block_production, 0).await;
1118-
assert_process_block(&chainstate, new_block).await;
1118+
assert_process_block(&chainstate, &mempool, new_block).await;
11191119
}
11201120
});
11211121

@@ -1152,7 +1152,7 @@ mod produce_block {
11521152
chain_config,
11531153
Arc::new(test_blockprod_config()),
11541154
chainstate.clone(),
1155-
mempool,
1155+
mempool.clone(),
11561156
p2p,
11571157
Default::default(),
11581158
prepare_thread_pool(1),
@@ -1174,7 +1174,7 @@ mod produce_block {
11741174
job_finished_receiver.await.expect("Job finished receiver closed");
11751175

11761176
assert_job_count(&block_production, 0).await;
1177-
assert_process_block(&chainstate, new_block).await;
1177+
assert_process_block(&chainstate, &mempool, new_block).await;
11781178
}
11791179
});
11801180

@@ -1209,7 +1209,7 @@ mod produce_block {
12091209
chain_config.clone(),
12101210
Arc::new(test_blockprod_config()),
12111211
chainstate.clone(),
1212-
mempool,
1212+
mempool.clone(),
12131213
p2p,
12141214
Default::default(),
12151215
prepare_thread_pool(1),
@@ -1240,7 +1240,7 @@ mod produce_block {
12401240
job_finished_receiver.await.expect("Job finished receiver closed");
12411241

12421242
assert_job_count(&block_production, 0).await;
1243-
assert_process_block(&chainstate, new_block).await;
1243+
assert_process_block(&chainstate, &mempool, new_block).await;
12441244
}
12451245
});
12461246

@@ -1354,7 +1354,7 @@ mod produce_block {
13541354
chain_config.clone(),
13551355
Arc::new(test_blockprod_config()),
13561356
chainstate.clone(),
1357-
mempool,
1357+
mempool.clone(),
13581358
p2p,
13591359
Default::default(),
13601360
prepare_thread_pool(1),
@@ -1400,7 +1400,7 @@ mod produce_block {
14001400

14011401
job_finished_receiver.await.expect("Job finished receiver closed");
14021402

1403-
assert_process_block(&chainstate, new_block.clone()).await;
1403+
assert_process_block(&chainstate, &mempool, new_block.clone()).await;
14041404
}
14051405
RequiredConsensus::PoS(_) => {
14061406
// Try no input data for PoS consensus
@@ -1457,7 +1457,8 @@ mod produce_block {
14571457

14581458
job_finished_receiver.await.expect("Job finished receiver closed");
14591459

1460-
let result = assert_process_block(&chainstate, new_block).await;
1460+
let result =
1461+
assert_process_block(&chainstate, &mempool, new_block).await;
14611462

14621463
// Update kernel input parameters for future PoS blocks
14631464

@@ -1528,7 +1529,7 @@ mod produce_block {
15281529

15291530
job_finished_receiver.await.expect("Job finished receiver closed");
15301531

1531-
assert_process_block(&chainstate, new_block.clone()).await;
1532+
assert_process_block(&chainstate, &mempool, new_block.clone()).await;
15321533
}
15331534
}
15341535
}

blockprod/src/lib.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ mod tests {
130130
Block, ConsensusUpgrade, Destination, Genesis, NetUpgrades, PoSChainConfigBuilder,
131131
TxOutput, UpgradeVersion,
132132
},
133-
primitives::{per_thousand::PerThousand, Amount, BlockHeight, H256},
133+
primitives::{per_thousand::PerThousand, Amount, BlockHeight, Idable, H256},
134134
time_getter::TimeGetter,
135135
};
136136
use crypto::{
@@ -154,13 +154,37 @@ mod tests {
154154

155155
pub async fn assert_process_block(
156156
chainstate: &ChainstateHandle,
157+
mempool: &MempoolHandle,
157158
new_block: Block,
158159
) -> BlockIndex {
159-
chainstate
160+
let block_id = new_block.get_id();
161+
162+
// Wait for mempool to be up-to-date with the new block. The subscriptions are not cleaned
163+
// up but hopefully it's not too bad just for testing.
164+
let (tip_sx, tip_rx) = tokio::sync::oneshot::channel();
165+
let tip_sx = utils::sync::Mutex::new(Some(tip_sx));
166+
mempool
167+
.call_mut(move |m| {
168+
m.subscribe_to_events(Arc::new({
169+
move |evt| match evt {
170+
mempool::event::MempoolEvent::NewTip(tip) => {
171+
if let Some(tip_sx) = tip_sx.lock().unwrap().take() {
172+
assert_eq!(tip.block_id(), &block_id);
173+
tip_sx.send(()).unwrap();
174+
}
175+
}
176+
mempool::event::MempoolEvent::TransactionProcessed(_) => (),
177+
}
178+
}))
179+
})
180+
.await
181+
.unwrap();
182+
183+
let block_index = chainstate
160184
.call_mut(move |this| {
161185
let new_block_index = this
162186
.process_block(new_block.clone(), BlockSource::Local)
163-
.expect("Failed to process block: {:?}")
187+
.expect("Failed to process block")
164188
.expect("Failed to activate best chain");
165189

166190
assert_eq!(
@@ -170,7 +194,7 @@ mod tests {
170194
);
171195

172196
let best_block_index =
173-
this.get_best_block_index().expect("Failed to get best block index: {:?}");
197+
this.get_best_block_index().expect("Failed to get best block index");
174198

175199
assert_eq!(
176200
new_block_index.clone().into_gen_block_index().block_id(),
@@ -181,7 +205,11 @@ mod tests {
181205
new_block_index
182206
})
183207
.await
184-
.expect("New block is not the new tip: {:?}")
208+
.expect("New block is not the new tip");
209+
210+
tip_rx.await.unwrap();
211+
212+
block_index
185213
}
186214

187215
pub fn setup_blockprod_test(

mempool/src/pool/collect_txs.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ pub fn collect_txs<M>(
9191
mempool.chainstate_handle.shallow_clone(),
9292
);
9393

94-
let verifier_time = tx_accumulator.block_timestamp();
95-
9694
let best_index = mempool
9795
.blocking_chainstate_handle()
9896
.call(|c| c.get_best_block_index())?
@@ -106,7 +104,7 @@ pub fn collect_txs<M>(
106104
.iter()
107105
.map(|transaction| {
108106
let _fee =
109-
tx_verifier.connect_transaction(&tx_source, transaction, &verifier_time, None)?;
107+
tx_verifier.connect_transaction(&tx_source, transaction, &block_timestamp, None)?;
110108
Ok(transaction.transaction().get_id())
111109
})
112110
.collect::<Result<Vec<_>, TxValidationError>>()?;
@@ -195,7 +193,7 @@ pub fn collect_txs<M>(
195193
let verification_result = tx_verifier.connect_transaction(
196194
&tx_source,
197195
next_tx.transaction(),
198-
&verifier_time,
196+
&block_timestamp,
199197
None,
200198
);
201199

test/functional/blockprod_generate_blocks_all_sources.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ def run_test(self):
138138
timeout = 5
139139
)
140140

141+
old_tip = node.chainstate_best_block_id()
142+
141143
block_hex = node.blockprod_generate_block(
142144
block_input_data,
143145
transactions,
@@ -150,11 +152,15 @@ def run_test(self):
150152
for expected_transaction in expected_transactions:
151153
self.assert_transaction_in_block(expected_transaction, block)
152154

153-
old_tip = node.chainstate_best_block_id()
154155
node.chainstate_submit_block(block_hex)
155156
new_tip = node.chainstate_best_block_id()
156157
assert(old_tip != new_tip)
157158

159+
self.wait_until(
160+
lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(),
161+
timeout = 5
162+
)
163+
158164
#
159165
# Check chainstate and mempool is as expected
160166
#

test/functional/blockprod_generate_pos_blocks.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,21 +69,23 @@ def assert_tip(self, expected_block):
6969
assert_equal(block, expected_block)
7070

7171
def generate_block(self, expected_height, block_input_data, transactions):
72-
previous_block_id = self.nodes[0].chainstate_best_block_id()
72+
node = self.nodes[0]
73+
previous_block_id = node.chainstate_best_block_id()
7374

7475
# Block production may fail if the Job Manager found a new tip, so try and sleep
7576
for _ in range(5):
7677
try:
77-
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
78+
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
7879
break
7980
except JSONRPCException:
80-
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
81+
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
8182
time.sleep(1)
8283

8384
block_hex_array = bytearray.fromhex(block_hex)
8485
block = ScaleDecoder.get_decoder_class('BlockV1', ScaleBytes(block_hex_array)).decode()
8586

86-
self.nodes[0].chainstate_submit_block(block_hex)
87+
node.chainstate_submit_block(block_hex)
88+
self.wait_until(lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(), timeout = 5)
8789

8890
self.assert_tip(block_hex)
8991
self.assert_height(expected_height, block_hex)

test/functional/blockprod_generate_pos_blocks_rand_genesis_keys.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,21 +94,23 @@ def block_height(self, n):
9494
return self.nodes[n].chainstate_block_height_in_main_chain(tip)
9595

9696
def generate_block(self, expected_height, block_input_data, transactions):
97-
previous_block_id = self.nodes[0].chainstate_best_block_id()
97+
node = self.nodes[0]
98+
previous_block_id = node.chainstate_best_block_id()
9899

99100
# Block production may fail if the Job Manager found a new tip, so try and sleep
100101
for _ in range(5):
101102
try:
102-
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
103+
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
103104
break
104105
except JSONRPCException:
105-
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
106+
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
106107
time.sleep(1)
107108

108109
block_hex_array = bytearray.fromhex(block_hex)
109110
block = ScaleDecoder.get_decoder_class('BlockV1', ScaleBytes(block_hex_array)).decode()
110111

111-
self.nodes[0].chainstate_submit_block(block_hex)
112+
node.chainstate_submit_block(block_hex)
113+
self.wait_until(lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(), timeout = 5)
112114

113115
self.assert_tip(block_hex)
114116
self.assert_height(expected_height, block_hex)

test/functional/blockprod_generate_pos_genesis_blocks.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,23 @@ def assert_tip(self, expected_block):
6767
assert_equal(block, expected_block)
6868

6969
def generate_block(self, expected_height, block_input_data, transactions):
70-
previous_block_id = self.nodes[0].chainstate_best_block_id()
70+
node = self.nodes[0]
71+
previous_block_id = node.chainstate_best_block_id()
7172

7273
# Block production may fail if the Job Manager found a new tip, so try and sleep
7374
for _ in range(5):
7475
try:
75-
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
76+
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
7677
break
7778
except JSONRPCException:
78-
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
79+
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
7980
time.sleep(1)
8081

8182
block_hex_array = bytearray.fromhex(block_hex)
8283
block = ScaleDecoder.get_decoder_class('BlockV1', ScaleBytes(block_hex_array)).decode()
8384

84-
self.nodes[0].chainstate_submit_block(block_hex)
85+
node.chainstate_submit_block(block_hex)
86+
self.wait_until(lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(), timeout = 5)
8587

8688
self.assert_tip(block_hex)
8789
self.assert_height(expected_height, block_hex)

test/functional/blockprod_generate_pow_blocks.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,23 @@ def block_height(self, n):
5454
return self.nodes[n].chainstate_block_height_in_main_chain(tip)
5555

5656
def generate_block(self, expected_height, block_input_data, transactions):
57-
previous_block_id = self.nodes[0].chainstate_best_block_id()
57+
node = self.nodes[0]
58+
previous_block_id = node.chainstate_best_block_id()
5859

5960
# Block production may fail if the Job Manager found a new tip, so try and sleep
6061
for _ in range(5):
6162
try:
62-
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
63+
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
6364
break
6465
except JSONRPCException:
65-
block_hex = self.nodes[0].blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
66+
block_hex = node.blockprod_generate_block(block_input_data, transactions, [], "LeaveEmptySpace")
6667
time.sleep(1)
6768

6869
block_hex_array = bytearray.fromhex(block_hex)
6970
block = ScaleDecoder.get_decoder_class('BlockV1', ScaleBytes(block_hex_array)).decode()
7071

71-
self.nodes[0].chainstate_submit_block(block_hex)
72+
node.chainstate_submit_block(block_hex)
73+
self.wait_until(lambda: node.mempool_local_best_block_id() == node.chainstate_best_block_id(), timeout = 5)
7274

7375
self.assert_tip(block_hex)
7476
self.assert_height(expected_height, block_hex)

0 commit comments

Comments
 (0)