logo

oasis-root

Compiled tree of Oasis Linux, based on own branch at <https://hacktivis.me/git/oasis/>. To clone: git clone https://anongit.hacktivis.me/git/oasis-root.git

subprocess.py (7405B)


# Names exported by a star-import of this module.
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'

import subprocess

from . import events
from . import protocols
from . import streams
from . import tasks
from .log import logger

# Module-level aliases for the same-named constants of the ``subprocess``
# module, so they can be referenced through this module as well.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
  11. class SubprocessStreamProtocol(streams.FlowControlMixin,
  12. protocols.SubprocessProtocol):
  13. """Like StreamReaderProtocol, but for a subprocess."""
  14. def __init__(self, limit, loop):
  15. super().__init__(loop=loop)
  16. self._limit = limit
  17. self.stdin = self.stdout = self.stderr = None
  18. self._transport = None
  19. self._process_exited = False
  20. self._pipe_fds = []
  21. self._stdin_closed = self._loop.create_future()
  22. def __repr__(self):
  23. info = [self.__class__.__name__]
  24. if self.stdin is not None:
  25. info.append(f'stdin={self.stdin!r}')
  26. if self.stdout is not None:
  27. info.append(f'stdout={self.stdout!r}')
  28. if self.stderr is not None:
  29. info.append(f'stderr={self.stderr!r}')
  30. return '<{}>'.format(' '.join(info))
  31. def connection_made(self, transport):
  32. self._transport = transport
  33. stdout_transport = transport.get_pipe_transport(1)
  34. if stdout_transport is not None:
  35. self.stdout = streams.StreamReader(limit=self._limit,
  36. loop=self._loop)
  37. self.stdout.set_transport(stdout_transport)
  38. self._pipe_fds.append(1)
  39. stderr_transport = transport.get_pipe_transport(2)
  40. if stderr_transport is not None:
  41. self.stderr = streams.StreamReader(limit=self._limit,
  42. loop=self._loop)
  43. self.stderr.set_transport(stderr_transport)
  44. self._pipe_fds.append(2)
  45. stdin_transport = transport.get_pipe_transport(0)
  46. if stdin_transport is not None:
  47. self.stdin = streams.StreamWriter(stdin_transport,
  48. protocol=self,
  49. reader=None,
  50. loop=self._loop)
  51. def pipe_data_received(self, fd, data):
  52. if fd == 1:
  53. reader = self.stdout
  54. elif fd == 2:
  55. reader = self.stderr
  56. else:
  57. reader = None
  58. if reader is not None:
  59. reader.feed_data(data)
  60. def pipe_connection_lost(self, fd, exc):
  61. if fd == 0:
  62. pipe = self.stdin
  63. if pipe is not None:
  64. pipe.close()
  65. self.connection_lost(exc)
  66. if exc is None:
  67. self._stdin_closed.set_result(None)
  68. else:
  69. self._stdin_closed.set_exception(exc)
  70. return
  71. if fd == 1:
  72. reader = self.stdout
  73. elif fd == 2:
  74. reader = self.stderr
  75. else:
  76. reader = None
  77. if reader is not None:
  78. if exc is None:
  79. reader.feed_eof()
  80. else:
  81. reader.set_exception(exc)
  82. if fd in self._pipe_fds:
  83. self._pipe_fds.remove(fd)
  84. self._maybe_close_transport()
  85. def process_exited(self):
  86. self._process_exited = True
  87. self._maybe_close_transport()
  88. def _maybe_close_transport(self):
  89. if len(self._pipe_fds) == 0 and self._process_exited:
  90. self._transport.close()
  91. self._transport = None
  92. def _get_close_waiter(self, stream):
  93. if stream is self.stdin:
  94. return self._stdin_closed
  95. class Process:
  96. def __init__(self, transport, protocol, loop):
  97. self._transport = transport
  98. self._protocol = protocol
  99. self._loop = loop
  100. self.stdin = protocol.stdin
  101. self.stdout = protocol.stdout
  102. self.stderr = protocol.stderr
  103. self.pid = transport.get_pid()
  104. def __repr__(self):
  105. return f'<{self.__class__.__name__} {self.pid}>'
  106. @property
  107. def returncode(self):
  108. return self._transport.get_returncode()
  109. async def wait(self):
  110. """Wait until the process exit and return the process return code."""
  111. return await self._transport._wait()
  112. def send_signal(self, signal):
  113. self._transport.send_signal(signal)
  114. def terminate(self):
  115. self._transport.terminate()
  116. def kill(self):
  117. self._transport.kill()
  118. async def _feed_stdin(self, input):
  119. debug = self._loop.get_debug()
  120. self.stdin.write(input)
  121. if debug:
  122. logger.debug(
  123. '%r communicate: feed stdin (%s bytes)', self, len(input))
  124. try:
  125. await self.stdin.drain()
  126. except (BrokenPipeError, ConnectionResetError) as exc:
  127. # communicate() ignores BrokenPipeError and ConnectionResetError
  128. if debug:
  129. logger.debug('%r communicate: stdin got %r', self, exc)
  130. if debug:
  131. logger.debug('%r communicate: close stdin', self)
  132. self.stdin.close()
  133. async def _noop(self):
  134. return None
  135. async def _read_stream(self, fd):
  136. transport = self._transport.get_pipe_transport(fd)
  137. if fd == 2:
  138. stream = self.stderr
  139. else:
  140. assert fd == 1
  141. stream = self.stdout
  142. if self._loop.get_debug():
  143. name = 'stdout' if fd == 1 else 'stderr'
  144. logger.debug('%r communicate: read %s', self, name)
  145. output = await stream.read()
  146. if self._loop.get_debug():
  147. name = 'stdout' if fd == 1 else 'stderr'
  148. logger.debug('%r communicate: close %s', self, name)
  149. transport.close()
  150. return output
  151. async def communicate(self, input=None):
  152. if input is not None:
  153. stdin = self._feed_stdin(input)
  154. else:
  155. stdin = self._noop()
  156. if self.stdout is not None:
  157. stdout = self._read_stream(1)
  158. else:
  159. stdout = self._noop()
  160. if self.stderr is not None:
  161. stderr = self._read_stream(2)
  162. else:
  163. stderr = self._noop()
  164. stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
  165. await self.wait()
  166. return (stdout, stderr)
  167. async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
  168. limit=streams._DEFAULT_LIMIT, **kwds):
  169. loop = events.get_running_loop()
  170. protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
  171. loop=loop)
  172. transport, protocol = await loop.subprocess_shell(
  173. protocol_factory,
  174. cmd, stdin=stdin, stdout=stdout,
  175. stderr=stderr, **kwds)
  176. return Process(transport, protocol, loop)
  177. async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
  178. stderr=None, limit=streams._DEFAULT_LIMIT,
  179. **kwds):
  180. loop = events.get_running_loop()
  181. protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
  182. loop=loop)
  183. transport, protocol = await loop.subprocess_exec(
  184. protocol_factory,
  185. program, *args,
  186. stdin=stdin, stdout=stdout,
  187. stderr=stderr, **kwds)
  188. return Process(transport, protocol, loop)