Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
python -m pip install --editable .[test]

- name: pytest
run: timeout --signal=TERM 60 python -m pytest --cov=fowl -s -v integration/
run: timeout --kill-after 65 --signal=TERM 60 python -m pytest --cov=fowl -s -v integration/

- name: Coverage graph
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
run: python -m pytest --version

- name: pytest
run: timeout --signal=TERM 60 python -m pytest --assert=plain --disable-warnings --cov=fowl -s -v src/fowl/test/
run: timeout --kill-after 65 --signal=TERM 60 python -m pytest --assert=plain --disable-warnings --cov=fowl -s -v src/fowl/test/

- name: Coverage graph
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pin:
git commit -m "upgrade pins"

utest:
python -m pytest --cov= --cov-report= -sv -x src/fowl/test
python -m coverage run -m pytest -s -v src/fowl/test/
cuv graph

test:
Expand Down
71 changes: 50 additions & 21 deletions src/fowl/_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import msgpack
import automat
from twisted.internet import reactor
from twisted.internet.defer import Deferred, ensureDeferred, DeferredList, race, CancelledError
from twisted.internet.defer import Deferred, ensureDeferred, DeferredList, race, CancelledError, TimeoutError
from twisted.internet.task import deferLater
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.error import ConnectionDone
Expand Down Expand Up @@ -183,7 +183,7 @@ async def frontend_accept_or_invite(reactor, config):

status_tracker = _StatusTracker()

fowl_wh = await create_fowl(config, status_tracker)
fowl_wh = await create_fowl(config, status_tracker, True)
fowl_wh.start()

# testing a TUI style output UI, maybe optional?
Expand Down Expand Up @@ -1070,12 +1070,13 @@ class FowlWormhole:
Co-ordinates between the wormhole, user I/O and the daemon state-machine.
"""

def __init__(self, reactor, wormhole, coop):
def __init__(self, reactor, wormhole, coop, interactive):
self._reactor = reactor
self._wormhole = wormhole
self._done = When() # we have shut down completely
self._connected = When() # our Peer has connected
self._got_welcome = When() # we received the Welcome from the server
self._interactive = interactive

self._we_sent_closing = False
self._did_disconnect = False
Expand Down Expand Up @@ -1187,7 +1188,7 @@ def was_closed(why):
# public API methods

# XXX moved from elsewhere, unify with close_wormhole()
async def disconnect_session(self, interactive=True):
async def disconnect_session(self):
"""
Nicely disconnect the session, by communicating with our peer.

Expand Down Expand Up @@ -1215,8 +1216,9 @@ async def disconnect_session(self, interactive=True):
this is received, the wormhole may be closed and the program
exits.
"""
# FIXME: all these messages should probably be via 'status' or
# similar, in case we're using all this as a library.
# FIXME: all these stdout / print messages should be via
# 'status' or similar, in case we're using all this as a
# library.

if self._did_disconnect:
return
Expand All @@ -1235,6 +1237,13 @@ async def disconnect_session(self, interactive=True):
# codepath anyway
_ = ensureDeferred(self._close_active_connections())

# sometimes (in tests) the mailbox server goes away before a
# wormhole tries to shut down .. which means the message will
# never be delivered, so we can't wait forever (for
# non-interactive) for the peer message
async def wait_non_interactive():
await deferLater(reactor, 1.2, lambda: None)

async def wait_for_user():

def user_got_bored_waiting(*args):
Expand Down Expand Up @@ -1265,39 +1274,53 @@ def user_got_bored_waiting(*args):
if delay > 10.0:
delay = 10.0
delta = humanize.naturaldelta(reactor.seconds() - start)
if interactive:
if self._interactive:
print(f'Waited {delta} for "closing" message from peer')

if self._interactive:
wait_how_long = ensureDeferred(wait_for_user())
else:
wait_how_long = ensureDeferred(wait_non_interactive())
which, result = await race([
self._got_closing_from_peer_d,
ensureDeferred(wait_for_user()),
wait_how_long,
])
if which == 0:
# XXX result can be None here if we never hit 'ready'
# notification, needs proper test ..
if result is not None and result >= 0:
if interactive:
if self._interactive:
print(f"Clean close; peer saw phase={result}")
else:
if interactive:
if self._interactive:
print("Never got closing message from peer")

try:
await self.close_wormhole()
except wormhole_errors.LonelyError:
# maybe just say nothing? why does the user care about
# this? (they probably hit ctrl-c anyway, how else can you get here?)
if interactive:
if self._interactive:
print("Wormhole closed without peer.")
if interactive:
if self._interactive:
print("Done.")

async def close_wormhole(self):
async def close_wormhole(self, timeout=1.0):
"""
Shut down the wormhole
"""
# XXX see also the whole "how to shutdown half-close etc"
await self._wormhole.close()
d = self._wormhole.close()
# this is (somehow?) timing out (in test, when mailbox is shut
# down but connection(s) aren't yet) ... so we timeout pretty
# fast since local close() TCP socket should be quick?)
if timeout > 0.0:
d.addTimeout(timeout, self._reactor)
try:
await d
except TimeoutError:
pass

# once the wormhole "actually" closes, the state-machine will
# trigger our "stop" codepath

Expand Down Expand Up @@ -1362,13 +1385,15 @@ async def _(msg):
# if we are not connected to a peer, we can just close and exit
if not self._peer_connected:
await self.close_wormhole()
self._reactor.stop()
# note: don't call reactor.stop() here, that's rude
# .. if anything depends on that exit style we need
# to fix it differently
return

# we do have a peer -- send them a close, but also honour
# the timeout our controller asked for (default: 10s)
timeout = deferLater(self._reactor, msg.timeout)
disconn = ensureDeferred(self.disconnect_session(interactive=False))
disconn = ensureDeferred(self.disconnect_session())
idx, _ = await race((timeout, disconn))
if idx == 0:
# it was the timeout, set exit-code to non-zero. this
Expand Down Expand Up @@ -1458,7 +1483,7 @@ def maybe_int(i):
# - sans-io style (send "messages" in / out of _forward_loop or so)
#
# Would like the second; so we can interact in unit-tests (or here)
# via parsed commands. e.g. we have an AGT union-type, and every
# via parsed commands. e.g. we have an ADT union-type, and every
# input-message is a class
def fowld_command_to_json(msg: FowlCommandMessage) -> dict:
"""
Expand Down Expand Up @@ -1635,7 +1660,7 @@ def parse(js):
"awaiting-connect": parser(AwaitingConnect, [("name", None), ("local_port", int)]),
"remote-connect-failed": parser(RemoteConnectFailed, [("id", int), ("reason", None)]),
"outgoing-connection": parser(OutgoingConnection, [("id", int), ("service_name", None)]),
"outgoing-done": parser(OutgoingDone, [("service_name", str)]),
"outgoing-done": parser(OutgoingDone, [("id", int), ("service_name", str)]),
"incoming-connection": parser(IncomingConnection, [("id", int), ("service_name", None)]),
"incoming-lost": parser(IncomingLost, [("id", int), ("reason", None)]),
"incoming-done": parser(IncomingDone, [("id", int)]),
Expand All @@ -1648,7 +1673,7 @@ def parse(js):
return kind_to_message[kind](cmd), cmd.get("timestamp", None)


async def create_fowl(config, fowl_status_tracker):
async def create_fowl(config, fowl_status_tracker, interactive):

# can we make this "a status listener" instead?
start_time = reactor.seconds()
Expand Down Expand Up @@ -1717,7 +1742,7 @@ def command_message(msg):
# @sm.set_trace
# def _(start, edge, end):
# print(f"trace: {start} --[ {edge} ]--> {end}")
fowl = FowlWormhole(reactor, w, coop)
fowl = FowlWormhole(reactor, w, coop, interactive)
return fowl


Expand All @@ -1744,14 +1769,18 @@ def output_fowl_message(msg):
status_tracker = _StatusTracker()
status_tracker.add_listener(output_fowl_message)

fowl = await create_fowl(config, status_tracker)
fowl = await create_fowl(config, status_tracker, False)
fowl.start()

# arrange to read incoming commands from stdin
create_stdio = config.create_stdio or StandardIO
dispatch = LocalCommandDispatch(config, fowl)
create_stdio(dispatch)

async def shutdown():
await fowl.disconnect_session()
reactor.addSystemEventTrigger("before", "shutdown", lambda: ensureDeferred(shutdown()))

try:
await fowl.when_done()
except CancelledError:
Expand Down
5 changes: 3 additions & 2 deletions src/fowl/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class OutgoingConnection(FowlOutputMessage):
an outgoing subchannel to the other peer).
"""
service_name: str
id: str
id: int


@frozen
Expand All @@ -180,6 +180,7 @@ class OutgoingDone(FowlOutputMessage):
We have lost one of our connections
"""
service_name: str
id: int


@frozen
Expand All @@ -188,7 +189,7 @@ class IncomingConnection(FowlOutputMessage):
The other side is requesting we open a connection
"""
service_name: str
id: str
id: int


@frozen
Expand Down
7 changes: 6 additions & 1 deletion src/fowl/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,12 @@ def outgoing_done(self, channel_id):
# we get both an "outgoing_lost()" and then an "outgoing_done()"...
self._current_status.subchannels[channel_id].done_at = self._time_provider()
self._notify_listeners()
self._emit(OutgoingDone(channel_id))
self._emit(
OutgoingDone(
self._current_status.subchannels[channel_id].service_name,
channel_id,
)
)
# todo: include a "summary" in OutgoingDone?

def outgoing_lost(self, channel_id, reason):
Expand Down
27 changes: 19 additions & 8 deletions src/fowl/test/test_iperf3.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from fowl.observer import When, Framer
from fowl._proto import parse_fowld_output, fowld_command_to_json
from fowl.tcp import allocate_tcp_port
from fowl import messages
from .util import _MagicTextProtocol, _cleanup_service_process

Expand Down Expand Up @@ -48,9 +49,19 @@ def processEnded(self, reason):
self._done.trigger(self._reactor, reason)


@pytest.fixture()
def server_port():
return allocate_tcp_port()


@pytest.fixture()
def client_port():
return allocate_tcp_port()


@pytest_twisted.async_yield_fixture()
async def iperf3_server(reactor, request):
args = ["iperf3", "-V", "-4", "-s", "localhost", "-p", "54321"]
async def iperf3_server(reactor, request, server_port):
args = ["iperf3", "-V", "-4", "-s", "localhost", "-p", str(server_port)]
logs = []
protocol = _MagicTextProtocol("listening on", logs.append)
exe = shutil.which("iperf3")
Expand All @@ -68,8 +79,8 @@ async def iperf3_server(reactor, request):
yield protocol


async def iperf3_client(reactor, request):
args = ["iperf3", "-4", "-c", "localhost", "-p", "12345", "-n", "1G"]
async def iperf3_client(reactor, request, client_port):
args = ["iperf3", "-4", "-c", "localhost", "-p", str(client_port), "-n", "1G"]
logs = []
protocol = _MagicTextProtocol("iperf Done", logs.append)
exe = shutil.which("iperf3")
Expand All @@ -91,7 +102,7 @@ async def iperf3_client(reactor, request):


@pytest_twisted.ensureDeferred()
async def test_performance(reactor, request, mailbox, iperf3_server):
async def test_performance(reactor, request, mailbox, iperf3_server, client_port, server_port):
"""
Start up an iperf3 test as per:
https://github.com/magic-wormhole/fowl/issues/34
Expand Down Expand Up @@ -137,8 +148,8 @@ async def test_performance(reactor, request, mailbox, iperf3_server):
m = await accept_proto.next_message()
print("A: code", m.code)

invite_proto.write_message(messages.LocalListener("iperf", 12345))
accept_proto.write_message(messages.RemoteListener("iperf", local_connect_port=54321))
invite_proto.write_message(messages.LocalListener("iperf", client_port))
accept_proto.write_message(messages.RemoteListener("iperf", local_connect_port=server_port ))

m = await invite_proto.next_message()
print("I: peer", m.verifier)
Expand All @@ -149,7 +160,7 @@ async def test_performance(reactor, request, mailbox, iperf3_server):

print("start iperf3 client")
start = reactor.seconds()
_logs = await iperf3_client(reactor, request)
_logs = await iperf3_client(reactor, request, client_port)
elapsed = reactor.seconds() - start
print("elapsed", elapsed)
bps = 1*1024*1024*1024 / elapsed
Expand Down
Loading