Skip to content

Prettify loot-network [WIP] #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 58 additions & 40 deletions code/network/lib/Loot/Network/ZMQ/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ instance Default ZTServSettings where
-- Internal communication
----------------------------------------------------------------------------

-- | Internal requests which are arising during communication with server broker.
data InternalRequest
= IRRegister ListenerId (Set MsgType) ZTListenerEnv
| IRHeartBeat
Expand Down Expand Up @@ -149,13 +150,22 @@ termNetServEnv ZTNetServEnv{..} = liftIO $ do
adapterRelease ztServPubAdapter
Z.close ztServPub

data ServBrokerStmRes
-- | Server events which broker handles
data ServBrokerEvent
= SBListener ListenerId ZTServSendMsg
| SBFront
-- ^ Message from ROUTER socket
| SBRequest InternalRequest
-- ^ Internal request (or event)
deriving (Show)


-- | Broker opens one ROUTER socket to handle incoming
-- requests to server and one PUB socket to publish data.
-- Broker listens to three types of events:
-- * internal request to broker, either to register new listener or to send heartbeat
-- * external message from frontend ROUTER socket
-- * request to send a message to external network,
-- either a reply to some client or publishing new data
runBroker :: (MonadReader r m, HasLens' r ZTNetServEnv, MonadIO m, MonadMask m) => m ()
runBroker = do
ZTNetServEnv{..} <- view $ lensOf @ZTNetServEnv
Expand All @@ -166,7 +176,7 @@ runBroker = do
NE.fromList $ [unSubscription k,unZTInternalId ztOurNodeId] ++ v
--ztLog ztServLogging Debug "Published"

let processReq (IRRegister listenerId msgTypes lEnv) = do
let processInternalReq (IRRegister listenerId msgTypes lEnv) = do
res <- atomically $ runExceptT $ do
listenerRegistered <- Map.member listenerId <$> lift (readTVar ztListeners)
when listenerRegistered $ throwError "listener is already registered"
Expand All @@ -182,65 +192,73 @@ runBroker = do
whenLeft res $ \e -> error $ "Server IRRegister: " <> e
ztLog ztServLogging Debug $ "Registered listener " <> show listenerId

processReq IRHeartBeat = publish heartbeatSubscription []
processInternalReq IRHeartBeat = publish heartbeatSubscription []

let processMsg = \case
-- Send either a reply message via ROUTER socket or publish message via PUB socket
let processOutgoingMsg = \case
Reply cId msgT msg ->
Z.sendMulti ztServFront $
NE.fromList $ [unZtCliId cId,"",unMsgType msgT] ++ msg
Publish k v -> publish k v

let frontToListener = \case
-- Handle messages arrived to ROUTER socket
let processIncomingMsg = \case
-- Request to get server's internal id
[cId,"",t] | t == tag_getId -> do
Z.sendMulti ztServFront $
NE.fromList [cId,"",unZTInternalId ztOurNodeId]
NE.fromList [cId,"",unZTInternalId ztOurNodeId]
ztLog ztServLogging Debug "Received request connection, replied with our id"
-- Message which is dispatched to corresponding listener
(cId:"":t:msgT:msg) | t == tag_normal -> do
ztEnv <- atomically $ runMaybeT $ do
lId <- MaybeT $ Map.lookup (MsgType msgT) <$> readTVar ztMsgTypes
MaybeT $ Map.lookup lId <$> readTVar ztListeners
case ztEnv of
Nothing -> ztLog ztServLogging Warning "frontToListener: can't resolve msgT"
Just biQ ->
atomically $ TQ.writeTQueue (bReceiveQ biQ)
(ZTCliId cId, MsgType msgT, msg)
_ -> ztLog ztServLogging Warning "frontToListener: wrong format"
Nothing -> ztLog ztServLogging Warning "processIncomingMsg: can't resolve msgT"
Just biQ ->
atomically $ TQ.writeTQueue (bReceiveQ biQ) (ZTCliId cId, MsgType msgT, msg)
-- Unknown message type
_ -> ztLog ztServLogging Warning "processIncomingMsg: wrong message format"

-- Heartbeat worker
let hbWorker = forever $ do
threadDelay $
(fromIntegral $ zsHeartbeatsInterval ztServSettings) * 1000
atomically $
TQ.writeTQueue (unServRequestQueue ztServRequestQueue) IRHeartBeat

liftIO $ A.withAsync hbWorker $ const $ do
let action = liftIO $ do
results <- atomically $ do
lMap <- readTVar ztListeners
let readReq =
fmap SBRequest <$>
TQ.tryReadTQueue (unServRequestQueue ztServRequestQueue)
let readListeners :: [STM (Maybe ServBrokerStmRes)]
readListeners =
map (\(listId,biq) ->
fmap (\content -> SBListener listId content) <$>
TQ.tryReadTQueue (bSendQ biq))
(Map.toList lMap)
atLeastOne $ NE.fromList $
[ readReq
, (bool Nothing (Just SBFront)) <$> adapterTry ztServFrontAdapter ]
++ readListeners
forM_ results $ \case
SBRequest r -> processReq r
SBListener _lId msg -> processMsg msg
SBFront -> whileM (canReceive ztServFront) $
Z.receiveMulti ztServFront >>= frontToListener

forever $
(forever action)
`catchAny`
(\e -> do ztLog ztServLogging Error $
"Server broker exited, restarting in 2s: " <> show e
threadDelay 2000000)
let action = liftIO $ do
events <- atomically $ do
-- Fetch internal requests
lMap <- readTVar ztListeners
let readReq =
fmap SBRequest <$>
TQ.tryReadTQueue (unServRequestQueue ztServRequestQueue)
-- Determine listeners which are going to send something
let readListeners :: [STM (Maybe ServBrokerEvent)]
readListeners =
map (\(listId,biq) ->
fmap (\content -> SBListener listId content) <$>
TQ.tryReadTQueue (bSendQ biq))
(Map.toList lMap)
-- Poll frontend socket
let readFront :: STM (Maybe ServBrokerEvent)
readFront = bool Nothing (Just SBFront) <$> adapterTry ztServFrontAdapter
atLeastOne $ NE.fromList $ readReq : readFront : readListeners

forM_ events $ \case
SBRequest r -> processInternalReq r
SBListener _lId msg -> processOutgoingMsg msg
SBFront -> whileM (canReceive ztServFront) $
Z.receiveMulti ztServFront >>= processIncomingMsg

forever $
(forever action)
`catchAny`
(\e -> do ztLog ztServLogging Error $
"Server broker exited, restarting in 2s: " <> show e
threadDelay 2000000)

registerListener ::
(MonadReader r m, HasLens' r ZTNetServEnv, MonadIO m)
Expand Down
6 changes: 1 addition & 5 deletions snapshot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ packages:
- vinyl-0.8.1.1

- git: https://github.com/int-index/caps
commit: 2f46fc6d5480bdef0a17f64359ad6eb29510dba4

# for caps
- git: https://github.com/mokus0/dependent-sum
commit: f8909cb323b4ffa63af6c4e7cb3a9745e6199080
commit: 6938f8dcefa1aa8aa7d7f272eb0828a3da463671

# for serialise
- cborg-0.2.0.0
Expand Down