logo

oasis-root

Compiled tree of Oasis Linux based on own branch at <https://hacktivis.me/git/oasis/> git clone https://anongit.hacktivis.me/git/oasis-root.git

windows_events.py (32907B)


  1. """Selector and proactor event loops for Windows."""
  2. import _overlapped
  3. import _winapi
  4. import errno
  5. import math
  6. import msvcrt
  7. import socket
  8. import struct
  9. import time
  10. import weakref
  11. from . import events
  12. from . import base_subprocess
  13. from . import futures
  14. from . import exceptions
  15. from . import proactor_events
  16. from . import selector_events
  17. from . import tasks
  18. from . import windows_utils
  19. from .log import logger
  20. __all__ = (
  21. 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
  22. 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
  23. 'WindowsProactorEventLoopPolicy',
  24. )
  25. NULL = 0
  26. INFINITE = 0xffffffff
  27. ERROR_CONNECTION_REFUSED = 1225
  28. ERROR_CONNECTION_ABORTED = 1236
  29. # Initial delay in seconds for connect_pipe() before retrying to connect
  30. CONNECT_PIPE_INIT_DELAY = 0.001
  31. # Maximum delay in seconds for connect_pipe() before retrying to connect
  32. CONNECT_PIPE_MAX_DELAY = 0.100
  33. class _OverlappedFuture(futures.Future):
  34. """Subclass of Future which represents an overlapped operation.
  35. Cancelling it will immediately cancel the overlapped operation.
  36. """
  37. def __init__(self, ov, *, loop=None):
  38. super().__init__(loop=loop)
  39. if self._source_traceback:
  40. del self._source_traceback[-1]
  41. self._ov = ov
  42. def _repr_info(self):
  43. info = super()._repr_info()
  44. if self._ov is not None:
  45. state = 'pending' if self._ov.pending else 'completed'
  46. info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
  47. return info
  48. def _cancel_overlapped(self):
  49. if self._ov is None:
  50. return
  51. try:
  52. self._ov.cancel()
  53. except OSError as exc:
  54. context = {
  55. 'message': 'Cancelling an overlapped future failed',
  56. 'exception': exc,
  57. 'future': self,
  58. }
  59. if self._source_traceback:
  60. context['source_traceback'] = self._source_traceback
  61. self._loop.call_exception_handler(context)
  62. self._ov = None
  63. def cancel(self, msg=None):
  64. self._cancel_overlapped()
  65. return super().cancel(msg=msg)
  66. def set_exception(self, exception):
  67. super().set_exception(exception)
  68. self._cancel_overlapped()
  69. def set_result(self, result):
  70. super().set_result(result)
  71. self._ov = None
  72. class _BaseWaitHandleFuture(futures.Future):
  73. """Subclass of Future which represents a wait handle."""
  74. def __init__(self, ov, handle, wait_handle, *, loop=None):
  75. super().__init__(loop=loop)
  76. if self._source_traceback:
  77. del self._source_traceback[-1]
  78. # Keep a reference to the Overlapped object to keep it alive until the
  79. # wait is unregistered
  80. self._ov = ov
  81. self._handle = handle
  82. self._wait_handle = wait_handle
  83. # Should we call UnregisterWaitEx() if the wait completes
  84. # or is cancelled?
  85. self._registered = True
  86. def _poll(self):
  87. # non-blocking wait: use a timeout of 0 millisecond
  88. return (_winapi.WaitForSingleObject(self._handle, 0) ==
  89. _winapi.WAIT_OBJECT_0)
  90. def _repr_info(self):
  91. info = super()._repr_info()
  92. info.append(f'handle={self._handle:#x}')
  93. if self._handle is not None:
  94. state = 'signaled' if self._poll() else 'waiting'
  95. info.append(state)
  96. if self._wait_handle is not None:
  97. info.append(f'wait_handle={self._wait_handle:#x}')
  98. return info
  99. def _unregister_wait_cb(self, fut):
  100. # The wait was unregistered: it's not safe to destroy the Overlapped
  101. # object
  102. self._ov = None
  103. def _unregister_wait(self):
  104. if not self._registered:
  105. return
  106. self._registered = False
  107. wait_handle = self._wait_handle
  108. self._wait_handle = None
  109. try:
  110. _overlapped.UnregisterWait(wait_handle)
  111. except OSError as exc:
  112. if exc.winerror != _overlapped.ERROR_IO_PENDING:
  113. context = {
  114. 'message': 'Failed to unregister the wait handle',
  115. 'exception': exc,
  116. 'future': self,
  117. }
  118. if self._source_traceback:
  119. context['source_traceback'] = self._source_traceback
  120. self._loop.call_exception_handler(context)
  121. return
  122. # ERROR_IO_PENDING means that the unregister is pending
  123. self._unregister_wait_cb(None)
  124. def cancel(self, msg=None):
  125. self._unregister_wait()
  126. return super().cancel(msg=msg)
  127. def set_exception(self, exception):
  128. self._unregister_wait()
  129. super().set_exception(exception)
  130. def set_result(self, result):
  131. self._unregister_wait()
  132. super().set_result(result)
  133. class _WaitCancelFuture(_BaseWaitHandleFuture):
  134. """Subclass of Future which represents a wait for the cancellation of a
  135. _WaitHandleFuture using an event.
  136. """
  137. def __init__(self, ov, event, wait_handle, *, loop=None):
  138. super().__init__(ov, event, wait_handle, loop=loop)
  139. self._done_callback = None
  140. def cancel(self):
  141. raise RuntimeError("_WaitCancelFuture must not be cancelled")
  142. def set_result(self, result):
  143. super().set_result(result)
  144. if self._done_callback is not None:
  145. self._done_callback(self)
  146. def set_exception(self, exception):
  147. super().set_exception(exception)
  148. if self._done_callback is not None:
  149. self._done_callback(self)
  150. class _WaitHandleFuture(_BaseWaitHandleFuture):
  151. def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
  152. super().__init__(ov, handle, wait_handle, loop=loop)
  153. self._proactor = proactor
  154. self._unregister_proactor = True
  155. self._event = _overlapped.CreateEvent(None, True, False, None)
  156. self._event_fut = None
  157. def _unregister_wait_cb(self, fut):
  158. if self._event is not None:
  159. _winapi.CloseHandle(self._event)
  160. self._event = None
  161. self._event_fut = None
  162. # If the wait was cancelled, the wait may never be signalled, so
  163. # it's required to unregister it. Otherwise, IocpProactor.close() will
  164. # wait forever for an event which will never come.
  165. #
  166. # If the IocpProactor already received the event, it's safe to call
  167. # _unregister() because we kept a reference to the Overlapped object
  168. # which is used as a unique key.
  169. self._proactor._unregister(self._ov)
  170. self._proactor = None
  171. super()._unregister_wait_cb(fut)
  172. def _unregister_wait(self):
  173. if not self._registered:
  174. return
  175. self._registered = False
  176. wait_handle = self._wait_handle
  177. self._wait_handle = None
  178. try:
  179. _overlapped.UnregisterWaitEx(wait_handle, self._event)
  180. except OSError as exc:
  181. if exc.winerror != _overlapped.ERROR_IO_PENDING:
  182. context = {
  183. 'message': 'Failed to unregister the wait handle',
  184. 'exception': exc,
  185. 'future': self,
  186. }
  187. if self._source_traceback:
  188. context['source_traceback'] = self._source_traceback
  189. self._loop.call_exception_handler(context)
  190. return
  191. # ERROR_IO_PENDING is not an error, the wait was unregistered
  192. self._event_fut = self._proactor._wait_cancel(self._event,
  193. self._unregister_wait_cb)
  194. class PipeServer(object):
  195. """Class representing a pipe server.
  196. This is much like a bound, listening socket.
  197. """
  198. def __init__(self, address):
  199. self._address = address
  200. self._free_instances = weakref.WeakSet()
  201. # initialize the pipe attribute before calling _server_pipe_handle()
  202. # because this function can raise an exception and the destructor calls
  203. # the close() method
  204. self._pipe = None
  205. self._accept_pipe_future = None
  206. self._pipe = self._server_pipe_handle(True)
  207. def _get_unconnected_pipe(self):
  208. # Create new instance and return previous one. This ensures
  209. # that (until the server is closed) there is always at least
  210. # one pipe handle for address. Therefore if a client attempt
  211. # to connect it will not fail with FileNotFoundError.
  212. tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
  213. return tmp
  214. def _server_pipe_handle(self, first):
  215. # Return a wrapper for a new pipe handle.
  216. if self.closed():
  217. return None
  218. flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
  219. if first:
  220. flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
  221. h = _winapi.CreateNamedPipe(
  222. self._address, flags,
  223. _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
  224. _winapi.PIPE_WAIT,
  225. _winapi.PIPE_UNLIMITED_INSTANCES,
  226. windows_utils.BUFSIZE, windows_utils.BUFSIZE,
  227. _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
  228. pipe = windows_utils.PipeHandle(h)
  229. self._free_instances.add(pipe)
  230. return pipe
  231. def closed(self):
  232. return (self._address is None)
  233. def close(self):
  234. if self._accept_pipe_future is not None:
  235. self._accept_pipe_future.cancel()
  236. self._accept_pipe_future = None
  237. # Close all instances which have not been connected to by a client.
  238. if self._address is not None:
  239. for pipe in self._free_instances:
  240. pipe.close()
  241. self._pipe = None
  242. self._address = None
  243. self._free_instances.clear()
  244. __del__ = close
  245. class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
  246. """Windows version of selector event loop."""
  247. class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
  248. """Windows version of proactor event loop using IOCP."""
  249. def __init__(self, proactor=None):
  250. if proactor is None:
  251. proactor = IocpProactor()
  252. super().__init__(proactor)
  253. def run_forever(self):
  254. try:
  255. assert self._self_reading_future is None
  256. self.call_soon(self._loop_self_reading)
  257. super().run_forever()
  258. finally:
  259. if self._self_reading_future is not None:
  260. ov = self._self_reading_future._ov
  261. self._self_reading_future.cancel()
  262. # self_reading_future was just cancelled so if it hasn't been
  263. # finished yet, it never will be (it's possible that it has
  264. # already finished and its callback is waiting in the queue,
  265. # where it could still happen if the event loop is restarted).
  266. # Unregister it otherwise IocpProactor.close will wait for it
  267. # forever
  268. if ov is not None:
  269. self._proactor._unregister(ov)
  270. self._self_reading_future = None
  271. async def create_pipe_connection(self, protocol_factory, address):
  272. f = self._proactor.connect_pipe(address)
  273. pipe = await f
  274. protocol = protocol_factory()
  275. trans = self._make_duplex_pipe_transport(pipe, protocol,
  276. extra={'addr': address})
  277. return trans, protocol
  278. async def start_serving_pipe(self, protocol_factory, address):
  279. server = PipeServer(address)
  280. def loop_accept_pipe(f=None):
  281. pipe = None
  282. try:
  283. if f:
  284. pipe = f.result()
  285. server._free_instances.discard(pipe)
  286. if server.closed():
  287. # A client connected before the server was closed:
  288. # drop the client (close the pipe) and exit
  289. pipe.close()
  290. return
  291. protocol = protocol_factory()
  292. self._make_duplex_pipe_transport(
  293. pipe, protocol, extra={'addr': address})
  294. pipe = server._get_unconnected_pipe()
  295. if pipe is None:
  296. return
  297. f = self._proactor.accept_pipe(pipe)
  298. except OSError as exc:
  299. if pipe and pipe.fileno() != -1:
  300. self.call_exception_handler({
  301. 'message': 'Pipe accept failed',
  302. 'exception': exc,
  303. 'pipe': pipe,
  304. })
  305. pipe.close()
  306. elif self._debug:
  307. logger.warning("Accept pipe failed on pipe %r",
  308. pipe, exc_info=True)
  309. except exceptions.CancelledError:
  310. if pipe:
  311. pipe.close()
  312. else:
  313. server._accept_pipe_future = f
  314. f.add_done_callback(loop_accept_pipe)
  315. self.call_soon(loop_accept_pipe)
  316. return [server]
  317. async def _make_subprocess_transport(self, protocol, args, shell,
  318. stdin, stdout, stderr, bufsize,
  319. extra=None, **kwargs):
  320. waiter = self.create_future()
  321. transp = _WindowsSubprocessTransport(self, protocol, args, shell,
  322. stdin, stdout, stderr, bufsize,
  323. waiter=waiter, extra=extra,
  324. **kwargs)
  325. try:
  326. await waiter
  327. except (SystemExit, KeyboardInterrupt):
  328. raise
  329. except BaseException:
  330. transp.close()
  331. await transp._wait()
  332. raise
  333. return transp
  334. class IocpProactor:
  335. """Proactor implementation using IOCP."""
  336. def __init__(self, concurrency=0xffffffff):
  337. self._loop = None
  338. self._results = []
  339. self._iocp = _overlapped.CreateIoCompletionPort(
  340. _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
  341. self._cache = {}
  342. self._registered = weakref.WeakSet()
  343. self._unregistered = []
  344. self._stopped_serving = weakref.WeakSet()
  345. def _check_closed(self):
  346. if self._iocp is None:
  347. raise RuntimeError('IocpProactor is closed')
  348. def __repr__(self):
  349. info = ['overlapped#=%s' % len(self._cache),
  350. 'result#=%s' % len(self._results)]
  351. if self._iocp is None:
  352. info.append('closed')
  353. return '<%s %s>' % (self.__class__.__name__, " ".join(info))
  354. def set_loop(self, loop):
  355. self._loop = loop
  356. def select(self, timeout=None):
  357. if not self._results:
  358. self._poll(timeout)
  359. tmp = self._results
  360. self._results = []
  361. return tmp
  362. def _result(self, value):
  363. fut = self._loop.create_future()
  364. fut.set_result(value)
  365. return fut
  366. def recv(self, conn, nbytes, flags=0):
  367. self._register_with_iocp(conn)
  368. ov = _overlapped.Overlapped(NULL)
  369. try:
  370. if isinstance(conn, socket.socket):
  371. ov.WSARecv(conn.fileno(), nbytes, flags)
  372. else:
  373. ov.ReadFile(conn.fileno(), nbytes)
  374. except BrokenPipeError:
  375. return self._result(b'')
  376. def finish_recv(trans, key, ov):
  377. try:
  378. return ov.getresult()
  379. except OSError as exc:
  380. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  381. _overlapped.ERROR_OPERATION_ABORTED):
  382. raise ConnectionResetError(*exc.args)
  383. else:
  384. raise
  385. return self._register(ov, conn, finish_recv)
  386. def recv_into(self, conn, buf, flags=0):
  387. self._register_with_iocp(conn)
  388. ov = _overlapped.Overlapped(NULL)
  389. try:
  390. if isinstance(conn, socket.socket):
  391. ov.WSARecvInto(conn.fileno(), buf, flags)
  392. else:
  393. ov.ReadFileInto(conn.fileno(), buf)
  394. except BrokenPipeError:
  395. return self._result(0)
  396. def finish_recv(trans, key, ov):
  397. try:
  398. return ov.getresult()
  399. except OSError as exc:
  400. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  401. _overlapped.ERROR_OPERATION_ABORTED):
  402. raise ConnectionResetError(*exc.args)
  403. else:
  404. raise
  405. return self._register(ov, conn, finish_recv)
  406. def recvfrom(self, conn, nbytes, flags=0):
  407. self._register_with_iocp(conn)
  408. ov = _overlapped.Overlapped(NULL)
  409. try:
  410. ov.WSARecvFrom(conn.fileno(), nbytes, flags)
  411. except BrokenPipeError:
  412. return self._result((b'', None))
  413. def finish_recv(trans, key, ov):
  414. try:
  415. return ov.getresult()
  416. except OSError as exc:
  417. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  418. _overlapped.ERROR_OPERATION_ABORTED):
  419. raise ConnectionResetError(*exc.args)
  420. else:
  421. raise
  422. return self._register(ov, conn, finish_recv)
  423. def sendto(self, conn, buf, flags=0, addr=None):
  424. self._register_with_iocp(conn)
  425. ov = _overlapped.Overlapped(NULL)
  426. ov.WSASendTo(conn.fileno(), buf, flags, addr)
  427. def finish_send(trans, key, ov):
  428. try:
  429. return ov.getresult()
  430. except OSError as exc:
  431. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  432. _overlapped.ERROR_OPERATION_ABORTED):
  433. raise ConnectionResetError(*exc.args)
  434. else:
  435. raise
  436. return self._register(ov, conn, finish_send)
  437. def send(self, conn, buf, flags=0):
  438. self._register_with_iocp(conn)
  439. ov = _overlapped.Overlapped(NULL)
  440. if isinstance(conn, socket.socket):
  441. ov.WSASend(conn.fileno(), buf, flags)
  442. else:
  443. ov.WriteFile(conn.fileno(), buf)
  444. def finish_send(trans, key, ov):
  445. try:
  446. return ov.getresult()
  447. except OSError as exc:
  448. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  449. _overlapped.ERROR_OPERATION_ABORTED):
  450. raise ConnectionResetError(*exc.args)
  451. else:
  452. raise
  453. return self._register(ov, conn, finish_send)
  454. def accept(self, listener):
  455. self._register_with_iocp(listener)
  456. conn = self._get_accept_socket(listener.family)
  457. ov = _overlapped.Overlapped(NULL)
  458. ov.AcceptEx(listener.fileno(), conn.fileno())
  459. def finish_accept(trans, key, ov):
  460. ov.getresult()
  461. # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
  462. buf = struct.pack('@P', listener.fileno())
  463. conn.setsockopt(socket.SOL_SOCKET,
  464. _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
  465. conn.settimeout(listener.gettimeout())
  466. return conn, conn.getpeername()
  467. async def accept_coro(future, conn):
  468. # Coroutine closing the accept socket if the future is cancelled
  469. try:
  470. await future
  471. except exceptions.CancelledError:
  472. conn.close()
  473. raise
  474. future = self._register(ov, listener, finish_accept)
  475. coro = accept_coro(future, conn)
  476. tasks.ensure_future(coro, loop=self._loop)
  477. return future
  478. def connect(self, conn, address):
  479. if conn.type == socket.SOCK_DGRAM:
  480. # WSAConnect will complete immediately for UDP sockets so we don't
  481. # need to register any IOCP operation
  482. _overlapped.WSAConnect(conn.fileno(), address)
  483. fut = self._loop.create_future()
  484. fut.set_result(None)
  485. return fut
  486. self._register_with_iocp(conn)
  487. # The socket needs to be locally bound before we call ConnectEx().
  488. try:
  489. _overlapped.BindLocal(conn.fileno(), conn.family)
  490. except OSError as e:
  491. if e.winerror != errno.WSAEINVAL:
  492. raise
  493. # Probably already locally bound; check using getsockname().
  494. if conn.getsockname()[1] == 0:
  495. raise
  496. ov = _overlapped.Overlapped(NULL)
  497. ov.ConnectEx(conn.fileno(), address)
  498. def finish_connect(trans, key, ov):
  499. ov.getresult()
  500. # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
  501. conn.setsockopt(socket.SOL_SOCKET,
  502. _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
  503. return conn
  504. return self._register(ov, conn, finish_connect)
  505. def sendfile(self, sock, file, offset, count):
  506. self._register_with_iocp(sock)
  507. ov = _overlapped.Overlapped(NULL)
  508. offset_low = offset & 0xffff_ffff
  509. offset_high = (offset >> 32) & 0xffff_ffff
  510. ov.TransmitFile(sock.fileno(),
  511. msvcrt.get_osfhandle(file.fileno()),
  512. offset_low, offset_high,
  513. count, 0, 0)
  514. def finish_sendfile(trans, key, ov):
  515. try:
  516. return ov.getresult()
  517. except OSError as exc:
  518. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  519. _overlapped.ERROR_OPERATION_ABORTED):
  520. raise ConnectionResetError(*exc.args)
  521. else:
  522. raise
  523. return self._register(ov, sock, finish_sendfile)
  524. def accept_pipe(self, pipe):
  525. self._register_with_iocp(pipe)
  526. ov = _overlapped.Overlapped(NULL)
  527. connected = ov.ConnectNamedPipe(pipe.fileno())
  528. if connected:
  529. # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
  530. # that the pipe is connected. There is no need to wait for the
  531. # completion of the connection.
  532. return self._result(pipe)
  533. def finish_accept_pipe(trans, key, ov):
  534. ov.getresult()
  535. return pipe
  536. return self._register(ov, pipe, finish_accept_pipe)
  537. async def connect_pipe(self, address):
  538. delay = CONNECT_PIPE_INIT_DELAY
  539. while True:
  540. # Unfortunately there is no way to do an overlapped connect to
  541. # a pipe. Call CreateFile() in a loop until it doesn't fail with
  542. # ERROR_PIPE_BUSY.
  543. try:
  544. handle = _overlapped.ConnectPipe(address)
  545. break
  546. except OSError as exc:
  547. if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
  548. raise
  549. # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
  550. delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
  551. await tasks.sleep(delay)
  552. return windows_utils.PipeHandle(handle)
  553. def wait_for_handle(self, handle, timeout=None):
  554. """Wait for a handle.
  555. Return a Future object. The result of the future is True if the wait
  556. completed, or False if the wait did not complete (on timeout).
  557. """
  558. return self._wait_for_handle(handle, timeout, False)
  559. def _wait_cancel(self, event, done_callback):
  560. fut = self._wait_for_handle(event, None, True)
  561. # add_done_callback() cannot be used because the wait may only complete
  562. # in IocpProactor.close(), while the event loop is not running.
  563. fut._done_callback = done_callback
  564. return fut
  565. def _wait_for_handle(self, handle, timeout, _is_cancel):
  566. self._check_closed()
  567. if timeout is None:
  568. ms = _winapi.INFINITE
  569. else:
  570. # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
  571. # round away from zero to wait *at least* timeout seconds.
  572. ms = math.ceil(timeout * 1e3)
  573. # We only create ov so we can use ov.address as a key for the cache.
  574. ov = _overlapped.Overlapped(NULL)
  575. wait_handle = _overlapped.RegisterWaitWithQueue(
  576. handle, self._iocp, ov.address, ms)
  577. if _is_cancel:
  578. f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
  579. else:
  580. f = _WaitHandleFuture(ov, handle, wait_handle, self,
  581. loop=self._loop)
  582. if f._source_traceback:
  583. del f._source_traceback[-1]
  584. def finish_wait_for_handle(trans, key, ov):
  585. # Note that this second wait means that we should only use
  586. # this with handles types where a successful wait has no
  587. # effect. So events or processes are all right, but locks
  588. # or semaphores are not. Also note if the handle is
  589. # signalled and then quickly reset, then we may return
  590. # False even though we have not timed out.
  591. return f._poll()
  592. self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
  593. return f
  594. def _register_with_iocp(self, obj):
  595. # To get notifications of finished ops on this objects sent to the
  596. # completion port, were must register the handle.
  597. if obj not in self._registered:
  598. self._registered.add(obj)
  599. _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
  600. # XXX We could also use SetFileCompletionNotificationModes()
  601. # to avoid sending notifications to completion port of ops
  602. # that succeed immediately.
  603. def _register(self, ov, obj, callback):
  604. self._check_closed()
  605. # Return a future which will be set with the result of the
  606. # operation when it completes. The future's value is actually
  607. # the value returned by callback().
  608. f = _OverlappedFuture(ov, loop=self._loop)
  609. if f._source_traceback:
  610. del f._source_traceback[-1]
  611. if not ov.pending:
  612. # The operation has completed, so no need to postpone the
  613. # work. We cannot take this short cut if we need the
  614. # NumberOfBytes, CompletionKey values returned by
  615. # PostQueuedCompletionStatus().
  616. try:
  617. value = callback(None, None, ov)
  618. except OSError as e:
  619. f.set_exception(e)
  620. else:
  621. f.set_result(value)
  622. # Even if GetOverlappedResult() was called, we have to wait for the
  623. # notification of the completion in GetQueuedCompletionStatus().
  624. # Register the overlapped operation to keep a reference to the
  625. # OVERLAPPED object, otherwise the memory is freed and Windows may
  626. # read uninitialized memory.
  627. # Register the overlapped operation for later. Note that
  628. # we only store obj to prevent it from being garbage
  629. # collected too early.
  630. self._cache[ov.address] = (f, ov, obj, callback)
  631. return f
  632. def _unregister(self, ov):
  633. """Unregister an overlapped object.
  634. Call this method when its future has been cancelled. The event can
  635. already be signalled (pending in the proactor event queue). It is also
  636. safe if the event is never signalled (because it was cancelled).
  637. """
  638. self._check_closed()
  639. self._unregistered.append(ov)
  640. def _get_accept_socket(self, family):
  641. s = socket.socket(family)
  642. s.settimeout(0)
  643. return s
  644. def _poll(self, timeout=None):
  645. if timeout is None:
  646. ms = INFINITE
  647. elif timeout < 0:
  648. raise ValueError("negative timeout")
  649. else:
  650. # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
  651. # round away from zero to wait *at least* timeout seconds.
  652. ms = math.ceil(timeout * 1e3)
  653. if ms >= INFINITE:
  654. raise ValueError("timeout too big")
  655. while True:
  656. status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
  657. if status is None:
  658. break
  659. ms = 0
  660. err, transferred, key, address = status
  661. try:
  662. f, ov, obj, callback = self._cache.pop(address)
  663. except KeyError:
  664. if self._loop.get_debug():
  665. self._loop.call_exception_handler({
  666. 'message': ('GetQueuedCompletionStatus() returned an '
  667. 'unexpected event'),
  668. 'status': ('err=%s transferred=%s key=%#x address=%#x'
  669. % (err, transferred, key, address)),
  670. })
  671. # key is either zero, or it is used to return a pipe
  672. # handle which should be closed to avoid a leak.
  673. if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
  674. _winapi.CloseHandle(key)
  675. continue
  676. if obj in self._stopped_serving:
  677. f.cancel()
  678. # Don't call the callback if _register() already read the result or
  679. # if the overlapped has been cancelled
  680. elif not f.done():
  681. try:
  682. value = callback(transferred, key, ov)
  683. except OSError as e:
  684. f.set_exception(e)
  685. self._results.append(f)
  686. else:
  687. f.set_result(value)
  688. self._results.append(f)
  689. # Remove unregistered futures
  690. for ov in self._unregistered:
  691. self._cache.pop(ov.address, None)
  692. self._unregistered.clear()
  693. def _stop_serving(self, obj):
  694. # obj is a socket or pipe handle. It will be closed in
  695. # BaseProactorEventLoop._stop_serving() which will make any
  696. # pending operations fail quickly.
  697. self._stopped_serving.add(obj)
  698. def close(self):
  699. if self._iocp is None:
  700. # already closed
  701. return
  702. # Cancel remaining registered operations.
  703. for address, (fut, ov, obj, callback) in list(self._cache.items()):
  704. if fut.cancelled():
  705. # Nothing to do with cancelled futures
  706. pass
  707. elif isinstance(fut, _WaitCancelFuture):
  708. # _WaitCancelFuture must not be cancelled
  709. pass
  710. else:
  711. try:
  712. fut.cancel()
  713. except OSError as exc:
  714. if self._loop is not None:
  715. context = {
  716. 'message': 'Cancelling a future failed',
  717. 'exception': exc,
  718. 'future': fut,
  719. }
  720. if fut._source_traceback:
  721. context['source_traceback'] = fut._source_traceback
  722. self._loop.call_exception_handler(context)
  723. # Wait until all cancelled overlapped complete: don't exit with running
  724. # overlapped to prevent a crash. Display progress every second if the
  725. # loop is still running.
  726. msg_update = 1.0
  727. start_time = time.monotonic()
  728. next_msg = start_time + msg_update
  729. while self._cache:
  730. if next_msg <= time.monotonic():
  731. logger.debug('%r is running after closing for %.1f seconds',
  732. self, time.monotonic() - start_time)
  733. next_msg = time.monotonic() + msg_update
  734. # handle a few events, or timeout
  735. self._poll(msg_update)
  736. self._results = []
  737. _winapi.CloseHandle(self._iocp)
  738. self._iocp = None
  739. def __del__(self):
  740. self.close()
  741. class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
  742. def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
  743. self._proc = windows_utils.Popen(
  744. args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
  745. bufsize=bufsize, **kwargs)
  746. def callback(f):
  747. returncode = self._proc.poll()
  748. self._process_exited(returncode)
  749. f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
  750. f.add_done_callback(callback)
  751. SelectorEventLoop = _WindowsSelectorEventLoop
  752. class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
  753. _loop_factory = SelectorEventLoop
  754. class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
  755. _loop_factory = ProactorEventLoop
  756. DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy