Skip to content

bpo-41818: Updated tests for the standard pty library #22962

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 25, 2020
122 changes: 111 additions & 11 deletions Lib/test/test_pty.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,22 @@
import io # readline
import unittest

import struct
import tty
import fcntl
import platform
import warnings

TEST_STRING_1 = b"I wish to buy a fish license.\n"
TEST_STRING_2 = b"For my pet fish, Eric.\n"

try:
_TIOCGWINSZ = tty.TIOCGWINSZ
_TIOCSWINSZ = tty.TIOCSWINSZ
_HAVE_WINSZ = True
except AttributeError:
_HAVE_WINSZ = False

if verbose:
def debug(msg):
print(msg)
Expand Down Expand Up @@ -60,6 +73,27 @@ def _readline(fd):
reader = io.FileIO(fd, mode='rb', closefd=False)
return reader.readline()

def expectedFailureIfStdinIsTTY(fun):
# avoid isatty() for now
try:
tty.tcgetattr(pty.STDIN_FILENO)
return unittest.expectedFailure(fun)
except tty.error:
pass
return fun

def expectedFailureOnBSD(fun):
PLATFORM = platform.system()
if PLATFORM.endswith("BSD") or PLATFORM == "Darwin":
return unittest.expectedFailure(fun)
return fun

def _get_term_winsz(fd):
s = struct.pack("HHHH", 0, 0, 0, 0)
return fcntl.ioctl(fd, _TIOCGWINSZ, s)

def _set_term_winsz(fd, winsz):
fcntl.ioctl(fd, _TIOCSWINSZ, winsz)


# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
Expand All @@ -78,6 +112,20 @@ def setUp(self):
self.addCleanup(signal.alarm, 0)
signal.alarm(10)

# Save original stdin window size
self.stdin_rows = None
self.stdin_cols = None
if _HAVE_WINSZ:
try:
stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
self.stdin_rows = stdin_dim.lines
self.stdin_cols = stdin_dim.columns
old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
self.stdin_cols, 0, 0)
self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
except OSError:
pass

def handle_sig(self, sig, frame):
self.fail("isatty hung")

Expand All @@ -86,26 +134,65 @@ def handle_sighup(signum, frame):
# bpo-38547: if the process is the session leader, os.close(master_fd)
# of "master_fd, slave_name = pty.master_open()" raises SIGHUP
# signal: just ignore the signal.
#
# NOTE: the above comment is from an older version of the test;
# master_open() is not being used anymore.
pass

def test_basic(self):
@expectedFailureIfStdinIsTTY
def test_openpty(self):
try:
debug("Calling master_open()")
master_fd, slave_name = pty.master_open()
debug("Got master_fd '%d', slave_name '%s'" %
(master_fd, slave_name))
debug("Calling slave_open(%r)" % (slave_name,))
slave_fd = pty.slave_open(slave_name)
debug("Got slave_fd '%d'" % slave_fd)
mode = tty.tcgetattr(pty.STDIN_FILENO)
except tty.error:
# not a tty or bad/closed fd
debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
mode = None

new_stdin_winsz = None
if self.stdin_rows != None and self.stdin_cols != None:
try:
debug("Setting pty.STDIN_FILENO window size")
# Set number of columns and rows to be the
# floors of 1/5 of respective original values
target_stdin_winsz = struct.pack("HHHH", self.stdin_rows//5,
self.stdin_cols//5, 0, 0)
_set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)

# Were we able to set the window size
# of pty.STDIN_FILENO successfully?
new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
self.assertEqual(new_stdin_winsz, target_stdin_winsz,
"pty.STDIN_FILENO window size unchanged")
except OSError:
warnings.warn("Failed to set pty.STDIN_FILENO window size")
pass

try:
debug("Calling pty.openpty()")
try:
master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
except TypeError:
master_fd, slave_fd = pty.openpty()
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
except OSError:
# " An optional feature could not be imported " ... ?
raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")

self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty")

if mode:
self.assertEqual(tty.tcgetattr(slave_fd), mode,
"openpty() failed to set slave termios")
if new_stdin_winsz:
self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
"openpty() failed to set slave window size")

# Solaris requires reading the fd before anything is returned.
# My guess is that since we open and close the slave fd
# in master_open(), we need to read the EOF.
#
# NOTE: the above comment is from an older version of the test;
# master_open() is not being used anymore.

# Ensure the fd is non-blocking in case there's nothing to read.
blocking = os.get_blocking(master_fd)
Expand Down Expand Up @@ -222,8 +309,20 @@ def test_fork(self):

os.close(master_fd)

# pty.fork() passed.
@expectedFailureOnBSD
def test_master_read(self):
debug("Calling pty.openpty()")
master_fd, slave_fd = pty.openpty()
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")

debug("Closing slave_fd")
os.close(slave_fd)

debug("Reading from master_fd")
with self.assertRaises(OSError):
os.read(master_fd, 1)

os.close(master_fd)

class SmallPtyTests(unittest.TestCase):
"""These tests don't spawn children or hang."""
Expand Down Expand Up @@ -262,8 +361,9 @@ def _socketpair(self):
self.files.extend(socketpair)
return socketpair

def _mock_select(self, rfds, wfds, xfds):
def _mock_select(self, rfds, wfds, xfds, timeout=0):
# This will raise IndexError when no more expected calls exist.
# This ignores the timeout
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
return self.select_rfds_results.pop(0), [], []

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Updated tests for the pty library. test_basic() has been changed to test_openpty(); this additionally checks if slave termios and slave winsize are being set properly by pty.openpty(). In order to add support for FreeBSD, NetBSD, OpenBSD, and Darwin, this also adds test_master_read(), which demonstrates that pty.spawn() should not depend on an OSError to exit from its copy loop.