Skip to content

Commit ebaad0b

Browse files
committed
prototype: merge the union level into regular levels
1 parent 0343a45 commit ebaad0b

File tree

2 files changed

+121
-25
lines changed

2 files changed

+121
-25
lines changed

prototypes/ScheduledMerges.hs

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ module ScheduledMerges (
6565
MergeDebt(..),
6666
NominalCredit(..),
6767
NominalDebt(..),
68+
maxBufferSize,
6869
Run,
6970
runSize,
7071
UnionCredits (..),
@@ -85,6 +86,7 @@ import Prelude hiding (lookup)
8586

8687
import Data.Bits
8788
import Data.Foldable (for_, toList, traverse_)
89+
import Data.Functor ((<&>))
8890
import Data.Map.Strict (Map)
8991
import qualified Data.Map.Strict as Map
9092
import Data.Maybe (catMaybes)
@@ -344,19 +346,25 @@ invariant (LSMContent _ levels ul) = do
344346
expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s ()
345347
expectedRunLengths ln rs ls =
346348
case mergePolicyForLevel ln ls ul of
347-
-- Levels using levelling have only one (incoming) run, which almost
348-
-- always consists of an ongoing merge. The exception is when a
349-
-- levelling run becomes too large and is promoted, in that case
350-
-- initially there's no merge, but it is still represented as an
351-
-- 'IncomingRun', using 'Single'. Thus there are no other resident runs.
352-
MergePolicyLevelling -> assertST $ null rs
353-
-- Runs in tiering levels usually fit that size, but they can be one
354-
-- larger, if a run has been held back (creating a 5-way merge).
355-
MergePolicyTiering -> assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs
356-
-- (This is actually still not really true, but will hold in practice.
357-
-- In the pathological case, all runs passed to the next level can be
358-
-- factor (5/4) too large, and there the same holding back can lead to
359-
-- factor (6/4) etc., until at level 12 a run is two levels too large.
349+
MergePolicyLevelling ->
350+
-- Levels using levelling have only one (incoming) run, which almost
351+
-- always consists of an ongoing merge. The exception is when a
352+
-- levelling run becomes too large and is promoted, in that case
353+
-- initially there's no merge, but it is still represented as an
354+
-- 'IncomingRun', using 'Single'. Thus there are no other resident
355+
-- runs.
356+
assertST $ null rs
357+
MergePolicyTiering -> do
358+
-- Runs in tiering levels usually fit that size, but they can be one
359+
-- larger, if a run has been held back (creating a 5-way merge).
360+
--
361+
-- TODO: This is actually still not really true, but will hold in
362+
-- practice. In the pathological case, all runs passed to the next
363+
-- level can be factor (5/4) too large, and there the same holding
364+
-- back can lead to factor (6/4) etc., until at level 12 a run is two
365+
-- levels too large.
366+
assertST $ all (\r -> runSize r > 0) rs
367+
assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs
360368

361369
-- Incoming runs being merged also need to be of the right size, but the
362370
-- conditions are more complicated.
@@ -367,11 +375,12 @@ invariant (LSMContent _ levels ul) = do
367375
MergePolicyLevelling -> do
368376
case (ir, mrs) of
369377
-- A single incoming run (which thus didn't need merging) must be
370-
-- of the expected size range already
378+
-- of the expected size range already, but it could also be smaller
379+
-- if it comes from a union level.
371380
(Single r, m) -> do
372381
assertST $ case m of CompletedMerge{} -> True
373382
OngoingMerge{} -> False
374-
assertST $ levellingRunSizeToLevel r == ln
383+
assertST $ levellingRunSizeToLevel r <= ln
375384

376385
-- A completed merge for levelling can be of almost any size at all!
377386
-- It can be smaller, due to deletions in the last level. But it
@@ -496,6 +505,11 @@ isCompletedMergingTree (MergingTree ref) = do
496505
OngoingTreeMerge mr -> isCompletedMergingRun mr
497506
PendingTreeMerge _ -> failI $ "not completed: PendingTreeMerge"
498507

508+
getCompletedMergingTree :: MergingTree s -> ST s (Maybe Run)
509+
getCompletedMergingTree t =
510+
either (const Nothing) Just
511+
<$> evalInvariant (isCompletedMergingTree t)
512+
499513
type Invariant s = E.ExceptT String (ST s)
500514

501515
assertI :: String -> Bool -> Invariant s ()
@@ -781,8 +795,11 @@ update tr (LSMHandle scr lsmr) k op = do
781795
let wb' = Map.insertWith combine k op wb
782796
if bufferSize wb' >= maxBufferSize
783797
then do
784-
ls' <- increment tr sc (bufferToRun wb') ls unionLevel
785-
let content' = LSMContent Map.empty ls' unionLevel
798+
-- Before adding the run to the regular levels, we check if we can get
799+
-- rid of the union level (by moving it into into the regular ones).
800+
(ls', ul') <- migrateUnionLevel tr sc ls unionLevel
801+
ls'' <- increment tr sc (bufferToRun wb') ls' ul'
802+
let content' = LSMContent Map.empty ls'' ul'
786803
invariant content'
787804
writeSTRef lsmr content'
788805
else
@@ -1158,9 +1175,44 @@ depositNominalCredit (NominalDebt nominalDebt)
11581175
-- Updates
11591176
--
11601177

1178+
-- | At some point, we want to merge the union level with the regular levels.
1179+
-- We achieve this by moving it into a new last regular level once it is both
1180+
-- completed (merged down to a single run) and fits into such a new level.
1181+
--
1182+
-- Our representation doesn't allow for empty levels, so we can only put the
1183+
-- run directly after the pre-existing regular levels. If it is too large for
1184+
-- that, we don't want to move it yet to avoid violating run size invariants
1185+
-- and doing inefficient merges of runs with very different sizes.
1186+
migrateUnionLevel :: forall s. Tracer (ST s) Event
1187+
-> Counter -> Levels s -> UnionLevel s
1188+
-> ST s (Levels s, UnionLevel s)
1189+
migrateUnionLevel _ _ ls NoUnion = do
1190+
-- nothing to do
1191+
return (ls, NoUnion)
1192+
migrateUnionLevel _tr _sc ls ul@(Union t _) =
1193+
-- TODO: tracing
1194+
getCompletedMergingTree t <&> \case
1195+
Just r
1196+
| null r ->
1197+
-- If the union level is empty, we can just drop it.
1198+
(ls, NoUnion)
1199+
| levellingRunSizeToLevel r <= length ls + 1 ->
1200+
-- If it fits into a hypothetical new last level, put it there.
1201+
--
1202+
-- TODO: In some cases it seems desirable to even add it to the
1203+
-- existing last regular level (so it becomes part of a merge
1204+
-- sooner), but that would lead to additional merging work that was
1205+
-- not accounted for. We'd need to be careful to ensure the merge
1206+
-- completes in time, without doing a lot of work in a short time.
1207+
(ls ++ [Level (Single r) []], NoUnion)
1208+
_ ->
1209+
-- Otherwise, just leave it for now.
1210+
(ls, ul)
1211+
11611212
increment :: forall s. Tracer (ST s) Event
1162-
-> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s)
1163-
increment tr sc run0 ls0 ul = do
1213+
-> Counter -> Run -> Levels s -> UnionLevel s
1214+
-> ST s (Levels s)
1215+
increment tr sc run0 ls0 ul =
11641216
go 1 [run0] ls0
11651217
where
11661218
mergeTypeFor :: Levels s -> LevelMergeType

prototypes/ScheduledMergesTest.hs

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ tests :: TestTree
2424
tests = testGroup "Unit and property tests"
2525
[ testCase "test_regression_empty_run" test_regression_empty_run
2626
, testCase "test_merge_again_with_incoming" test_merge_again_with_incoming
27-
, testProperty "prop_union" prop_union
27+
, testProperty "prop_union_supply_all" prop_union_supply_all
28+
, testProperty "prop_union_merge_into_levels" prop_union_merge_into_levels
2829
, testGroup "T"
2930
[ localOption (QuickCheckTests 1000) $ -- super quick, run more
3031
testProperty "Arbitrary satisfies invariant" prop_arbitrarySatisfiesInvariant
@@ -176,29 +177,72 @@ test_merge_again_with_incoming =
176177
-- properties
177178
--
178179

180+
-- TODO: also generate nested unions?
181+
179182
-- | Supplying enough credits for the remaining debt completes the union merge.
180-
prop_union :: [[(LSM.Key, LSM.Op)]] -> Property
181-
prop_union kopss = length (filter (not . null) kopss) > 1 QC.==>
183+
prop_union_supply_all :: [[(LSM.Key, LSM.Op)]] -> Property
184+
prop_union_supply_all kopss = length (filter (not . null) kopss) > 1 QC.==>
182185
QC.ioProperty $ runWithTracer $ \tr ->
183186
stToIO $ do
184187
ts <- traverse (mkTable tr) kopss
185188
t <- LSM.unions ts
186189

187190
debt@(UnionDebt x) <- LSM.remainingUnionDebt t
188-
_ <- LSM.supplyUnionCredits t (UnionCredits x)
191+
leftovers <- LSM.supplyUnionCredits t (UnionCredits x)
189192
debt' <- LSM.remainingUnionDebt t
190193

191194
rep <- dumpRepresentation t
192195
return $ QC.counterexample (show (debt, debt')) $ QC.conjoin
193-
[ debt =/= UnionDebt 0
194-
, debt' === UnionDebt 0
196+
[ QC.counterexample "debt" $ debt =/= UnionDebt 0
197+
, QC.counterexample "debt'" $ debt' === UnionDebt 0
198+
, QC.counterexample "leftovers" $ leftovers >= 0
195199
, hasUnionWith isCompleted rep
196200
]
197201
where
198202
isCompleted = \case
199203
MLeaf{} -> True
200204
MNode{} -> False
201205

206+
-- | The union level will get merged into the last regular level once the union
207+
-- merge is completed and sufficient new entries have been inserted.
208+
prop_union_merge_into_levels :: [[(LSM.Key, LSM.Op)]] -> Property
209+
prop_union_merge_into_levels kopss = length (filter (not . null) kopss) > 1 QC.==>
210+
QC.forAll arbitrary $ \firstPay ->
211+
QC.ioProperty $ runWithTracer $ \tr ->
212+
stToIO $ do
213+
ts <- traverse (mkTable tr) kopss
214+
t <- LSM.unions ts
215+
216+
-- pay off the union and insert enough that it fits into
217+
-- the last level
218+
let payOffDebt = do
219+
UnionDebt d <- LSM.remainingUnionDebt t
220+
_ <- LSM.supplyUnionCredits t (UnionCredits d)
221+
return ()
222+
223+
-- insert as many new entries as there are in the completed
224+
-- union level
225+
let fillTable = do
226+
unionRunSize <- length <$> LSM.logicalValue t
227+
LSM.inserts tr t
228+
[(K k, V 0, Nothing) | k <- [1 .. unionRunSize]]
229+
230+
-- we can do these in any order
231+
if firstPay
232+
then payOffDebt >> fillTable
233+
else fillTable >> payOffDebt
234+
235+
-- then flush the write buffer
236+
LSM.inserts tr t
237+
[(K k, V 0, Nothing) | k <- [1 .. maxBufferSize]]
238+
239+
(_, _, mtree) <- representationShape <$> dumpRepresentation t
240+
241+
-- the union level is gone
242+
return $ QC.conjoin
243+
[ mtree === Nothing
244+
]
245+
202246
mkTable :: Tracer (ST s) Event -> [(LSM.Key, LSM.Op)] -> ST s (LSM s)
203247
mkTable tr ks = do
204248
t <- LSM.new

0 commit comments

Comments
 (0)