This repository was archived by the owner on Jan 22, 2021. It is now read-only.
forked from scarletcafe/jishaku
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshell.py
133 lines (97 loc) · 3.55 KB
/
shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
"""
jishaku.shell
~~~~~~~~~~~~~
Tools related to interacting directly with the shell.
:copyright: (c) 2019 Devon (Gorialis) R
:license: MIT, see LICENSE for more details.
"""
import asyncio
import os
import re
import shlex
import subprocess
import sys
import time
# Path of the shell named by the SHELL environment variable; "/bin/bash" is
# used instead when the variable is unset or empty.
SHELL = os.environ.get("SHELL") or "/bin/bash"

# True when the interpreter reports the "win32" platform.
WINDOWS = (sys.platform == "win32")
def background_reader(stream, loop: asyncio.AbstractEventLoop, callback):
    """
    Pump a binary stream line by line into a coroutine callback.

    Each line returned by ``stream.readline`` is passed to ``callback`` and the
    resulting coroutine is handed to ``loop.create_task`` through
    ``loop.call_soon_threadsafe``. Reading stops at the first empty bytes
    object returned by ``readline``.
    """

    while True:
        chunk = stream.readline()

        if chunk == b'':
            # readline() returned nothing: end of stream.
            break

        loop.call_soon_threadsafe(loop.create_task, callback(chunk))
class ShellReader:
    """
    A class that passively reads from a shell and buffers results for read.

    The command is started as soon as the reader is constructed. Its stdout
    and stderr are read line by line in executor threads, cleaned with
    :meth:`clean_bytes` and placed on an internal queue (stderr lines are
    prefixed with ``[stderr] ``). Asynchronously iterating the reader yields
    the queued lines until both streams are exhausted and the queue is empty.

    Example
    -------

    .. code:: python3

        # reader should be in a with statement to ensure it is properly closed
        with ShellReader('echo one; sleep 5; echo two') as reader:
            # prints 'one', then 'two' after 5 seconds
            async for x in reader:
                print(x)
    """

    def __init__(self, code: str, timeout: int = 90, loop: asyncio.AbstractEventLoop = None):
        """
        Spawn ``code`` in a shell and start reading its output.

        :param code: The command line to run through the platform shell.
        :param timeout: Maximum number of seconds a single ``__anext__`` call
            waits for a new line before raising :class:`asyncio.TimeoutError`.
        :param loop: Event loop to schedule the reader tasks on; defaults to
            ``asyncio.get_event_loop()``.
        """

        self.loop = loop or asyncio.get_event_loop()
        self.timeout = timeout
        self.close_code = None

        # The queue must exist before anything that feeds it is scheduled.
        self.queue = asyncio.Queue(maxsize=250)

        # With shell=True the shell parses the command itself, so it is passed
        # as one string on every platform. Pre-splitting it with POSIX rules
        # would mangle backslashes in Windows paths.
        self.process = subprocess.Popen(code, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        self.stdout_task = self.make_reader_task(self.process.stdout, self.stdout_handler)
        self.stderr_task = self.make_reader_task(self.process.stderr, self.stderr_handler)

    @property
    def closed(self):
        """
        Are both tasks done, indicating there is no more to read?
        """

        return self.stdout_task.done() and self.stderr_task.done()

    async def executor_wrapper(self, *args, **kwargs):
        """
        Call wrapper for stream reader.

        Forwards the arguments to ``loop.run_in_executor`` using the loop's
        default executor and returns the awaited result.
        """

        return await self.loop.run_in_executor(None, *args, **kwargs)

    def make_reader_task(self, stream, callback):
        """
        Create a reader executor task for a stream.

        :param stream: Binary stream to read lines from.
        :param callback: Coroutine function invoked with each raw line.
        :return: The task wrapping the executor call; it is done once the
            stream has been read to its end.
        """

        return self.loop.create_task(self.executor_wrapper(background_reader, stream, self.loop, callback))

    @staticmethod
    def clean_bytes(line):
        """
        Cleans a byte sequence of shell directives and decodes it.

        Carriage returns, surrounding newlines and ANSI escape sequences are
        removed, and consecutive backticks are separated by a zero-width space.

        :param line: Raw bytes read from the process.
        :return: The cleaned text.
        """

        # Undecodable bytes become U+FFFD instead of raising, so a single bad
        # byte does not make the whole line disappear.
        text = line.decode('utf-8', errors='replace').replace('\r', '').strip('\n')

        # Strip CSI sequences (ESC [ params intermediates final) and
        # two-character escapes. Matching only up to the sequence's own final
        # byte keeps the pattern from swallowing ordinary text.
        text = re.sub(r'\x1b(?:\[[0-?]*[ -/]*[@-~]|[@-Z\\-_])', '', text)

        return text.replace("``", "`\u200b`").strip('\n')

    async def stdout_handler(self, line):
        """
        Handler for this class for stdout.

        Queues the cleaned line, waiting while the queue is full.
        """

        await self.queue.put(self.clean_bytes(line))

    async def stderr_handler(self, line):
        """
        Handler for this class for stderr.

        Queues the cleaned line prefixed with ``[stderr] ``, waiting while the
        queue is full.
        """

        await self.queue.put(self.clean_bytes(b'[stderr] ' + line))

    def __enter__(self):
        return self

    def __exit__(self, *args):
        """
        Kill the process and record its exit code in ``close_code``.
        """

        self.process.kill()

        try:
            self.close_code = self.process.wait(timeout=0.5)
        except subprocess.TimeoutExpired:
            # The process has not been reaped yet. Leave close_code as None
            # rather than raising from __exit__, which would hide whatever
            # exception is already propagating out of the with block.
            pass

    def __aiter__(self):
        return self

    async def __anext__(self):
        """
        Return the next queued line.

        :raises StopAsyncIteration: When both streams are finished and the
            queue is empty.
        :raises asyncio.TimeoutError: When no line arrived within ``timeout``
            seconds of this call starting.
        """

        start_time = time.perf_counter()

        while not self.closed or not self.queue.empty():
            try:
                # Wait in one-second slices so the loop condition is
                # re-evaluated regularly while the process is quiet.
                return await asyncio.wait_for(self.queue.get(), timeout=1)
            except asyncio.TimeoutError as exception:
                if time.perf_counter() - start_time >= self.timeout:
                    raise exception

        raise StopAsyncIteration()