Skip to content

agent: fix message delivery in case one of the connections has no snd queue for any reason - it could break delivery to all connections #1585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1412,37 +1412,39 @@ sendMessagesB' c reqs = do
sendMessagesB_ :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType MsgReq) -> Set ConnId -> AM' (t (Either AgentErrorType (AgentMsgId, PQEncryption)))
sendMessagesB_ c reqs connIds = withConnLocks c connIds "sendMessages" $ do
prev <- newTVarIO Nothing
reqs' <- withStoreBatch c $ \db -> fmap (bindRight $ getConn_ db prev) reqs
reqs' <- withStoreBatch c $ \db -> fmap (mapM $ getConn_ db prev) reqs
let (toEnable, reqs'') = mapAccumL prepareConn [] reqs'
void $ withStoreBatch' c $ \db -> map (\connId -> setConnPQSupport db connId PQSupportOn) $ S.toList toEnable
enqueueMessagesB c reqs''
where
getConn_ :: DB.Connection -> TVar (Maybe (Either AgentErrorType SomeConn)) -> MsgReq -> IO (Either AgentErrorType (MsgReq, SomeConn))
getConn_ :: DB.Connection -> TVar (Maybe (Either AgentErrorType SomeConn)) -> MsgReq -> IO (MsgReq, Either AgentErrorType SomeConn)
getConn_ db prev req@(connId, _, _, _) =
(req,)
<$$> if B.null connId
<$> if B.null connId
then fromMaybe (Left $ INTERNAL "sendMessagesB_: empty prev connId") <$> readTVarIO prev
else do
conn <- first storeError <$> getConn db connId
conn <$ atomically (writeTVar prev $ Just conn)
prepareConn :: Set ConnId -> Either AgentErrorType (MsgReq, SomeConn) -> (Set ConnId, Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, ValueOrRef AMessage))
prepareConn :: Set ConnId -> Either AgentErrorType (MsgReq, Either AgentErrorType SomeConn) -> (Set ConnId, Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage))
prepareConn s (Left e) = (s, Left e)
prepareConn s (Right ((_, pqEnc, msgFlags, msgOrRef), SomeConn _ conn)) = case conn of
DuplexConnection cData _ sqs -> prepareMsg cData sqs
SndConnection cData sq -> prepareMsg cData [sq]
_ -> (s, Left $ CONN SIMPLEX "sendMessagesB_")
prepareConn s (Right ((_, pqEnc, msgFlags, msgOrRef), conn_)) = case conn_ of
Right (SomeConn cType conn) -> case conn of
DuplexConnection cData _ sqs -> prepareMsg cData sqs
SndConnection cData sq -> prepareMsg cData [sq]
-- we can't fail here, as it may prevent delivery of subsequent messages that reference the body of the failed message.
_ -> (s, mkReq $ Left $ CONN SIMPLEX $ "sendMessagesB_ " <> show (connType cType))
Left e -> (s, mkReq $ Left e)
where
prepareMsg :: ConnData -> NonEmpty SndQueue -> (Set ConnId, Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, ValueOrRef AMessage))
prepareMsg :: ConnData -> NonEmpty SndQueue -> (Set ConnId, Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage))
prepareMsg cData@ConnData {connId, pqSupport} sqs
| ratchetSyncSendProhibited cData = (s, Left $ CMD PROHIBITED "sendMessagesB: send prohibited")
| ratchetSyncSendProhibited cData = (s, mkReq $ Left $ CMD PROHIBITED "sendMessagesB: send prohibited")
-- connection is only updated if PQ encryption was disabled, and now it has to be enabled.
-- support for PQ encryption (small message envelopes) will not be disabled when message is sent.
| pqEnc == PQEncOn && pqSupport == PQSupportOff =
let cData' = cData {pqSupport = PQSupportOn} :: ConnData
in (S.insert connId s, mkReq cData')
| otherwise = (s, mkReq cData)
where
mkReq cData' = Right (cData', sqs, Just pqEnc, msgFlags, A_MSG <$> msgOrRef)
in (S.insert connId s, mkReq $ Right (cData', sqs))
| otherwise = (s, mkReq $ Right (cData, sqs))
mkReq csqs_ = Right (csqs_, Just pqEnc, msgFlags, A_MSG <$> msgOrRef)

-- / async command processing v v v

Expand Down Expand Up @@ -1626,10 +1628,10 @@ enqueueMessages c cData sqs msgFlags aMessage = do

enqueueMessages' :: AgentClient -> ConnData -> NonEmpty SndQueue -> MsgFlags -> AMessage -> AM (AgentMsgId, CR.PQEncryption)
enqueueMessages' c cData sqs msgFlags aMessage =
ExceptT $ runIdentity <$> enqueueMessagesB c (Identity (Right (cData, sqs, Nothing, msgFlags, vrValue aMessage)))
ExceptT $ runIdentity <$> enqueueMessagesB c (Identity (Right (Right (cData, sqs), Nothing, msgFlags, vrValue aMessage)))
{-# INLINE enqueueMessages' #-}

enqueueMessagesB :: Traversable t => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, ValueOrRef AMessage)) -> AM' (t (Either AgentErrorType (AgentMsgId, PQEncryption)))
enqueueMessagesB :: Traversable t => AgentClient -> t (Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage)) -> AM' (t (Either AgentErrorType (AgentMsgId, PQEncryption)))
enqueueMessagesB c reqs = do
reqs' <- enqueueMessageB c reqs
enqueueSavedMessageB c $ mapMaybe snd $ rights $ toList reqs'
Expand All @@ -1641,35 +1643,50 @@ isActiveSndQ SndQueue {status} = status == Secured || status == Active

enqueueMessage :: AgentClient -> ConnData -> SndQueue -> MsgFlags -> AMessage -> AM (AgentMsgId, PQEncryption)
enqueueMessage c cData sq msgFlags aMessage =
ExceptT $ fmap fst . runIdentity <$> enqueueMessageB c (Identity (Right (cData, [sq], Nothing, msgFlags, vrValue aMessage)))
ExceptT $ fmap fst . runIdentity <$> enqueueMessageB c (Identity (Right (Right (cData, [sq]), Nothing, msgFlags, vrValue aMessage)))
{-# INLINE enqueueMessage #-}

-- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries
enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, ValueOrRef AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
enqueueMessageB c reqs = do
cfg <- asks config
(_, reqMids) <- unsafeWithStore c $ \db -> do
mapAccumLM (\ids r -> storeSentMsg db cfg ids r `E.catchAny` \e -> (ids,) <$> handleInternal e) IM.empty reqs
forME reqMids $ \((cData, sq :| sqs, _, _, _), InternalId msgId, pqSecr) -> do
forME reqMids $ \((csqs_, _, _, _), InternalId msgId, pqSecr) -> forM csqs_ $ \(cData, sq :| sqs) -> do
submitPendingMsg c cData sq
let sqs' = filter isActiveSndQ sqs
pure $ Right ((msgId, pqSecr), if null sqs' then Nothing else Just (cData, sqs', msgId))
pure ((msgId, pqSecr), if null sqs' then Nothing else Just (cData, sqs', msgId))
where
storeSentMsg :: DB.Connection -> AgentConfig -> IntMap (Int64, AMessage) -> Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, ValueOrRef AMessage) -> IO (IntMap (Int64, AMessage), Either AgentErrorType ((ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, ValueOrRef AMessage), InternalId, PQEncryption))
storeSentMsg ::
DB.Connection ->
AgentConfig ->
IntMap (Maybe Int64, AMessage) ->
Either AgentErrorType (Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage) ->
IO (IntMap (Maybe Int64, AMessage), Either AgentErrorType ((Either AgentErrorType (ConnData, NonEmpty SndQueue), Maybe PQEncryption, MsgFlags, ValueOrRef AMessage), InternalId, PQEncryption))
storeSentMsg db cfg aMessageIds = \case
Left e -> pure (aMessageIds, Left e)
Right req@(cData@ConnData {connId}, sq :| _, pqEnc_, msgFlags, mbr) -> case mbr of
Right req@(csqs_, pqEnc_, msgFlags, mbr) -> case mbr of
VRValue i_ aMessage -> case i_ >>= (`IM.lookup` aMessageIds) of
Just _ -> pure (aMessageIds, Left $ INTERNAL "enqueueMessageB: storeSentMsg duplicate saved message body")
Nothing -> do
mbId <- createSndMsgBody db aMessage
let aMessageIds' = maybe id (`IM.insert` (mbId, aMessage)) i_ aMessageIds
(aMessageIds',) <$> storeSentMsg_ mbId aMessage
VRRef i -> (aMessageIds,) <$> case IM.lookup i aMessageIds of
Just (mbId, aMessage) -> storeSentMsg_ mbId aMessage
Nothing -> pure $ Left $ INTERNAL "enqueueMessageB: storeSentMsg missing saved message body id"
(mbId_, r) <- case csqs_ of
Left e -> pure (Nothing, Left e)
Right (cData, sq :| _) -> do
mbId <- createSndMsgBody db aMessage
(Just mbId,) <$> storeSentMsg_ cData sq mbId aMessage
let aMessageIds' = maybe id (`IM.insert` (mbId_, aMessage)) i_ aMessageIds
pure (aMessageIds', r)
VRRef i -> case csqs_ of
Left e -> pure $ (aMessageIds, Left e)
Right (cData, sq :| _) -> case IM.lookup i aMessageIds of
Just (Just mbId, aMessage) -> (aMessageIds,) <$> storeSentMsg_ cData sq mbId aMessage
Just (Nothing, aMessage) -> do
mbId <- createSndMsgBody db aMessage
let aMessageIds' = IM.insert i (Just mbId, aMessage) aMessageIds
(aMessageIds',) <$> storeSentMsg_ cData sq mbId aMessage
Nothing -> pure (aMessageIds, Left $ INTERNAL "enqueueMessageB: storeSentMsg missing saved message body id")
where
storeSentMsg_ sndMsgBodyId aMessage = fmap (first storeError) $ runExceptT $ do
storeSentMsg_ cData@ConnData {connId} sq sndMsgBodyId aMessage = fmap (first storeError) $ runExceptT $ do
let AgentConfig {e2eEncryptVRange} = cfg
internalTs <- liftIO getCurrentTime
(internalId, internalSndId, prevMsgHash) <- ExceptT $ updateSndIds db connId
Expand Down
Loading