-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpty2.py
228 lines (190 loc) · 6.45 KB
/
pty2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""Pseudo terminal utilities v2."""
# Copyright (C) 2020 Soumendra Ganguly
#
# Prepared for cpython version >= 3.8
#
# Tested on Debian 10 GNU/Linux amd64,
# FreeBSD 12.1-RELEASE amd64,
# NetBSD 9.0 amd64,
# OpenBSD 6.7 amd64.
#
# To-do: Test on Solaris, Illumos, macOS, Cygwin, etc. Add Windows ConPTY support.
# v1 author: Steen Lumholt -- with additions by Guido.
# Copyright (C) 2001-2020 Python Software Foundation; All Rights Reserved
from select import select
from fcntl import ioctl
import os
import sys
import tty
import signal
__all__ = ["openpty", "fork", "spawn"]
# Conventional descriptor numbers of the three standard streams.
STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO = 0, 1, 2

# Value that os.fork() / os.forkpty() hand back inside the child process.
CHILD = 0

# Set of all signal numbers that are valid on this platform.
ALL_SIGNALS = signal.valid_signals()

# True when the tty module exposes both window-size ioctl request codes.
HAVE_WINSZ = hasattr(tty, "TIOCGWINSZ") and hasattr(tty, "TIOCSWINSZ")

# True when window sizes are supported and the platform defines SIGWINCH.
HAVE_WINCH = HAVE_WINSZ and hasattr(signal, "SIGWINCH")
def openpty(mode=None, winsz=None, name=False):
    """openpty() -> (master_fd, slave_fd)

    Open a pty master/slave pair using os.openpty().

    mode, if truthy, is a termios attribute list that is applied to the
    slave with tcsetattr(TCSAFLUSH).  winsz, if truthy, is applied to the
    slave as its window size when HAVE_WINSZ is true.  When name is true
    the slave's device path is appended to the result, giving
    (master_fd, slave_fd, slave_name).

    If configuring the slave fails, both descriptors are closed before
    the exception propagates.
    """
    master_fd, slave_fd = os.openpty()
    try:
        if mode:
            tty.tcsetattr(slave_fd, tty.TCSAFLUSH, mode)
        # Test winsz first and rely on this module's own HAVE_WINSZ flag,
        # so a plain openpty() call needs nothing beyond os.openpty().
        if winsz and HAVE_WINSZ:
            tty.tcsetwinsize(slave_fd, winsz)
        slave_name = os.ttyname(slave_fd) if name else None
    except BaseException:
        # Do not leak the freshly opened pair when setup fails.
        os.close(master_fd)
        os.close(slave_fd)
        raise

    if name:
        return master_fd, slave_fd, slave_name
    return master_fd, slave_fd
def fork(mode=None, winsz=None):
    """fork() -> (pid, master_fd)

    Fork and make the child a session leader with a controlling terminal.

    mode, if truthy, is a termios attribute list applied to the child's
    terminal; winsz, if truthy, is applied as its window size when
    HAVE_WINSZ is true.  In the child, pid equals CHILD; in the parent,
    pid is the child's process id and master_fd is the pty master.
    """
    try:
        pid, master_fd = os.forkpty()
    except AttributeError:
        # os.forkpty() is not available: open the pair by hand, fork,
        # and let os.login_tty() attach the slave in the child.
        master_fd, slave_fd = openpty(mode, winsz)
        pid = os.fork()
        if pid == CHILD:
            os.close(master_fd)
            os.login_tty(slave_fd)
        else:
            os.close(slave_fd)
    else:
        # re-introduce the os.setsid() call here?
        #
        # os.forkpty() makes sure that the slave end of
        # the pty becomes the stdin of the child; this
        # is usually done via a dup2() call
        if pid == CHILD:
            if mode:
                tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
            # Use this module's own HAVE_WINSZ flag, tested after winsz,
            # so the common winsz=None case needs no tty attribute.
            if winsz and HAVE_WINSZ:
                tty.tcsetwinsize(STDIN_FILENO, winsz)
    return pid, master_fd
def _writen(fd, data):
    """Keep calling os.write() on fd until every byte of data is written."""
    pending = data
    while True:
        if not pending:
            return
        # os.write() may accept only a prefix; drop what was written.
        sent = os.write(fd, pending)
        pending = pending[sent:]
def _read(fd):
    """Default read callback: return at most 1024 bytes read from fd."""
    chunk = os.read(fd, 1024)
    return chunk
def _getmask():
    """Return the calling thread's current signal mask."""
    # Blocking an empty collection changes nothing; the call merely
    # reports the mask that is already in effect.
    nothing_to_block = []
    current_mask = signal.pthread_sigmask(signal.SIG_BLOCK, nothing_to_block)
    return current_mask
def _sigblock():
    """Add every signal in ALL_SIGNALS to the calling thread's mask."""
    how = signal.SIG_BLOCK
    signal.pthread_sigmask(how, ALL_SIGNALS)
def _sigreset(saved_mask):
    """Replace the calling thread's signal mask with saved_mask."""
    how = signal.SIG_SETMASK
    signal.pthread_sigmask(how, saved_mask)
def _pty_setup(slave_echo):
    """Open a pty pair configured from the current stdin.

    If tcgetattr() succeeds on stdin, the slave receives stdin's termios
    attributes (with echo adjusted by tty.mode_echo according to
    slave_echo) and, when HAVE_WINSZ is true, stdin's window size; stdin
    is then switched to raw mode.  If tcgetattr() raises tty.error, a
    plain pty pair is opened and only the slave's echo setting is
    adjusted.

    Returns (master_fd, slave_fd, mode, winsz) where mode is stdin's
    original termios attribute list or None, and winsz is stdin's window
    size or None.
    """
    try:
        mode = tty.tcgetattr(STDIN_FILENO)
    except tty.error:
        # stdin's attributes cannot be read: leave stdin alone and only
        # set the echo behaviour of the new slave.
        master_fd, slave_fd = openpty()
        slave_mode = tty.tcgetattr(slave_fd)
        tty.mode_echo(slave_mode, slave_echo)
        tty.tcsetattr(slave_fd, tty.TCSAFLUSH, slave_mode)
        return master_fd, slave_fd, None, None

    # Use this module's own HAVE_WINSZ flag for the capability check.
    winsz = tty.getwinsize(STDIN_FILENO) if HAVE_WINSZ else None

    # Work on a copy whose nested lists (the control-character array) are
    # copied too, so that the saved `mode` stays exactly as read even if
    # the tty helpers below modify nested entries in place.
    work_mode = [list(field) if isinstance(field, list) else field
                 for field in mode]

    # Order matters: the slave gets the echo-adjusted, non-raw attributes
    # first; only afterwards is the same list turned raw for stdin.
    tty.mode_echo(work_mode, slave_echo)
    master_fd, slave_fd = openpty(work_mode, winsz)
    tty.mode_raw(work_mode)
    tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, work_mode)
    return master_fd, slave_fd, mode, winsz
def _winchset(slave_fd, saved_mask, handle_winch):
    """Install a SIGWINCH handler that copies stdin's window size to the slave.

    slave_fd is an open descriptor of the pty slave; saved_mask is the
    signal mask the handler restores when it finishes.  Nothing is
    installed when handle_winch is false.

    Returns the previous SIGWINCH handler as reported by signal.signal(),
    or None when no handler was installed (handle_winch is false, or the
    call was not made from the main thread).
    """
    if not handle_winch:
        return None

    # Resolve the device path now; the handler reopens the slave by name
    # on every invocation instead of relying on slave_fd staying open.
    slave_path = os.ttyname(slave_fd)

    def _hwinch(signum, frame):
        """SIGWINCH handler."""
        _sigblock()
        try:
            new_slave_fd = os.open(slave_path, os.O_RDWR)
            try:
                tty.setwinsize(new_slave_fd, tty.getwinsize(STDIN_FILENO))
            finally:
                # Never leak the temporary descriptor, even on failure.
                os.close(new_slave_fd)
        finally:
            # Always restore the mask so signals do not stay blocked.
            _sigreset(saved_mask)

    try:
        # Raises ValueError if not called from main thread.
        return signal.signal(signal.SIGWINCH, _hwinch)
    except ValueError:
        return None
def _copy(master_fd, saved_mask=frozenset(), master_read=_read, stdin_read=_read):
    """Parent copy loop for spawn.

    Copies
        pty master     -> standard output (master_read)
        standard input -> pty master      (stdin_read)

    saved_mask is the signal mask put in effect while waiting in
    select(); all signals are blocked again while data is being copied.

    To exit from this loop
        A. FreeBSD, OpenBSD, NetBSD return no data upon reading master EOF,
        B. Linux throws OSError when trying to read from master when
            1. ALL descriptors of slave are closed in parent AND
            2. child has exited.
    Either case removes the master from the watched set and enables a
    short select() timeout; the loop returns as soon as select() reports
    nothing readable.
    """
    fds = [master_fd, STDIN_FILENO]
    timeout = None  # Block indefinitely until the master reaches EOF.
    while True:
        # Signals are deliverable only while waiting in select().
        _sigreset(saved_mask)
        rfds = select(fds, [], [], timeout)[0]
        _sigblock()
        if not rfds:
            return
        if master_fd in rfds:
            try:
                data = master_read(master_fd)
            except OSError:
                data = b""
            if data:
                # A single os.write() may be short; write everything.
                _writen(STDOUT_FILENO, data)
            else:
                fds.remove(master_fd)
                timeout = 0.01  # Drain pending stdin briefly, then stop.
        if STDIN_FILENO in rfds:
            data = stdin_read(STDIN_FILENO)
            if data:
                _writen(master_fd, data)
            else:
                fds.remove(STDIN_FILENO)
def spawn(argv, master_read=_read, stdin_read=_read, slave_echo=True, handle_winch=False):
    """Spawn a process connected to a new pty and relay its I/O.

    argv is the program's argument sequence (a lone string is treated as
    a one-element sequence); argv[0] is looked up with os.execlp().
    master_read and stdin_read are callables taking a file descriptor
    and returning bytes; they are used to read from the pty master and
    from standard input respectively.  slave_echo selects the echo
    setting applied to the pty slave.  handle_winch requests a SIGWINCH
    handler that propagates stdin's window size to the slave; it only
    takes effect when stdin's window size could be obtained and
    HAVE_WINCH is true.

    Returns the status value from os.waitpid() for the child.
    """
    if isinstance(argv, str):
        argv = (argv,)
    sys.audit('pty.spawn', argv)

    saved_mask = _getmask()
    _sigblock()  # Reset during select() in _copy.
    master_fd, slave_fd, mode, winsz = _pty_setup(slave_echo)
    handle_winch = handle_winch and (winsz is not None) and HAVE_WINCH
    bkh = _winchset(slave_fd, saved_mask, handle_winch)

    pid = os.fork()
    if pid == CHILD:
        os.close(master_fd)
        os.login_tty(slave_fd)
        _sigreset(saved_mask)
        os.execlp(argv[0], *argv)

    os.close(slave_fd)
    try:
        _copy(master_fd, saved_mask, master_read, stdin_read)
    finally:
        if mode:
            tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
        # Compare against None: signal.SIG_DFL is a valid previous
        # handler but is falsy, so a truthiness test would skip it.
        if bkh is not None:
            signal.signal(signal.SIGWINCH, bkh)
        os.close(master_fd)
        _sigreset(saved_mask)
    return os.waitpid(pid, 0)[1]