@@ -111,16 +111,7 @@ impl<const N: usize> Store<N> {
111111 /// Returns the current free space in the queue, in bytes. This is raw;
112112 /// subtract 12 to get the largest single message that can be enqueued.
113113 pub fn free_space ( & self ) -> usize {
114- match self . insert_state {
115- InsertState :: Collecting => N - self . storage . len ( ) ,
116- InsertState :: Losing { .. } => {
117- // Indicate how much space we have after recovery succeeds,
118- // which may be zero if we can't recover yet.
119- ( N - self . storage . len ( ) )
120- . saturating_sub ( OVERHEAD )
121- . saturating_sub ( DATA_LOSS_LEN )
122- }
123- }
114+ N - self . storage . len ( )
124115 }
125116
126117 /// Inserts a record, or records it as lost.
@@ -275,7 +266,7 @@ impl<const N: usize> Store<N> {
275266
276267 match & mut self . insert_state {
277268 InsertState :: Collecting => {
278- let room = self . storage . capacity ( ) - self . storage . len ( ) ;
269+ let room = self . free_space ( ) ;
279270 if data_len. is_some_and ( |n| room >= OVERHEAD + n as usize ) {
280271 self . write_header (
281272 data_len. unwrap_lite ( ) ,
@@ -320,9 +311,17 @@ impl<const N: usize> Store<N> {
320311 fn recover_if_required ( & mut self , space_required : Option < usize > ) {
321312 // We only need to take action if we're in Losing state.
322313 if let InsertState :: Losing { count, timestamp } = self . insert_state {
323- // Note: already includes OVERHEAD/DATA_LOSS_LEN
324314 let room = self . free_space ( ) ;
325- let required = space_required. unwrap_or ( 0 ) ;
315+
316+ // We can recover only if there is room for both the required
317+ // amount of space *and* the additional loss record we must
318+ // insert in order to recover.
319+ let required =
320+ // The space we hope to be able to use after recovery
321+ space_required. unwrap_or ( 0 )
322+ // The length of the loss record itself
323+ + DATA_LOSS_LEN + OVERHEAD
324+ ;
326325 if room >= required {
327326 // We can recover!
328327 self . write_header (
@@ -634,9 +633,9 @@ mod tests {
634633
635634 // Fill half the buffer.
636635 s. insert ( ANOTHER_FAKE_TID , 5 , & [ 0 ; 32 - OVERHEAD ] ) ;
637- // Try to fill the other half of the buffer, *to the brim*. Allowing
638- // this record in will mean that the buffer no longer has space for a
639- // last loss record, so this record should *not* be accepted .
636+ // Try to fill the other half of the buffer, *to the brim*. After this
637+ // record is accepted, we start losing data, but we cannot yet create a
638+ // loss record until something is removed from the buffer .
640639 s. insert ( ANOTHER_FAKE_TID , 6 , & [ 0 ; 32 - OVERHEAD ] ) ;
641640 // This one definitely gets lost.
642641 s. insert ( ANOTHER_FAKE_TID , 7 , & [ 0 ; 32 - OVERHEAD ] ) ;
@@ -653,12 +652,37 @@ mod tests {
653652 }
654653 ) ;
655654 assert_eq ! (
656- snapshot[ 1 ] . decode_as :: < LossRecord > ( ) ,
655+ snapshot[ 1 ] ,
657656 Item {
658657 ena: 3 ,
659- tid: OUR_FAKE_TID ,
658+ tid: ANOTHER_FAKE_TID ,
660659 timestamp: 6 ,
661- contents: LossRecord { lost: Some ( 2 ) } ,
660+ contents: Vec :: from( [ 0 ; 32 - OVERHEAD ] )
661+ }
662+ ) ;
663+
664+ // Flush a record. We will now record the lost data, as there's now
665+ // room for the loss record.
666+ s. flush_thru ( 2 ) ;
667+
668+ let snapshot: Vec < Item < Vec < u8 > > > = copy_contents_raw ( & mut s) ;
669+ assert_eq ! ( snapshot. len( ) , 2 , "{snapshot:?}" ) ;
670+ assert_eq ! (
671+ snapshot[ 0 ] ,
672+ Item {
673+ ena: 3 ,
674+ tid: ANOTHER_FAKE_TID ,
675+ timestamp: 6 ,
676+ contents: Vec :: from( [ 0 ; 32 - OVERHEAD ] )
677+ }
678+ ) ;
679+ assert_eq ! (
680+ snapshot[ 1 ] . decode_as:: <LossRecord >( ) ,
681+ Item {
682+ ena: 4 ,
683+ tid: OUR_FAKE_TID ,
684+ timestamp: 7 ,
685+ contents: LossRecord { lost: Some ( 1 ) } ,
662686 }
663687 ) ;
664688 }
0 commit comments