@@ -22,35 +22,35 @@ use async_std::sync::{Arc, Mutex};
2222use async_trait:: async_trait;
2323use bp_parachains:: parachain_head_storage_key_at_source;
2424use bp_polkadot_core:: parachains:: { ParaHash , ParaHead , ParaHeadsProof , ParaId } ;
25+ use bp_runtime:: HeaderIdProvider ;
2526use codec:: Decode ;
2627use parachains_relay:: {
27- parachains_loop:: { ParaHashAtSource , SourceClient } ,
28+ parachains_loop:: { AvailableHeader , SourceClient } ,
2829 parachains_loop_metrics:: ParachainsLoopMetrics ,
2930} ;
3031use relay_substrate_client:: {
3132 Chain , Client , Error as SubstrateError , HeaderIdOf , HeaderOf , RelayChain ,
3233} ;
3334use relay_utils:: relay_loop:: Client as RelayClient ;
34- use sp_runtime:: traits:: Header as HeaderT ;
3535
3636/// Shared updatable reference to the maximal parachain header id that we want to sync from the
3737/// source.
38- pub type RequiredHeaderIdRef < C > = Arc < Mutex < Option < HeaderIdOf < C > > > > ;
38+ pub type RequiredHeaderIdRef < C > = Arc < Mutex < AvailableHeader < HeaderIdOf < C > > > > ;
3939
4040/// Substrate client as parachain heads source.
4141#[ derive( Clone ) ]
4242pub struct ParachainsSource < P : SubstrateParachainsPipeline > {
4343 client : Client < P :: SourceRelayChain > ,
44- maximal_header_id : Option < RequiredHeaderIdRef < P :: SourceParachain > > ,
44+ max_head_id : RequiredHeaderIdRef < P :: SourceParachain > ,
4545}
4646
4747impl < P : SubstrateParachainsPipeline > ParachainsSource < P > {
4848 /// Creates new parachains source client.
4949 pub fn new (
5050 client : Client < P :: SourceRelayChain > ,
51- maximal_header_id : Option < RequiredHeaderIdRef < P :: SourceParachain > > ,
51+ max_head_id : RequiredHeaderIdRef < P :: SourceParachain > ,
5252 ) -> Self {
53- ParachainsSource { client, maximal_header_id }
53+ ParachainsSource { client, max_head_id }
5454 }
5555
5656 /// Returns reference to the underlying RPC client.
@@ -59,11 +59,11 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
5959 }
6060
6161 /// Return decoded head of given parachain.
62- pub async fn on_chain_parachain_header (
62+ pub async fn on_chain_para_head_id (
6363 & self ,
6464 at_block : HeaderIdOf < P :: SourceRelayChain > ,
6565 para_id : ParaId ,
66- ) -> Result < Option < HeaderOf < P :: SourceParachain > > , SubstrateError > {
66+ ) -> Result < Option < HeaderIdOf < P :: SourceParachain > > , SubstrateError > {
6767 let storage_key =
6868 parachain_head_storage_key_at_source ( P :: SourceRelayChain :: PARAS_PALLET_NAME , para_id) ;
6969 let para_head = self . client . raw_storage_value ( storage_key, Some ( at_block. 1 ) ) . await ?;
@@ -72,8 +72,8 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
7272 Some ( para_head) => para_head,
7373 None => return Ok ( None ) ,
7474 } ;
75-
76- Ok ( Some ( Decode :: decode ( & mut & para_head. 0 [ .. ] ) ? ) )
75+ let para_head : HeaderOf < P :: SourceParachain > = Decode :: decode ( & mut & para_head . 0 [ .. ] ) ? ;
76+ Ok ( Some ( para_head. id ( ) ) )
7777 }
7878}
7979
@@ -105,7 +105,7 @@ where
105105 at_block : HeaderIdOf < P :: SourceRelayChain > ,
106106 metrics : Option < & ParachainsLoopMetrics > ,
107107 para_id : ParaId ,
108- ) -> Result < ParaHashAtSource , Self :: Error > {
108+ ) -> Result < AvailableHeader < ParaHash > , Self :: Error > {
109109 // we don't need to support many parachains now
110110 if para_id. 0 != P :: SOURCE_PARACHAIN_PARA_ID {
111111 return Err ( SubstrateError :: Custom ( format ! (
@@ -115,44 +115,28 @@ where
115115 ) ) )
116116 }
117117
118- let mut para_hash_at_source = ParaHashAtSource :: None ;
119- let mut para_header_number_at_source = None ;
120- match self . on_chain_parachain_header ( at_block, para_id) . await ? {
121- Some ( parachain_header) => {
122- para_hash_at_source = ParaHashAtSource :: Some ( parachain_header. hash ( ) ) ;
123- para_header_number_at_source = Some ( * parachain_header. number ( ) ) ;
124- // never return head that is larger than requested. This way we'll never sync
125- // headers past `maximal_header_id`
126- if let Some ( ref maximal_header_id) = self . maximal_header_id {
127- let maximal_header_id = * maximal_header_id. lock ( ) . await ;
128- match maximal_header_id {
129- Some ( maximal_header_id)
130- if * parachain_header. number ( ) > maximal_header_id. 0 =>
131- {
132- // we don't want this header yet => let's report previously requested
133- // header
134- para_hash_at_source = ParaHashAtSource :: Some ( maximal_header_id. 1 ) ;
135- para_header_number_at_source = Some ( maximal_header_id. 0 ) ;
136- } ,
137- Some ( _) => ( ) ,
138- None => {
139- // on-demand relay has not yet asked us to sync anything let's do that
140- para_hash_at_source = ParaHashAtSource :: Unavailable ;
141- para_header_number_at_source = None ;
142- } ,
143- }
144- }
145- } ,
146- None => { } ,
147- } ;
118+ let mut para_head_id = AvailableHeader :: Missing ;
119+ if let Some ( on_chain_para_head_id) = self . on_chain_para_head_id ( at_block, para_id) . await ? {
120+ // Never return head that is larger than requested. This way we'll never sync
121+ // headers past `max_header_id`.
122+ para_head_id = match * self . max_head_id . lock ( ) . await {
123+ AvailableHeader :: Unavailable => AvailableHeader :: Unavailable ,
124+ AvailableHeader :: Missing => {
125+ // `max_header_id` is not set. There is no limit.
126+ AvailableHeader :: Available ( on_chain_para_head_id)
127+ } ,
128+ AvailableHeader :: Available ( max_head_id) => {
129+ // We report at most `max_header_id`.
130+ AvailableHeader :: Available ( std:: cmp:: min ( on_chain_para_head_id, max_head_id) )
131+ } ,
132+ }
133+ }
148134
149- if let ( Some ( metrics) , Some ( para_header_number_at_source) ) =
150- ( metrics, para_header_number_at_source)
151- {
152- metrics. update_best_parachain_block_at_source ( para_id, para_header_number_at_source) ;
135+ if let ( Some ( metrics) , AvailableHeader :: Available ( para_head_id) ) = ( metrics, para_head_id) {
136+ metrics. update_best_parachain_block_at_source ( para_id, para_head_id. 0 ) ;
153137 }
154138
155- Ok ( para_hash_at_source )
139+ Ok ( para_head_id . map ( |para_head_id| para_head_id . 1 ) )
156140 }
157141
158142 async fn prove_parachain_heads (
0 commit comments