diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index a6febcf..688b346 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -41,7 +41,7 @@ jobs: python -m pip install --editable .[test] - name: pytest - run: timeout --signal=TERM 60 python -m pytest --cov=fowl -s -v integration/ + run: timeout --kill-after 65 --signal=TERM 60 python -m pytest --cov=fowl -s -v integration/ - name: Coverage graph shell: bash diff --git a/.github/workflows/unit.yml b/.github/workflows/unit.yml index 2221d43..832d26d 100644 --- a/.github/workflows/unit.yml +++ b/.github/workflows/unit.yml @@ -44,7 +44,7 @@ jobs: run: python -m pytest --version - name: pytest - run: timeout --signal=TERM 60 python -m pytest --assert=plain --disable-warnings --cov=fowl -s -v src/fowl/test/ + run: timeout --kill-after 65 --signal=TERM 60 python -m pytest --assert=plain --disable-warnings --cov=fowl -s -v src/fowl/test/ - name: Coverage graph shell: bash diff --git a/Makefile b/Makefile index 55e916f..f8b8daf 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ pin: git commit -m "upgrade pins" utest: - python -m pytest --cov= --cov-report= -sv -x src/fowl/test + python -m coverage run -m pytest -s -v src/fowl/test/ cuv graph test: diff --git a/src/fowl/_proto.py b/src/fowl/_proto.py index 3177763..20d2c19 100644 --- a/src/fowl/_proto.py +++ b/src/fowl/_proto.py @@ -18,7 +18,7 @@ import msgpack import automat from twisted.internet import reactor -from twisted.internet.defer import Deferred, ensureDeferred, DeferredList, race, CancelledError +from twisted.internet.defer import Deferred, ensureDeferred, DeferredList, race, CancelledError, TimeoutError from twisted.internet.task import deferLater from twisted.internet.protocol import Factory, Protocol from twisted.internet.error import ConnectionDone @@ -183,7 +183,7 @@ async def frontend_accept_or_invite(reactor, config): status_tracker = _StatusTracker() - fowl_wh = await create_fowl(config, status_tracker) + fowl_wh = await create_fowl(config, status_tracker, True) fowl_wh.start() # testing a TUI style output UI, maybe optional? @@ -1070,12 +1070,13 @@ class FowlWormhole: Co-ordinates between the wormhole, user I/O and the daemon state-machine. """ - def __init__(self, reactor, wormhole, coop): + def __init__(self, reactor, wormhole, coop, interactive): self._reactor = reactor self._wormhole = wormhole self._done = When() # we have shut down completely self._connected = When() # our Peer has connected self._got_welcome = When() # we received the Welcome from the server + self._interactive = interactive self._we_sent_closing = False self._did_disconnect = False @@ -1187,7 +1188,7 @@ def was_closed(why): # public API methods # XXX moved from elsewhere, unify with close_wormhole() - async def disconnect_session(self, interactive=True): + async def disconnect_session(self): """ Nicely disconnect the session, by communicating with our peer. @@ -1215,8 +1216,9 @@ async def disconnect_session(self, interactive=True): this is received, the wormhole may be closed and the program exits. """ - # FIXME: all these messages should probably be via 'status' or - # similar, in case we're using all this as a library. + # FIXME: all these stdout / print messages should be via + # 'status' or similar, in case we're using all this as a + # library. if self._did_disconnect: return @@ -1235,6 +1237,13 @@ async def disconnect_session(self, interactive=True): # codepath anyway _ = ensureDeferred(self._close_active_connections()) + # sometimes (in tests) the mailbox server goes away before a + # wormhole tries to shut down .. which means the message will + # never be delivered, so we can't wait forever (for + # non-interactive) for the peer message + async def wait_non_interactive(): + await deferLater(reactor, 1.2, lambda: None) + async def wait_for_user(): def user_got_bored_waiting(*args): @@ -1265,21 +1274,25 @@ def user_got_bored_waiting(*args): if delay > 10.0: delay = 10.0 delta = humanize.naturaldelta(reactor.seconds() - start) - if interactive: + if self._interactive: print(f'Waited {delta} for "closing" message from peer') + if self._interactive: + wait_how_long = ensureDeferred(wait_for_user()) + else: + wait_how_long = ensureDeferred(wait_non_interactive()) which, result = await race([ self._got_closing_from_peer_d, - ensureDeferred(wait_for_user()), + wait_how_long, ]) if which == 0: # XXX result can be None here if we never hit 'ready' # notification, needs proper test .. if result is not None and result >= 0: - if interactive: + if self._interactive: print(f"Clean close; peer saw phase={result}") else: - if interactive: + if self._interactive: print("Never got closing message from peer") try: @@ -1287,17 +1300,27 @@ def user_got_bored_waiting(*args): except wormhole_errors.LonelyError: # maybe just say nothing? why does the user care about # this? (they probably hit ctrl-c anyway, how else can you get here?) - if interactive: + if self._interactive: print("Wormhole closed without peer.") - if interactive: + if self._interactive: print("Done.") - async def close_wormhole(self): + async def close_wormhole(self, timeout=1.0): """ Shut down the wormhole """ # XXX see also the whole "how to shutdown half-close etc" - await self._wormhole.close() + d = self._wormhole.close() + # this is (somehow?) timing out (in test, when mailbox is shut + # down but connection(s) aren't yet) ... so we timeout pretty + # fast since local close() TCP socket should be quick?) + if timeout > 0.0: + d.addTimeout(timeout, self._reactor) + try: + await d + except TimeoutError: + pass + # once the wormhole "actually" closes, the state-machine will # trigger our "stop" codepath @@ -1362,13 +1385,15 @@ async def _(msg): # if we are not connected to a peer, we can just close and exit if not self._peer_connected: await self.close_wormhole() - self._reactor.stop() + # note: don't call reactor.stop() here, that's rude + # .. if anything depends on that exit style we need + # to fix it differently return # we do have a peer -- send them a close, but also honour # the timeout our controller asked for (default: 10s) timeout = deferLater(self._reactor, msg.timeout) - disconn = ensureDeferred(self.disconnect_session(interactive=False)) + disconn = ensureDeferred(self.disconnect_session()) idx, _ = await race((timeout, disconn)) if idx == 0: # it was the timeout, set exit-code to non-zero. this @@ -1458,7 +1483,7 @@ def maybe_int(i): # - sans-io style (send "messages" in / out of _forward_loop or so) # # Would like the second; so we can interact in unit-tests (or here) -# via parsed commands. e.g. we have an AGT union-type, and every +# via parsed commands. e.g. we have an ADT union-type, and every # input-message is a class def fowld_command_to_json(msg: FowlCommandMessage) -> dict: """ @@ -1635,7 +1660,7 @@ def parse(js): "awaiting-connect": parser(AwaitingConnect, [("name", None), ("local_port", int)]), "remote-connect-failed": parser(RemoteConnectFailed, [("id", int), ("reason", None)]), "outgoing-connection": parser(OutgoingConnection, [("id", int), ("service_name", None)]), - "outgoing-done": parser(OutgoingDone, [("service_name", str)]), + "outgoing-done": parser(OutgoingDone, [("id", int), ("service_name", str)]), "incoming-connection": parser(IncomingConnection, [("id", int), ("service_name", None)]), "incoming-lost": parser(IncomingLost, [("id", int), ("reason", None)]), "incoming-done": parser(IncomingDone, [("id", int)]), @@ -1648,7 +1673,7 @@ def parse(js): return kind_to_message[kind](cmd), cmd.get("timestamp", None) -async def create_fowl(config, fowl_status_tracker): +async def create_fowl(config, fowl_status_tracker, interactive): # can we make this "a status listener" instead? start_time = reactor.seconds() @@ -1717,7 +1742,7 @@ def command_message(msg): # @sm.set_trace # def _(start, edge, end): # print(f"trace: {start} --[ {edge} ]--> {end}") - fowl = FowlWormhole(reactor, w, coop) + fowl = FowlWormhole(reactor, w, coop, interactive) return fowl @@ -1744,7 +1769,7 @@ def output_fowl_message(msg): status_tracker = _StatusTracker() status_tracker.add_listener(output_fowl_message) - fowl = await create_fowl(config, status_tracker) + fowl = await create_fowl(config, status_tracker, False) fowl.start() # arrange to read incoming commands from stdin @@ -1752,6 +1777,10 @@ def output_fowl_message(msg): dispatch = LocalCommandDispatch(config, fowl) create_stdio(dispatch) + async def shutdown(): + await fowl.disconnect_session() + reactor.addSystemEventTrigger("before", "shutdown", lambda: ensureDeferred(shutdown())) + try: await fowl.when_done() except CancelledError: diff --git a/src/fowl/messages.py b/src/fowl/messages.py index 997d8a9..fedf5eb 100644 --- a/src/fowl/messages.py +++ b/src/fowl/messages.py @@ -162,7 +162,7 @@ class OutgoingConnection(FowlOutputMessage): an outgoing subchannel to the other peer). """ service_name: str - id: str + id: int @frozen @@ -180,6 +180,7 @@ class OutgoingDone(FowlOutputMessage): We have lost one of our connections """ service_name: str + id: int @frozen @@ -188,7 +189,7 @@ class IncomingConnection(FowlOutputMessage): The other side is requesting we open a connection """ service_name: str - id: str + id: int @frozen diff --git a/src/fowl/status.py b/src/fowl/status.py index 4f065cc..6f46bdd 100644 --- a/src/fowl/status.py +++ b/src/fowl/status.py @@ -219,7 +219,12 @@ def outgoing_done(self, channel_id): # we get both an "outgoing_lost()" and then an "outgoing_done()"... self._current_status.subchannels[channel_id].done_at = self._time_provider() self._notify_listeners() - self._emit(OutgoingDone(channel_id)) + self._emit( + OutgoingDone( + self._current_status.subchannels[channel_id].service_name, + channel_id, + ) + ) # todo: include a "summary" in OutgoingDone? def outgoing_lost(self, channel_id, reason): diff --git a/src/fowl/test/test_iperf3.py b/src/fowl/test/test_iperf3.py index 5e32752..2440d4f 100644 --- a/src/fowl/test/test_iperf3.py +++ b/src/fowl/test/test_iperf3.py @@ -12,6 +12,7 @@ from fowl.observer import When, Framer from fowl._proto import parse_fowld_output, fowld_command_to_json +from fowl.tcp import allocate_tcp_port from fowl import messages from .util import _MagicTextProtocol, _cleanup_service_process @@ -48,9 +49,19 @@ def processEnded(self, reason): self._done.trigger(self._reactor, reason) +@pytest.fixture() +def server_port(): + return allocate_tcp_port() + + +@pytest.fixture() +def client_port(): + return allocate_tcp_port() + + @pytest_twisted.async_yield_fixture() -async def iperf3_server(reactor, request): - args = ["iperf3", "-V", "-4", "-s", "localhost", "-p", "54321"] +async def iperf3_server(reactor, request, server_port): + args = ["iperf3", "-V", "-4", "-s", "localhost", "-p", str(server_port)] logs = [] protocol = _MagicTextProtocol("listening on", logs.append) exe = shutil.which("iperf3") @@ -68,8 +79,8 @@ async def iperf3_server(reactor, request): yield protocol -async def iperf3_client(reactor, request): - args = ["iperf3", "-4", "-c", "localhost", "-p", "12345", "-n", "1G"] +async def iperf3_client(reactor, request, client_port): + args = ["iperf3", "-4", "-c", "localhost", "-p", str(client_port), "-n", "1G"] logs = [] protocol = _MagicTextProtocol("iperf Done", logs.append) exe = shutil.which("iperf3") @@ -91,7 +102,7 @@ async def iperf3_client(reactor, request): @pytest_twisted.ensureDeferred() -async def test_performance(reactor, request, mailbox, iperf3_server): +async def test_performance(reactor, request, mailbox, iperf3_server, client_port, server_port): """ Start up an iperf3 test as per: https://github.com/magic-wormhole/fowl/issues/34 @@ -137,8 +148,8 @@ async def test_performance(reactor, request, mailbox, iperf3_server): m = await accept_proto.next_message() print("A: code", m.code) - invite_proto.write_message(messages.LocalListener("iperf", 12345)) - accept_proto.write_message(messages.RemoteListener("iperf", local_connect_port=54321)) + invite_proto.write_message(messages.LocalListener("iperf", client_port)) + accept_proto.write_message(messages.RemoteListener("iperf", local_connect_port=server_port )) m = await invite_proto.next_message() print("I: peer", m.verifier) @@ -149,7 +160,7 @@ async def test_performance(reactor, request, mailbox, iperf3_server): print("start iperf3 client") start = reactor.seconds() - _logs = await iperf3_client(reactor, request) + _logs = await iperf3_client(reactor, request, client_port) elapsed = reactor.seconds() - start print("elapsed", elapsed) bps = 1*1024*1024*1024 / elapsed