-
-
Notifications
You must be signed in to change notification settings - Fork 389
Review masking and add traces when things don't cancel timely #2768
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9464bbe
4db2234
f02c5ae
6019267
4a63919
596cd3e
a5d8316
973a0a7
df972e4
9822e02
3754ca6
6fc9189
b2108d3
866a62a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
{-# LANGUAGE RecordWildCards #-} | ||
{-# LANGUAGE ScopedTypeVariables #-} | ||
{-# LANGUAGE TypeFamilies #-} | ||
{-# LANGUAGE TupleSections #-} | ||
|
||
module Development.IDE.Graph.Internal.Database (newDatabase, incDatabase, build, getDirtySet, getKeysAndVisitAge) where | ||
|
||
|
@@ -32,14 +33,15 @@ import Data.IORef.Extra | |
import Data.Maybe | ||
import Data.Traversable (for) | ||
import Data.Tuple.Extra | ||
import Debug.Trace (traceM) | ||
import Development.IDE.Graph.Classes | ||
import Development.IDE.Graph.Internal.Rules | ||
import Development.IDE.Graph.Internal.Types | ||
import qualified Focus | ||
import qualified ListT | ||
import qualified StmContainers.Map as SMap | ||
import System.Time.Extra (duration, sleep) | ||
import System.IO.Unsafe | ||
import System.Time.Extra (duration) | ||
|
||
newDatabase :: Dynamic -> TheRules -> IO Database | ||
newDatabase databaseExtra databaseRules = do | ||
|
@@ -120,7 +122,7 @@ builder db@Database{..} stack keys = withRunInIO $ \(RunInIO run) -> do | |
pure (id, val) | ||
|
||
toForceList <- liftIO $ readTVarIO toForce | ||
let waitAll = run $ mapConcurrentlyAIO_ id toForceList | ||
let waitAll = run $ waitConcurrently_ toForceList | ||
case toForceList of | ||
[] -> return $ Left results | ||
_ -> return $ Right $ do | ||
|
@@ -170,6 +172,10 @@ compute db@Database{..} stack key mode result = do | |
deps | not(null deps) | ||
&& runChanged /= ChangedNothing | ||
-> do | ||
-- IMPORTANT: record the reverse deps **before** marking the key Clean. | ||
-- If an async exception strikes before the deps have been recorded, | ||
-- we won't be able to accurately propagate dirtiness for this key | ||
-- on the next build. | ||
void $ | ||
updateReverseDeps key db | ||
(getResultDepsDefault [] previousDeps) | ||
|
@@ -224,7 +230,8 @@ updateReverseDeps | |
-> [Key] -- ^ Previous direct dependencies of Id | ||
-> HashSet Key -- ^ Current direct dependencies of Id | ||
-> IO () | ||
updateReverseDeps myId db prev new = uninterruptibleMask_ $ do | ||
-- mask to ensure that all the reverse dependencies are updated | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one I wonder if we could avoid by pushing the scope of the STM transaction out a bit? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. STM transactions can still be interrupted by exceptions, so the question is how badly do we want to record the reverse deps. The answer is very badly - failure to do so will lead to unsoundness, specially if this is the first build. EDIT: oh, correcting myself, since we don't mark the build as done until after the reverse keys have updated, I don't think that the mask is needed at all. |
||
updateReverseDeps myId db prev new = do | ||
forM_ prev $ \d -> | ||
unless (d `HSet.member` new) $ | ||
doOne (HSet.delete myId) d | ||
|
@@ -252,20 +259,27 @@ transitiveDirtySet database = flip State.execStateT HSet.empty . traverse_ loop | |
next <- lift $ atomically $ getReverseDependencies database x | ||
traverse_ loop (maybe mempty HSet.toList next) | ||
|
||
-- | IO extended to track created asyncs to clean them up when the thread is killed, | ||
-- generalizing 'withAsync' | ||
-------------------------------------------------------------------------------- | ||
-- Asynchronous computations with cancellation | ||
|
||
-- | A simple monad to implement cancellation on top of 'Async', | ||
-- generalizing 'withAsync' to monadic scopes. | ||
newtype AIO a = AIO { unAIO :: ReaderT (IORef [Async ()]) IO a } | ||
deriving newtype (Applicative, Functor, Monad, MonadIO) | ||
|
||
-- | Run the monadic computation, cancelling all the spawned asyncs if an exception arises | ||
runAIO :: AIO a -> IO a | ||
runAIO (AIO act) = do | ||
asyncs <- newIORef [] | ||
runReaderT act asyncs `onException` cleanupAsync asyncs | ||
|
||
-- | Like 'async' but with built-in cancellation. | ||
-- Returns an IO action to wait on the result. | ||
asyncWithCleanUp :: AIO a -> AIO (IO a) | ||
asyncWithCleanUp act = do | ||
st <- AIO ask | ||
io <- unliftAIO act | ||
-- mask to make sure we keep track of the spawned async | ||
liftIO $ uninterruptibleMask $ \restore -> do | ||
a <- async $ restore io | ||
atomicModifyIORef'_ st (void a :) | ||
|
@@ -284,27 +298,40 @@ withRunInIO k = do | |
k $ RunInIO (\aio -> runReaderT (unAIO aio) st) | ||
|
||
cleanupAsync :: IORef [Async a] -> IO () | ||
cleanupAsync ref = uninterruptibleMask_ $ do | ||
asyncs <- readIORef ref | ||
-- mask to make sure we interrupt all the asyncs | ||
cleanupAsync ref = uninterruptibleMask $ \unmask -> do | ||
asyncs <- atomicModifyIORef' ref ([],) | ||
-- interrupt all the asyncs without waiting | ||
mapM_ (\a -> throwTo (asyncThreadId a) AsyncCancelled) asyncs | ||
mapM_ waitCatch asyncs | ||
-- Wait until all the asyncs are done | ||
-- But if it takes more than 10 seconds, log to stderr | ||
unless (null asyncs) $ do | ||
let warnIfTakingTooLong = unmask $ forever $ do | ||
sleep 10 | ||
traceM "cleanupAsync: waiting for asyncs to finish" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really want to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you are seeing this trace, then most likely Plus, I didn't want to thread a logger around.... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're in IO, you could just print to stderr. It's a small difference, but I do think not having There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a fair point |
||
withAsync warnIfTakingTooLong $ \_ -> | ||
mapM_ waitCatch asyncs | ||
|
||
data Wait | ||
= Wait {justWait :: !(IO ())} | ||
| Spawn {justWait :: !(IO ())} | ||
|
||
data Wait a | ||
= Wait {justWait :: !a} | ||
| Spawn {justWait :: !a} | ||
deriving Functor | ||
fmapWait :: (IO () -> IO ()) -> Wait -> Wait | ||
fmapWait f (Wait io) = Wait (f io) | ||
fmapWait f (Spawn io) = Spawn (f io) | ||
|
||
waitOrSpawn :: Wait (IO a) -> IO (Either (IO a) (Async a)) | ||
waitOrSpawn :: Wait -> IO (Either (IO ()) (Async ())) | ||
waitOrSpawn (Wait io) = pure $ Left io | ||
waitOrSpawn (Spawn io) = Right <$> async io | ||
|
||
mapConcurrentlyAIO_ :: (a -> IO ()) -> [Wait a] -> AIO () | ||
mapConcurrentlyAIO_ _ [] = pure () | ||
mapConcurrentlyAIO_ f [one] = liftIO $ justWait $ fmap f one | ||
mapConcurrentlyAIO_ f many = do | ||
waitConcurrently_ :: [Wait] -> AIO () | ||
waitConcurrently_ [] = pure () | ||
waitConcurrently_ [one] = liftIO $ justWait one | ||
waitConcurrently_ many = do | ||
ref <- AIO ask | ||
waits <- liftIO $ uninterruptibleMask $ \restore -> do | ||
waits <- liftIO $ traverse (waitOrSpawn . fmap (restore . f)) many | ||
-- mask to make sure we keep track of all the asyncs | ||
waits <- liftIO $ uninterruptibleMask $ \unmask -> do | ||
waits <- liftIO $ traverse (waitOrSpawn . fmapWait unmask) many | ||
let asyncs = rights waits | ||
liftIO $ atomicModifyIORef'_ ref (asyncs ++) | ||
return waits | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change doesn't make sense to me. I think this just makes the mask apply to the atomic execution of the STM block, which is already atomic. Previously it ensured that we would perform the effects of the returned action as well without being interrupted. But now I think it doesn't do that, and indeed maybe does nothing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It still prevents the STM transaction from being interrupted by an edit, which is important to accurately track file dirtiness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we can end up not calling
recordDirtyKeys
, since that happens in the IO action afterwards. Is that important? Maybe the comment could say.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The IO is just logging,
recordDirtyKeys
mutates the collection inside the STM transaction. That said I appreciate how this is confusing and perhaps themask_
was better at the top level for clarity