@@ -375,11 +375,13 @@ where
375
375
/// of `self` called `inner`.
376
376
macro_rules! delegate_query_fragment_impl {
377
377
( $parent: ty) => {
378
- impl QueryFragment <Pg > for $parent {
378
+ impl :: diesel:: query_builder:: QueryFragment <:: diesel:: pg:: Pg >
379
+ for $parent
380
+ {
379
381
fn walk_ast<' a>(
380
382
& ' a self ,
381
- out: AstPass <' _, ' a, Pg >,
382
- ) -> diesel:: QueryResult <( ) > {
383
+ out: :: diesel :: query_builder :: AstPass <' _, ' a, :: diesel :: pg :: Pg >,
384
+ ) -> :: diesel:: QueryResult <( ) > {
383
385
self . inner. walk_ast( out)
384
386
}
385
387
}
@@ -470,3 +472,182 @@ impl<Item> ShiftGenerator<Item> for DefaultShiftGenerator<Item> {
470
472
& self . min_shift
471
473
}
472
474
}
475
+
476
+ #[ cfg( test) ]
477
+ mod tests {
478
+
479
+ use super :: DefaultShiftGenerator ;
480
+ use super :: NextItem ;
481
+ use crate :: db;
482
+ use async_bb8_diesel:: AsyncRunQueryDsl ;
483
+ use async_bb8_diesel:: AsyncSimpleConnection ;
484
+ use chrono:: DateTime ;
485
+ use chrono:: Utc ;
486
+ use diesel:: pg:: Pg ;
487
+ use diesel:: query_builder:: AstPass ;
488
+ use diesel:: query_builder:: QueryFragment ;
489
+ use diesel:: query_builder:: QueryId ;
490
+ use diesel:: Column ;
491
+ use diesel:: Insertable ;
492
+ use diesel:: SelectableHelper ;
493
+ use nexus_test_utils:: db:: test_setup_database;
494
+ use omicron_test_utils:: dev;
495
+ use std:: sync:: Arc ;
496
+ use uuid:: Uuid ;
497
+
498
+ table ! {
499
+ test_schema. item ( id) {
500
+ id -> Uuid ,
501
+ value -> Int4 ,
502
+ time_deleted -> Nullable <Timestamptz >,
503
+ }
504
+ }
505
+
506
+ async fn setup_test_schema ( pool : & db:: Pool ) {
507
+ let connection = pool. pool ( ) . get ( ) . await . unwrap ( ) ;
508
+ ( * connection)
509
+ . batch_execute_async (
510
+ "CREATE SCHEMA IF NOT EXISTS test_schema; \
511
+ CREATE TABLE IF NOT EXISTS test_schema.item ( \
512
+ id UUID PRIMARY KEY, \
513
+ value INT4 NOT NULL, \
514
+ time_deleted TIMESTAMPTZ \
515
+ ); \
516
+ TRUNCATE test_schema.item; \
517
+ CREATE UNIQUE INDEX ON test_schema.item (value) WHERE time_deleted IS NULL; \
518
+ ")
519
+ . await
520
+ . unwrap ( )
521
+ }
522
+
523
+ // Describes an item to be allocated with a NextItem query
524
+ #[ derive( Queryable , Debug , Insertable , Selectable , Clone ) ]
525
+ #[ diesel( table_name = item) ]
526
+ struct Item {
527
+ id : Uuid ,
528
+ value : i32 ,
529
+ time_deleted : Option < DateTime < Utc > > ,
530
+ }
531
+
532
+ #[ derive( Debug , Clone , Copy ) ]
533
+ struct NextItemQuery {
534
+ inner : NextItem < item:: dsl:: item , i32 , item:: dsl:: value > ,
535
+ }
536
+
537
+ // These implementations are needed to actually allow inserting the results
538
+ // of the `NextItemQuery` itself.
539
+ impl NextItemQuery {
540
+ fn new ( generator : DefaultShiftGenerator < i32 > ) -> Self {
541
+ Self { inner : NextItem :: new_unscoped ( generator) }
542
+ }
543
+ }
544
+
545
+ delegate_query_fragment_impl ! ( NextItemQuery ) ;
546
+
547
+ impl QueryId for NextItemQuery {
548
+ type QueryId = ( ) ;
549
+ const HAS_STATIC_QUERY_ID : bool = false ;
550
+ }
551
+
552
+ impl Insertable < item:: table > for NextItemQuery {
553
+ type Values = NextItemQueryValues ;
554
+ fn values ( self ) -> Self :: Values {
555
+ NextItemQueryValues ( self )
556
+ }
557
+ }
558
+
559
+ #[ derive( Debug , Clone ) ]
560
+ struct NextItemQueryValues ( NextItemQuery ) ;
561
+
562
+ impl QueryId for NextItemQueryValues {
563
+ type QueryId = ( ) ;
564
+ const HAS_STATIC_QUERY_ID : bool = false ;
565
+ }
566
+
567
+ impl diesel:: insertable:: CanInsertInSingleQuery < Pg > for NextItemQueryValues {
568
+ fn rows_to_insert ( & self ) -> Option < usize > {
569
+ Some ( 1 )
570
+ }
571
+ }
572
+
573
+ impl QueryFragment < Pg > for NextItemQueryValues {
574
+ fn walk_ast < ' a > (
575
+ & ' a self ,
576
+ mut out : AstPass < ' _ , ' a , Pg > ,
577
+ ) -> diesel:: QueryResult < ( ) > {
578
+ out. push_sql ( "(" ) ;
579
+ out. push_identifier ( item:: dsl:: id:: NAME ) ?;
580
+ out. push_sql ( ", " ) ;
581
+ out. push_identifier ( item:: dsl:: value:: NAME ) ?;
582
+ out. push_sql ( ", " ) ;
583
+ out. push_identifier ( item:: dsl:: time_deleted:: NAME ) ?;
584
+ out. push_sql ( ") VALUES (gen_random_uuid(), (" ) ;
585
+ self . 0 . walk_ast ( out. reborrow ( ) ) ?;
586
+ out. push_sql ( "), NULL)" ) ;
587
+ Ok ( ( ) )
588
+ }
589
+ }
590
+
591
+ // Test that we correctly insert the next available item
592
+ #[ tokio:: test]
593
+ async fn test_wrapping_next_item_query ( ) {
594
+ // Setup the test database
595
+ let logctx = dev:: test_setup_log ( "test_wrapping_next_item_query" ) ;
596
+ let log = logctx. log . new ( o ! ( ) ) ;
597
+ let mut db = test_setup_database ( & log) . await ;
598
+ let cfg = crate :: db:: Config { url : db. pg_config ( ) . clone ( ) } ;
599
+ let pool = Arc :: new ( crate :: db:: Pool :: new ( & cfg) ) ;
600
+
601
+ // We're going to operate on a separate table, for simplicity.
602
+ setup_test_schema ( & pool) . await ;
603
+
604
+ // We'll first insert an item at 0.
605
+ //
606
+ // This generator should start at 0, and then select over the range [0,
607
+ // 10], wrapping back to 0.
608
+ let generator =
609
+ DefaultShiftGenerator { base : 0 , max_shift : 10 , min_shift : 0 } ;
610
+ let query = NextItemQuery :: new ( generator) ;
611
+ let it = diesel:: insert_into ( item:: dsl:: item)
612
+ . values ( query)
613
+ . returning ( Item :: as_returning ( ) )
614
+ . get_result_async ( pool. pool ( ) )
615
+ . await
616
+ . unwrap ( ) ;
617
+ assert_eq ! ( it. value, 0 ) ;
618
+
619
+ // Insert the same query again, which should give us 1 now.
620
+ let it = diesel:: insert_into ( item:: dsl:: item)
621
+ . values ( query)
622
+ . returning ( Item :: as_returning ( ) )
623
+ . get_result_async ( pool. pool ( ) )
624
+ . await
625
+ . unwrap ( ) ;
626
+ assert_eq ! ( it. value, 1 ) ;
627
+
628
+ // Insert 10, and guarantee that we get it back.
629
+ let generator =
630
+ DefaultShiftGenerator { base : 10 , max_shift : 0 , min_shift : -10 } ;
631
+ let query = NextItemQuery :: new ( generator) ;
632
+ let it = diesel:: insert_into ( item:: dsl:: item)
633
+ . values ( query)
634
+ . returning ( Item :: as_returning ( ) )
635
+ . get_result_async ( pool. pool ( ) )
636
+ . await
637
+ . unwrap ( ) ;
638
+ assert_eq ! ( it. value, 10 ) ;
639
+
640
+ // Now, insert the same query again. Since 0, 1, and 10 are all
641
+ // allocated, we should wrap around and insert 2.
642
+ let it = diesel:: insert_into ( item:: dsl:: item)
643
+ . values ( query)
644
+ . returning ( Item :: as_returning ( ) )
645
+ . get_result_async ( pool. pool ( ) )
646
+ . await
647
+ . unwrap ( ) ;
648
+ assert_eq ! ( it. value, 2 ) ;
649
+
650
+ db. cleanup ( ) . await . unwrap ( ) ;
651
+ logctx. cleanup_successful ( ) ;
652
+ }
653
+ }
0 commit comments