Skip to content

Commit b283301

Browse files
author
Guillermo Alonso
committed
Function to guarantee PUB/SUB channel in SYNC_PUB
This function will guarantee the PUB/SUB channel is already up and running after returning, ready for sending data right away.
1 parent f04c9af commit b283301

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed

osbrain/agent.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,9 @@ def _handle_async_requests(self, data):
912912
else:
913913
handler(self, response)
914914

915+
def ugly(self, alias, handlers):
916+
self._subscribe(alias, handlers)
917+
915918
def _subscribe(self, alias: str, handlers: Dict[Union[bytes, str], Any]):
916919
"""
917920
Subscribe the agent to another agent.
@@ -965,6 +968,9 @@ def set_attr(self, **kwargs):
965968
def get_attr(self, name):
966969
return getattr(self, name)
967970

971+
def del_attr(self, name):
972+
delattr(self, name)
973+
968974
def set_method(self, *args, **kwargs):
969975
"""
970976
Set object methods.

osbrain/helper.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,3 +192,46 @@ def wait_agent_attr(agent, name='received', length=None, data=None, value=None,
192192
break
193193
time.sleep(0.01)
194194
return False
195+
196+
197+
def synchronize_sync_pub(server, server_alias, server_handler, client,
198+
client_alias, client_handler):
199+
'''
200+
Create a SYNC_PUB/SYNC_SUB channel and connect both agents.
201+
202+
Make sure they have stablished the PUB/SUB communication within the
203+
SYNC_PUB/SYNC_SUB channel before returning. This will guarantee that
204+
no PUB messages are lost.
205+
'''
206+
def assert_receive(agent, message, topic=None):
207+
try:
208+
agent.get_attr('_tmp_attr')
209+
agent._tmp_attr = True
210+
except AttributeError: # Attribute already deleted
211+
pass
212+
213+
addr = server.bind('SYNC_PUB', alias=server_alias, handler=server_handler)
214+
215+
client.set_attr(_tmp_attr=False)
216+
217+
client.connect(addr, alias=client_alias, handler=assert_receive)
218+
219+
# Send messages through the PUB socket until the client receives them
220+
server.each(0.1, 'send', addr, 'Synchronize', alias='_tmp_timer')
221+
222+
assert wait_agent_attr(client, name='_tmp_attr', value=True, timeout=5)
223+
224+
server.stop_timer('_tmp_timer')
225+
226+
# The following is an ugly hack to get the uuid used as the alias for the
227+
# SUB socket of the client in the SYNC_PUB channel.
228+
channel = client.addr(client_alias)
229+
client_addr = channel.twin().sender.twin()
230+
addr_to_access_uuid = client.addr(client_addr)
231+
client_async_req_uuid = client.get_attr('_async_req_uuid')
232+
uuid = client_async_req_uuid[addr_to_access_uuid]
233+
234+
# Set the handler passed as a parameter, now that connection is guaranteed
235+
client.ugly(uuid, client_handler)
236+
237+
client.del_attr('_tmp_attr')

osbrain/tests/test_helper.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,45 @@
99
from osbrain.helper import agent_dies
1010
from osbrain.helper import attribute_match_all
1111
from osbrain.helper import wait_agent_attr
12+
from osbrain.helper import synchronize_sync_pub
13+
from osbrain.address import AgentChannel
1214

1315
from common import nsproxy # pragma: no flakes
1416
from common import agent_logger # pragma: no flakes
1517

1618

19+
def receive(agent, response):
20+
agent.received.append(response)
21+
22+
23+
def test_synchronize_sync_pub(nsproxy):
24+
"""
25+
All publications in SYNC_PUB/SYNC_SUB connections stablished through
26+
`synchronize_sync_pub` should be received in the other end.
27+
"""
28+
server = run_agent('server')
29+
client = run_agent('client')
30+
31+
client.set_attr(received=[])
32+
33+
# Create a SYNC_PUB channel and guarantee the PUB/SUB is stablished
34+
synchronize_sync_pub(server, 'sync_pub', receive,
35+
client, 'sync_sub', receive)
36+
37+
# Send the message only once
38+
server.send('sync_pub', 'Hello')
39+
40+
assert wait_agent_attr(client, name='received', data='Hello')
41+
42+
# Check that no temporary attributes remain
43+
assert 'Synchronize' not in client.get_attr('received')
44+
45+
with pytest.raises(AttributeError):
46+
client.get_attr('_tmp_attr')
47+
48+
assert '_tmp_timer' not in server.list_timers()
49+
50+
1751
def test_agent_dies(nsproxy):
1852
"""
1953
The function `agent_dies` should return `False` if the agent does not die

0 commit comments

Comments
 (0)