Skip to content

Commit 0c1ba54

Browse files
committed
metric for time between MSG and ACK
1 parent 4ea1152 commit 0c1ba54

File tree

5 files changed

+76
-40
lines changed

5 files changed

+76
-40
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -687,24 +687,33 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
687687
let threadsCount = 0
688688
#endif
689689
clientsCount <- IM.size <$> getServerClients srv
690-
deliveredSubs <- getDeliveredMetrics
690+
(deliveredSubs, sumTimes, maxTime) <- getDeliveredMetrics =<< getSystemSeconds
691691
smpSubs <- getSubscribersMetrics subscribers
692692
ntfSubs <- getSubscribersMetrics ntfSubscribers
693693
loadedCounts <- loadedQueueCounts $ fromMsgStore ms
694-
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, smpSubs, ntfSubs, loadedCounts}
694+
let avgTime = sumTimes `div` fromIntegral (subsCount deliveredSubs)
695+
deliveredTimes = TimeAggregations {avgTime, maxTime}
696+
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, deliveredSubs, deliveredTimes, smpSubs, ntfSubs, loadedCounts}
695697
where
696698
getSubscribersMetrics ServerSubscribers {queueSubscribers, serviceSubscribers, subClients} = do
697699
subsCount <- M.size <$> getSubscribedClients queueSubscribers
698700
subClientsCount <- IS.size <$> readTVarIO subClients
699701
subServicesCount <- M.size <$> getSubscribedClients serviceSubscribers
700702
pure RTSubscriberMetrics {subsCount, subClientsCount, subServicesCount}
701-
getDeliveredMetrics = foldM countClnt (RTSubscriberMetrics 0 0 0) =<< getServerClients srv
702-
countClnt metrics Client {subscriptions} = do
703-
cnt <- foldM countSubs 0 =<< readTVarIO subscriptions
704-
pure $ if cnt > 0
705-
then metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}
706-
else metrics
707-
countSubs !cnt Sub {delivered} = (\empty -> if empty then cnt else cnt + 1) <$> atomically (isEmptyTMVar delivered)
703+
getDeliveredMetrics (RoundedSystemTime ts') = foldM countClnt (RTSubscriberMetrics 0 0 0, 0, 0) =<< getServerClients srv
704+
where
705+
countClnt acc@(metrics, !sumTimes, !maxTime) Client {subscriptions} = do
706+
(cnt, sumTimes', maxTime') <- foldM countSubs (0, sumTimes, maxTime) =<< readTVarIO subscriptions
707+
pure $ if cnt > 0
708+
then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}, sumTimes', maxTime')
709+
else acc
710+
countSubs acc@(!cnt, !sumTimes, !maxTime) Sub {delivered} = do
711+
delivered_ <- readTVarIO delivered
712+
pure $ case delivered_ of
713+
Nothing -> acc
714+
Just (_, RoundedSystemTime ts) ->
715+
let t = ts' - ts
716+
in (cnt + 1, sumTimes + t, max maxTime t)
708717

709718
runClient :: Transport c => X.CertificateChain -> C.APrivateSignKey -> TProxy c 'TServer -> c 'TServer -> M s ()
710719
runClient srvCert srvSignKey tp h = do
@@ -1588,15 +1597,16 @@ client
15881597
pure (err (CMD PROHIBITED), Nothing)
15891598
_ -> do
15901599
incStat $ qSubDuplicate stats
1591-
atomically (tryTakeTMVar $ delivered s) >> deliver False s
1600+
atomically (writeTVar (delivered s) Nothing) >> deliver False s
15921601
where
15931602
deliver :: Bool -> Sub -> M s ResponseAndMessage
15941603
deliver hasSub sub = do
15951604
stats <- asks serverStats
15961605
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
15971606
msg_ <- tryPeekMsg ms q
1598-
msg' <- forM msg_ $ \msg -> do
1599-
void $ atomically $ setDelivered sub msg
1607+
msg' <- forM msg_ $ \msg -> liftIO $ do
1608+
ts <- getSystemSeconds
1609+
atomically $ setDelivered sub msg ts
16001610
unless hasSub $ incStat $ qSub stats
16011611
pure (NoCorrId, entId, MSG (encryptMsg qr msg))
16021612
pure ((corrId, entId, SOK clntServiceId), msg')
@@ -1627,7 +1637,7 @@ client
16271637
Just s@Sub {subThread} ->
16281638
case subThread of
16291639
ProhibitSub ->
1630-
atomically (tryTakeTMVar $ delivered s)
1640+
atomically (swapTVar (delivered s) Nothing)
16311641
>>= getMessage_ s
16321642
-- cannot use GET in the same connection where there is an active subscription
16331643
_ -> do
@@ -1644,15 +1654,16 @@ client
16441654
-- This is tracked as "subscription" in the client to prevent these
16451655
-- clients from being able to subscribe.
16461656
pure s
1647-
getMessage_ :: Sub -> Maybe MsgId -> M s (Transmission BrokerMsg)
1657+
getMessage_ :: Sub -> Maybe (MsgId, RoundedSystemTime) -> M s (Transmission BrokerMsg)
16481658
getMessage_ s delivered_ = do
16491659
stats <- asks serverStats
16501660
fmap (either err id) $ liftIO $ runExceptT $
16511661
tryPeekMsg ms q >>= \case
16521662
Just msg -> do
16531663
let encMsg = encryptMsg qr msg
16541664
incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats
1655-
atomically $ setDelivered s msg $> (corrId, entId, MSG encMsg)
1665+
ts <- liftIO getSystemSeconds
1666+
atomically $ setDelivered s msg ts $> (corrId, entId, MSG encMsg)
16561667
Nothing -> incStat (msgGetNoMsg stats) $> ok
16571668

16581669
withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg)
@@ -1760,16 +1771,18 @@ client
17601771
(deletedMsg_, msg_) <- tryDelPeekMsg ms q msgId
17611772
liftIO $ do
17621773
mapM_ (updateStats stats False) deletedMsg_
1763-
mapM_ (atomically . setDelivered sub) msg_
1774+
forM_ msg_ $ \msg -> do
1775+
ts <- getSystemSeconds
1776+
atomically $ setDelivered sub msg ts
17641777
pure (corrId, entId, maybe OK (MSG . encryptMsg qr) msg_)
17651778
_ -> pure $ err NO_MSG
17661779
where
17671780
getDelivered :: Sub -> STM (Maybe ServerSub)
17681781
getDelivered Sub {delivered, subThread} = do
1769-
tryTakeTMVar delivered $>>= \msgId' ->
1782+
readTVar delivered $>>= \(msgId', _) ->
17701783
if msgId == msgId' || B.null msgId
1771-
then pure $ Just subThread
1772-
else putTMVar delivered msgId' $> Nothing
1784+
then writeTVar delivered Nothing $> Just subThread
1785+
else pure Nothing
17731786
updateStats :: ServerStats -> Bool -> Message -> IO ()
17741787
updateStats stats isGet = \case
17751788
MessageQuota {} -> pure ()
@@ -1855,11 +1868,14 @@ client
18551868
-- the subscribed client var is read outside of STM to avoid transaction cost
18561869
-- in case no client is subscribed.
18571870
getSubscribedClient rId (queueSubscribers subscribers)
1858-
$>>= atomically . deliverToSub
1871+
$>>= deliverToSub
18591872
>>= mapM_ forkDeliver
18601873
where
18611874
rId = recipientId q
1862-
deliverToSub rcv =
1875+
deliverToSub rcv = do
1876+
ts <- getSystemSeconds
1877+
atomically $ deliverToSub_ rcv ts
1878+
deliverToSub_ rcv ts = do
18631879
-- reading client TVar in the same transaction,
18641880
-- so that if subscription ends, it re-evalutates
18651881
-- and delivery is cancelled -
@@ -1870,18 +1886,18 @@ client
18701886
ProhibitSub -> pure Nothing
18711887
ServerSub st -> readTVar st >>= \case
18721888
NoSub ->
1873-
tryReadTMVar delivered >>= \case
1889+
readTVar delivered >>= \case
18741890
Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
18751891
Nothing ->
18761892
ifM
18771893
(isFullTBQueue sndQ')
18781894
(writeTVar st SubPending $> Just (rc, s, st))
1879-
(deliver sndQ' s $> Nothing)
1895+
(deliver sndQ' s ts $> Nothing)
18801896
_ -> pure Nothing
1881-
deliver sndQ' s = do
1897+
deliver sndQ' s ts = do
18821898
let encMsg = encryptMsg qr msg
18831899
writeTBQueue sndQ' ([(NoCorrId, rId, MSG encMsg)], [])
1884-
void $ setDelivered s msg
1900+
setDelivered s msg ts
18851901
forkDeliver (rc@Client {sndQ = sndQ'}, s@Sub {delivered}, st) = do
18861902
t <- mkWeakThreadId =<< forkIO deliverThread
18871903
atomically $ modifyTVar' st $ \case
@@ -1894,13 +1910,14 @@ client
18941910
-- lookup can be outside of STM transaction,
18951911
-- as long as the check that it is the same client is inside.
18961912
getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
1897-
deliverIfSame rcv = atomically $
1898-
whenM (sameClient rc rcv) $
1899-
tryReadTMVar delivered >>= \case
1913+
deliverIfSame rcv = do
1914+
ts <- getSystemSeconds
1915+
atomically $ whenM (sameClient rc rcv) $
1916+
readTVar delivered >>= \case
19001917
Just _ -> pure () -- if a message was already delivered, should not deliver more
19011918
Nothing -> do
19021919
-- a separate thread is needed because it blocks when client sndQ is full.
1903-
deliver sndQ' s
1920+
deliver sndQ' s ts
19041921
writeTVar st NoSub
19051922

19061923
enqueueNotification :: NtfCreds -> Message -> M s ()
@@ -1984,8 +2001,10 @@ client
19842001
msgId' = messageId msg
19852002
msgTs' = messageTs msg
19862003

1987-
setDelivered :: Sub -> Message -> STM Bool
1988-
setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg
2004+
setDelivered :: Sub -> Message -> RoundedSystemTime -> STM ()
2005+
setDelivered Sub {delivered} msg !ts = do
2006+
let !msgId = messageId msg
2007+
writeTVar delivered $ Just (msgId, ts)
19892008

19902009
delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
19912010
delQueueAndMsgs (q, QueueRec {rcvServiceId}) = do
@@ -2026,7 +2045,7 @@ client
20262045
SubPending -> QSubPending
20272046
SubThread _ -> QSubThread
20282047
ProhibitSub -> pure QProhibitSub
2029-
qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered
2048+
qDelivered <- decodeLatin1 . encode . fst <$$> readTVarIO delivered
20302049
pure QSub {qSubThread, qDelivered}
20312050

20322051
ok :: Transmission BrokerMsg

src/Simplex/Messaging/Server/Env/STM.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)
415415

416416
data Sub = Sub
417417
{ subThread :: ServerSub, -- Nothing value indicates that sub
418-
delivered :: TMVar MsgId
418+
delivered :: TVar (Maybe (MsgId, RoundedSystemTime))
419419
}
420420

421421
newServer :: IO (Server s)
@@ -494,13 +494,13 @@ newClient clientId qSize clientTHParams createdAt = do
494494

495495
newSubscription :: SubscriptionThread -> STM Sub
496496
newSubscription st = do
497-
delivered <- newEmptyTMVar
497+
delivered <- newTVar Nothing
498498
subThread <- ServerSub <$> newTVar st
499499
return Sub {subThread, delivered}
500500

501501
newProhibitedSub :: STM Sub
502502
newProhibitedSub = do
503-
delivered <- newEmptyTMVar
503+
delivered <- newTVar Nothing
504504
return Sub {subThread = ProhibitSub, delivered}
505505

506506
newEnv :: ServerConfig s -> IO (Env s)

src/Simplex/Messaging/Server/Prometheus.hs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,17 @@ data RealTimeMetrics = RealTimeMetrics
3535
threadsCount :: Int,
3636
clientsCount :: Int,
3737
deliveredSubs :: RTSubscriberMetrics,
38+
deliveredTimes :: TimeAggregations,
3839
smpSubs :: RTSubscriberMetrics,
3940
ntfSubs :: RTSubscriberMetrics,
4041
loadedCounts :: LoadedQueueCounts
4142
}
4243

44+
data TimeAggregations = TimeAggregations
45+
{ avgTime :: Int64,
46+
maxTime :: Int64
47+
}
48+
4349
data RTSubscriberMetrics = RTSubscriberMetrics
4450
{ subsCount :: Int,
4551
subClientsCount :: Int,
@@ -57,6 +63,7 @@ prometheusMetrics sm rtm ts =
5763
threadsCount,
5864
clientsCount,
5965
deliveredSubs,
66+
deliveredTimes,
6067
smpSubs,
6168
ntfSubs,
6269
loadedCounts
@@ -436,6 +443,14 @@ prometheusMetrics sm rtm ts =
436443
\# TYPE simplex_smp_delivered_clients_total gauge\n\
437444
\simplex_smp_delivered_clients_total " <> mshow (subClientsCount deliveredSubs) <> "\n# delivered.subClientsCount\n\
438445
\\n\
446+
\# HELP simplex_smp_delivery_conf_time_avg Average time to confirm message delivery\n\
447+
\# TYPE simplex_smp_delivery_conf_time_avg gauge\n\
448+
\simplex_smp_delivery_conf_time_avg " <> mshow (avgTime deliveredTimes) <> "\n# delivered.avgTime\n\
449+
\\n\
450+
\# HELP simplex_smp_delivery_conf_time_max Max time to confirm message delivery\n\
451+
\# TYPE simplex_smp_delivery_conf_time_max gauge\n\
452+
\simplex_smp_delivery_conf_time_max " <> mshow (maxTime deliveredTimes) <> "\n# delivered.maxTime\n\
453+
\\n\
439454
\# HELP simplex_smp_subscribtion_total Total SMP subscriptions\n\
440455
\# TYPE simplex_smp_subscribtion_total gauge\n\
441456
\simplex_smp_subscribtion_total " <> mshow (subsCount smpSubs) <> "\n# smp.subsCount\n\

src/Simplex/Messaging/Server/QueueStore.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,6 @@ getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` pr
127127

128128
getSystemDate :: IO RoundedSystemTime
129129
getSystemDate = getRoundedSystemTime 86400
130+
131+
getSystemSeconds :: IO RoundedSystemTime
132+
getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,13 @@ import qualified Data.ByteString.Builder as BB
4343
import Data.ByteString.Char8 (ByteString)
4444
import qualified Data.ByteString.Lazy as LB
4545
import Data.Bitraversable (bimapM)
46-
import Data.Either (fromRight, lefts, rights)
46+
import Data.Either (fromRight, lefts)
4747
import Data.Functor (($>))
4848
import Data.Int (Int64)
4949
import Data.List (foldl', intersperse, partition)
5050
import Data.List.NonEmpty (NonEmpty)
51-
import qualified Data.List.NonEmpty as L
5251
import qualified Data.Map.Strict as M
53-
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
52+
import Data.Maybe (catMaybes, fromMaybe)
5453
import qualified Data.Set as S
5554
import Data.Text (Text)
5655
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
@@ -64,7 +63,7 @@ import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..))
6463
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
6564
import Database.PostgreSQL.Simple.SqlQQ (sql)
6665
import GHC.IO (catchAny)
67-
import Simplex.Messaging.Agent.Client (withLockMap, withLocksMap)
66+
import Simplex.Messaging.Agent.Client (withLockMap)
6867
import Simplex.Messaging.Agent.Lock (Lock)
6968
import Simplex.Messaging.Agent.Store.AgentStore ()
7069
import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore)
@@ -83,7 +82,7 @@ import Simplex.Messaging.Server.StoreLog
8382
import Simplex.Messaging.TMap (TMap)
8483
import qualified Simplex.Messaging.TMap as TM
8584
import Simplex.Messaging.Transport (SMPServiceRole (..))
86-
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>), ($>>=))
85+
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>))
8786
import System.Exit (exitFailure)
8887
import System.IO (IOMode (..), hFlush, stdout)
8988
import UnliftIO.STM

0 commit comments

Comments
 (0)