forked from mhammond/pywin32
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwinprocess.py
More file actions
230 lines (209 loc) · 7.19 KB
/
winprocess.py
File metadata and controls
230 lines (209 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""
Windows Process Control
winprocess.run launches a child process and returns the exit code.
Optionally, it can:
redirect stdin, stdout & stderr to files
run the command as another user
limit the process's running time
control the process window (location, size, window state, desktop)
Works on Windows NT, 2000 & XP. Requires Mark Hammond's win32
extensions.
This code is free for any purpose, with no warranty of any kind.
-- John B. Dell'Aquila <jbd@alum.mit.edu>
"""
import msvcrt
import os
import win32api
import win32con
import win32event
import win32gui
import win32process
import win32security
def logonUser(loginString):
"""
Login as specified user and return handle.
loginString: 'Domain\nUser\nPassword'; for local
login use . or empty string as domain
e.g. '.\nadministrator\nsecret_password'
"""
domain, user, passwd = loginString.split("\n")
return win32security.LogonUser(
user,
domain,
passwd,
win32con.LOGON32_LOGON_INTERACTIVE,
win32con.LOGON32_PROVIDER_DEFAULT,
)
class Process:
"""
A Windows process.
"""
def __init__(
self,
cmd,
login=None,
hStdin=None,
hStdout=None,
hStderr=None,
show=1,
xy=None,
xySize=None,
desktop=None,
):
"""
Create a Windows process.
cmd: command to run
login: run as user 'Domain\nUser\nPassword'
hStdin, hStdout, hStderr:
handles for process I/O; default is caller's stdin,
stdout & stderr
show: wShowWindow (0=SW_HIDE, 1=SW_NORMAL, ...)
xy: window offset (x, y) of upper left corner in pixels
xySize: window size (width, height) in pixels
desktop: lpDesktop - name of desktop e.g. 'winsta0\\default'
None = inherit current desktop
'' = create new desktop if necessary
User calling login requires additional privileges:
Act as part of the operating system [not needed on Windows XP]
Increase quotas
Replace a process level token
Login string must EITHER be an administrator's account
(ordinary user can't access current desktop - see Microsoft
Q165194) OR use desktop='' to run another desktop invisibly
(may be very slow to startup & finalize).
"""
si = win32process.STARTUPINFO()
si.dwFlags = win32con.STARTF_USESTDHANDLES ^ win32con.STARTF_USESHOWWINDOW
if hStdin is None:
si.hStdInput = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE)
else:
si.hStdInput = hStdin
if hStdout is None:
si.hStdOutput = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE)
else:
si.hStdOutput = hStdout
if hStderr is None:
si.hStdError = win32api.GetStdHandle(win32api.STD_ERROR_HANDLE)
else:
si.hStdError = hStderr
si.wShowWindow = show
if xy is not None:
si.dwX, si.dwY = xy
si.dwFlags ^= win32con.STARTF_USEPOSITION
if xySize is not None:
si.dwXSize, si.dwYSize = xySize
si.dwFlags ^= win32con.STARTF_USESIZE
if desktop is not None:
si.lpDesktop = desktop
procArgs = (
None, # appName
cmd, # commandLine
None, # processAttributes
None, # threadAttributes
1, # bInheritHandles
win32process.CREATE_NEW_CONSOLE, # dwCreationFlags
None, # newEnvironment
None, # currentDirectory
si,
) # startupinfo
if login is not None:
hUser = logonUser(login)
win32security.ImpersonateLoggedOnUser(hUser)
procHandles = win32process.CreateProcessAsUser(hUser, *procArgs)
win32security.RevertToSelf()
else:
procHandles = win32process.CreateProcess(*procArgs)
self.hProcess, self.hThread, self.PId, self.TId = procHandles
def wait(self, mSec=None):
"""
Wait for process to finish or for specified number of
milliseconds to elapse.
"""
if mSec is None:
mSec = win32event.INFINITE
return win32event.WaitForSingleObject(self.hProcess, mSec)
def kill(self, gracePeriod=5000):
"""
Kill process. Try for an orderly shutdown via WM_CLOSE. If
still running after gracePeriod (5 sec. default), terminate.
"""
win32gui.EnumWindows(self.__close__, 0)
if self.wait(gracePeriod) != win32event.WAIT_OBJECT_0:
win32process.TerminateProcess(self.hProcess, 0)
win32api.Sleep(100) # wait for resources to be released
def __close__(self, hwnd, dummy):
"""
EnumWindows callback - sends WM_CLOSE to any window
owned by this process.
"""
TId, PId = win32process.GetWindowThreadProcessId(hwnd)
if PId == self.PId:
win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
def exitCode(self):
"""
Return process exit code.
"""
return win32process.GetExitCodeProcess(self.hProcess)
def run(cmd, mSec=None, stdin=None, stdout=None, stderr=None, **kw):
"""
Run cmd as a child process and return exit code.
mSec: terminate cmd after specified number of milliseconds
stdin, stdout, stderr:
file objects for child I/O (use hStdin etc. to attach
handles instead of files); default is caller's stdin,
stdout & stderr;
kw: see Process.__init__ for more keyword options
"""
if stdin is not None:
kw["hStdin"] = msvcrt.get_osfhandle(stdin.fileno())
if stdout is not None:
kw["hStdout"] = msvcrt.get_osfhandle(stdout.fileno())
if stderr is not None:
kw["hStderr"] = msvcrt.get_osfhandle(stderr.fileno())
child = Process(cmd, **kw)
if child.wait(mSec) != win32event.WAIT_OBJECT_0:
child.kill()
raise WindowsError("process timeout exceeded")
return child.exitCode()
if __name__ == "__main__":
# Pipe commands to a shell and display the output in notepad
print("Testing winprocess.py...")
import tempfile
timeoutSeconds = 15
cmdString = (
"""\
REM Test of winprocess.py piping commands to a shell.\r
REM This 'notepad' process will terminate in %d seconds.\r
vol\r
net user\r
_this_is_a_test_of_stderr_\r
"""
% timeoutSeconds
)
cmd_name = tempfile.mktemp()
out_name = cmd_name + ".txt"
try:
cmd = open(cmd_name, "w+b")
out = open(out_name, "w+b")
cmd.write(cmdString.encode("utf-8"))
cmd.seek(0)
print(
"CMD.EXE exit code:",
run("cmd.exe", show=0, stdin=cmd, stdout=out, stderr=out),
)
cmd.close()
print(
"NOTEPAD exit code:",
run(
"notepad.exe %s" % out.name,
show=win32con.SW_MAXIMIZE,
mSec=timeoutSeconds * 1000,
),
)
out.close()
finally:
for n in (cmd_name, out_name):
try:
os.unlink(cmd_name)
except os.error:
pass