logo

oasis-root

Compiled tree of Oasis Linux based on own branch at <https://hacktivis.me/git/oasis/> git clone https://anongit.hacktivis.me/git/oasis-root.git

selector_events.py (39502B)


  1. """Event loop using a selector and related classes.
  2. A selector is a "notify-when-ready" multiplexer. For a subclass which
  3. also includes support for signal handling, see the unix_events sub-module.
  4. """
  5. __all__ = 'BaseSelectorEventLoop',
  6. import collections
  7. import errno
  8. import functools
  9. import selectors
  10. import socket
  11. import warnings
  12. import weakref
  13. try:
  14. import ssl
  15. except ImportError: # pragma: no cover
  16. ssl = None
  17. from . import base_events
  18. from . import constants
  19. from . import events
  20. from . import futures
  21. from . import protocols
  22. from . import sslproto
  23. from . import transports
  24. from . import trsock
  25. from .log import logger
  26. def _test_selector_event(selector, fd, event):
  27. # Test if the selector is monitoring 'event' events
  28. # for the file descriptor 'fd'.
  29. try:
  30. key = selector.get_key(fd)
  31. except KeyError:
  32. return False
  33. else:
  34. return bool(key.events & event)
  35. def _check_ssl_socket(sock):
  36. if ssl is not None and isinstance(sock, ssl.SSLSocket):
  37. raise TypeError("Socket cannot be of type SSLSocket")
  38. class BaseSelectorEventLoop(base_events.BaseEventLoop):
  39. """Selector event loop.
  40. See events.EventLoop for API specification.
  41. """
  42. def __init__(self, selector=None):
  43. super().__init__()
  44. if selector is None:
  45. selector = selectors.DefaultSelector()
  46. logger.debug('Using selector: %s', selector.__class__.__name__)
  47. self._selector = selector
  48. self._make_self_pipe()
  49. self._transports = weakref.WeakValueDictionary()
  50. def _make_socket_transport(self, sock, protocol, waiter=None, *,
  51. extra=None, server=None):
  52. return _SelectorSocketTransport(self, sock, protocol, waiter,
  53. extra, server)
  54. def _make_ssl_transport(
  55. self, rawsock, protocol, sslcontext, waiter=None,
  56. *, server_side=False, server_hostname=None,
  57. extra=None, server=None,
  58. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  59. ssl_protocol = sslproto.SSLProtocol(
  60. self, protocol, sslcontext, waiter,
  61. server_side, server_hostname,
  62. ssl_handshake_timeout=ssl_handshake_timeout)
  63. _SelectorSocketTransport(self, rawsock, ssl_protocol,
  64. extra=extra, server=server)
  65. return ssl_protocol._app_transport
  66. def _make_datagram_transport(self, sock, protocol,
  67. address=None, waiter=None, extra=None):
  68. return _SelectorDatagramTransport(self, sock, protocol,
  69. address, waiter, extra)
  70. def close(self):
  71. if self.is_running():
  72. raise RuntimeError("Cannot close a running event loop")
  73. if self.is_closed():
  74. return
  75. self._close_self_pipe()
  76. super().close()
  77. if self._selector is not None:
  78. self._selector.close()
  79. self._selector = None
  80. def _close_self_pipe(self):
  81. self._remove_reader(self._ssock.fileno())
  82. self._ssock.close()
  83. self._ssock = None
  84. self._csock.close()
  85. self._csock = None
  86. self._internal_fds -= 1
  87. def _make_self_pipe(self):
  88. # A self-socket, really. :-)
  89. self._ssock, self._csock = socket.socketpair()
  90. self._ssock.setblocking(False)
  91. self._csock.setblocking(False)
  92. self._internal_fds += 1
  93. self._add_reader(self._ssock.fileno(), self._read_from_self)
  94. def _process_self_data(self, data):
  95. pass
  96. def _read_from_self(self):
  97. while True:
  98. try:
  99. data = self._ssock.recv(4096)
  100. if not data:
  101. break
  102. self._process_self_data(data)
  103. except InterruptedError:
  104. continue
  105. except BlockingIOError:
  106. break
  107. def _write_to_self(self):
  108. # This may be called from a different thread, possibly after
  109. # _close_self_pipe() has been called or even while it is
  110. # running. Guard for self._csock being None or closed. When
  111. # a socket is closed, send() raises OSError (with errno set to
  112. # EBADF, but let's not rely on the exact error code).
  113. csock = self._csock
  114. if csock is None:
  115. return
  116. try:
  117. csock.send(b'\0')
  118. except OSError:
  119. if self._debug:
  120. logger.debug("Fail to write a null byte into the "
  121. "self-pipe socket",
  122. exc_info=True)
  123. def _start_serving(self, protocol_factory, sock,
  124. sslcontext=None, server=None, backlog=100,
  125. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  126. self._add_reader(sock.fileno(), self._accept_connection,
  127. protocol_factory, sock, sslcontext, server, backlog,
  128. ssl_handshake_timeout)
  129. def _accept_connection(
  130. self, protocol_factory, sock,
  131. sslcontext=None, server=None, backlog=100,
  132. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  133. # This method is only called once for each event loop tick where the
  134. # listening socket has triggered an EVENT_READ. There may be multiple
  135. # connections waiting for an .accept() so it is called in a loop.
  136. # See https://bugs.python.org/issue27906 for more details.
  137. for _ in range(backlog):
  138. try:
  139. conn, addr = sock.accept()
  140. if self._debug:
  141. logger.debug("%r got a new connection from %r: %r",
  142. server, addr, conn)
  143. conn.setblocking(False)
  144. except (BlockingIOError, InterruptedError, ConnectionAbortedError):
  145. # Early exit because the socket accept buffer is empty.
  146. return None
  147. except OSError as exc:
  148. # There's nowhere to send the error, so just log it.
  149. if exc.errno in (errno.EMFILE, errno.ENFILE,
  150. errno.ENOBUFS, errno.ENOMEM):
  151. # Some platforms (e.g. Linux keep reporting the FD as
  152. # ready, so we remove the read handler temporarily.
  153. # We'll try again in a while.
  154. self.call_exception_handler({
  155. 'message': 'socket.accept() out of system resource',
  156. 'exception': exc,
  157. 'socket': trsock.TransportSocket(sock),
  158. })
  159. self._remove_reader(sock.fileno())
  160. self.call_later(constants.ACCEPT_RETRY_DELAY,
  161. self._start_serving,
  162. protocol_factory, sock, sslcontext, server,
  163. backlog, ssl_handshake_timeout)
  164. else:
  165. raise # The event loop will catch, log and ignore it.
  166. else:
  167. extra = {'peername': addr}
  168. accept = self._accept_connection2(
  169. protocol_factory, conn, extra, sslcontext, server,
  170. ssl_handshake_timeout)
  171. self.create_task(accept)
  172. async def _accept_connection2(
  173. self, protocol_factory, conn, extra,
  174. sslcontext=None, server=None,
  175. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  176. protocol = None
  177. transport = None
  178. try:
  179. protocol = protocol_factory()
  180. waiter = self.create_future()
  181. if sslcontext:
  182. transport = self._make_ssl_transport(
  183. conn, protocol, sslcontext, waiter=waiter,
  184. server_side=True, extra=extra, server=server,
  185. ssl_handshake_timeout=ssl_handshake_timeout)
  186. else:
  187. transport = self._make_socket_transport(
  188. conn, protocol, waiter=waiter, extra=extra,
  189. server=server)
  190. try:
  191. await waiter
  192. except BaseException:
  193. transport.close()
  194. raise
  195. # It's now up to the protocol to handle the connection.
  196. except (SystemExit, KeyboardInterrupt):
  197. raise
  198. except BaseException as exc:
  199. if self._debug:
  200. context = {
  201. 'message':
  202. 'Error on transport creation for incoming connection',
  203. 'exception': exc,
  204. }
  205. if protocol is not None:
  206. context['protocol'] = protocol
  207. if transport is not None:
  208. context['transport'] = transport
  209. self.call_exception_handler(context)
  210. def _ensure_fd_no_transport(self, fd):
  211. fileno = fd
  212. if not isinstance(fileno, int):
  213. try:
  214. fileno = int(fileno.fileno())
  215. except (AttributeError, TypeError, ValueError):
  216. # This code matches selectors._fileobj_to_fd function.
  217. raise ValueError(f"Invalid file object: {fd!r}") from None
  218. try:
  219. transport = self._transports[fileno]
  220. except KeyError:
  221. pass
  222. else:
  223. if not transport.is_closing():
  224. raise RuntimeError(
  225. f'File descriptor {fd!r} is used by transport '
  226. f'{transport!r}')
  227. def _add_reader(self, fd, callback, *args):
  228. self._check_closed()
  229. handle = events.Handle(callback, args, self, None)
  230. try:
  231. key = self._selector.get_key(fd)
  232. except KeyError:
  233. self._selector.register(fd, selectors.EVENT_READ,
  234. (handle, None))
  235. else:
  236. mask, (reader, writer) = key.events, key.data
  237. self._selector.modify(fd, mask | selectors.EVENT_READ,
  238. (handle, writer))
  239. if reader is not None:
  240. reader.cancel()
  241. return handle
  242. def _remove_reader(self, fd):
  243. if self.is_closed():
  244. return False
  245. try:
  246. key = self._selector.get_key(fd)
  247. except KeyError:
  248. return False
  249. else:
  250. mask, (reader, writer) = key.events, key.data
  251. mask &= ~selectors.EVENT_READ
  252. if not mask:
  253. self._selector.unregister(fd)
  254. else:
  255. self._selector.modify(fd, mask, (None, writer))
  256. if reader is not None:
  257. reader.cancel()
  258. return True
  259. else:
  260. return False
  261. def _add_writer(self, fd, callback, *args):
  262. self._check_closed()
  263. handle = events.Handle(callback, args, self, None)
  264. try:
  265. key = self._selector.get_key(fd)
  266. except KeyError:
  267. self._selector.register(fd, selectors.EVENT_WRITE,
  268. (None, handle))
  269. else:
  270. mask, (reader, writer) = key.events, key.data
  271. self._selector.modify(fd, mask | selectors.EVENT_WRITE,
  272. (reader, handle))
  273. if writer is not None:
  274. writer.cancel()
  275. return handle
  276. def _remove_writer(self, fd):
  277. """Remove a writer callback."""
  278. if self.is_closed():
  279. return False
  280. try:
  281. key = self._selector.get_key(fd)
  282. except KeyError:
  283. return False
  284. else:
  285. mask, (reader, writer) = key.events, key.data
  286. # Remove both writer and connector.
  287. mask &= ~selectors.EVENT_WRITE
  288. if not mask:
  289. self._selector.unregister(fd)
  290. else:
  291. self._selector.modify(fd, mask, (reader, None))
  292. if writer is not None:
  293. writer.cancel()
  294. return True
  295. else:
  296. return False
  297. def add_reader(self, fd, callback, *args):
  298. """Add a reader callback."""
  299. self._ensure_fd_no_transport(fd)
  300. self._add_reader(fd, callback, *args)
  301. def remove_reader(self, fd):
  302. """Remove a reader callback."""
  303. self._ensure_fd_no_transport(fd)
  304. return self._remove_reader(fd)
  305. def add_writer(self, fd, callback, *args):
  306. """Add a writer callback.."""
  307. self._ensure_fd_no_transport(fd)
  308. self._add_writer(fd, callback, *args)
  309. def remove_writer(self, fd):
  310. """Remove a writer callback."""
  311. self._ensure_fd_no_transport(fd)
  312. return self._remove_writer(fd)
  313. async def sock_recv(self, sock, n):
  314. """Receive data from the socket.
  315. The return value is a bytes object representing the data received.
  316. The maximum amount of data to be received at once is specified by
  317. nbytes.
  318. """
  319. _check_ssl_socket(sock)
  320. if self._debug and sock.gettimeout() != 0:
  321. raise ValueError("the socket must be non-blocking")
  322. try:
  323. return sock.recv(n)
  324. except (BlockingIOError, InterruptedError):
  325. pass
  326. fut = self.create_future()
  327. fd = sock.fileno()
  328. self._ensure_fd_no_transport(fd)
  329. handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
  330. fut.add_done_callback(
  331. functools.partial(self._sock_read_done, fd, handle=handle))
  332. return await fut
  333. def _sock_read_done(self, fd, fut, handle=None):
  334. if handle is None or not handle.cancelled():
  335. self.remove_reader(fd)
  336. def _sock_recv(self, fut, sock, n):
  337. # _sock_recv() can add itself as an I/O callback if the operation can't
  338. # be done immediately. Don't use it directly, call sock_recv().
  339. if fut.done():
  340. return
  341. try:
  342. data = sock.recv(n)
  343. except (BlockingIOError, InterruptedError):
  344. return # try again next time
  345. except (SystemExit, KeyboardInterrupt):
  346. raise
  347. except BaseException as exc:
  348. fut.set_exception(exc)
  349. else:
  350. fut.set_result(data)
  351. async def sock_recv_into(self, sock, buf):
  352. """Receive data from the socket.
  353. The received data is written into *buf* (a writable buffer).
  354. The return value is the number of bytes written.
  355. """
  356. _check_ssl_socket(sock)
  357. if self._debug and sock.gettimeout() != 0:
  358. raise ValueError("the socket must be non-blocking")
  359. try:
  360. return sock.recv_into(buf)
  361. except (BlockingIOError, InterruptedError):
  362. pass
  363. fut = self.create_future()
  364. fd = sock.fileno()
  365. self._ensure_fd_no_transport(fd)
  366. handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
  367. fut.add_done_callback(
  368. functools.partial(self._sock_read_done, fd, handle=handle))
  369. return await fut
  370. def _sock_recv_into(self, fut, sock, buf):
  371. # _sock_recv_into() can add itself as an I/O callback if the operation
  372. # can't be done immediately. Don't use it directly, call
  373. # sock_recv_into().
  374. if fut.done():
  375. return
  376. try:
  377. nbytes = sock.recv_into(buf)
  378. except (BlockingIOError, InterruptedError):
  379. return # try again next time
  380. except (SystemExit, KeyboardInterrupt):
  381. raise
  382. except BaseException as exc:
  383. fut.set_exception(exc)
  384. else:
  385. fut.set_result(nbytes)
  386. async def sock_sendall(self, sock, data):
  387. """Send data to the socket.
  388. The socket must be connected to a remote socket. This method continues
  389. to send data from data until either all data has been sent or an
  390. error occurs. None is returned on success. On error, an exception is
  391. raised, and there is no way to determine how much data, if any, was
  392. successfully processed by the receiving end of the connection.
  393. """
  394. _check_ssl_socket(sock)
  395. if self._debug and sock.gettimeout() != 0:
  396. raise ValueError("the socket must be non-blocking")
  397. try:
  398. n = sock.send(data)
  399. except (BlockingIOError, InterruptedError):
  400. n = 0
  401. if n == len(data):
  402. # all data sent
  403. return
  404. fut = self.create_future()
  405. fd = sock.fileno()
  406. self._ensure_fd_no_transport(fd)
  407. # use a trick with a list in closure to store a mutable state
  408. handle = self._add_writer(fd, self._sock_sendall, fut, sock,
  409. memoryview(data), [n])
  410. fut.add_done_callback(
  411. functools.partial(self._sock_write_done, fd, handle=handle))
  412. return await fut
  413. def _sock_sendall(self, fut, sock, view, pos):
  414. if fut.done():
  415. # Future cancellation can be scheduled on previous loop iteration
  416. return
  417. start = pos[0]
  418. try:
  419. n = sock.send(view[start:])
  420. except (BlockingIOError, InterruptedError):
  421. return
  422. except (SystemExit, KeyboardInterrupt):
  423. raise
  424. except BaseException as exc:
  425. fut.set_exception(exc)
  426. return
  427. start += n
  428. if start == len(view):
  429. fut.set_result(None)
  430. else:
  431. pos[0] = start
  432. async def sock_connect(self, sock, address):
  433. """Connect to a remote socket at address.
  434. This method is a coroutine.
  435. """
  436. _check_ssl_socket(sock)
  437. if self._debug and sock.gettimeout() != 0:
  438. raise ValueError("the socket must be non-blocking")
  439. if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
  440. resolved = await self._ensure_resolved(
  441. address, family=sock.family, proto=sock.proto, loop=self)
  442. _, _, _, _, address = resolved[0]
  443. fut = self.create_future()
  444. self._sock_connect(fut, sock, address)
  445. return await fut
  446. def _sock_connect(self, fut, sock, address):
  447. fd = sock.fileno()
  448. try:
  449. sock.connect(address)
  450. except (BlockingIOError, InterruptedError):
  451. # Issue #23618: When the C function connect() fails with EINTR, the
  452. # connection runs in background. We have to wait until the socket
  453. # becomes writable to be notified when the connection succeed or
  454. # fails.
  455. self._ensure_fd_no_transport(fd)
  456. handle = self._add_writer(
  457. fd, self._sock_connect_cb, fut, sock, address)
  458. fut.add_done_callback(
  459. functools.partial(self._sock_write_done, fd, handle=handle))
  460. except (SystemExit, KeyboardInterrupt):
  461. raise
  462. except BaseException as exc:
  463. fut.set_exception(exc)
  464. else:
  465. fut.set_result(None)
  466. def _sock_write_done(self, fd, fut, handle=None):
  467. if handle is None or not handle.cancelled():
  468. self.remove_writer(fd)
  469. def _sock_connect_cb(self, fut, sock, address):
  470. if fut.done():
  471. return
  472. try:
  473. err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  474. if err != 0:
  475. # Jump to any except clause below.
  476. raise OSError(err, f'Connect call failed {address}')
  477. except (BlockingIOError, InterruptedError):
  478. # socket is still registered, the callback will be retried later
  479. pass
  480. except (SystemExit, KeyboardInterrupt):
  481. raise
  482. except BaseException as exc:
  483. fut.set_exception(exc)
  484. else:
  485. fut.set_result(None)
  486. async def sock_accept(self, sock):
  487. """Accept a connection.
  488. The socket must be bound to an address and listening for connections.
  489. The return value is a pair (conn, address) where conn is a new socket
  490. object usable to send and receive data on the connection, and address
  491. is the address bound to the socket on the other end of the connection.
  492. """
  493. _check_ssl_socket(sock)
  494. if self._debug and sock.gettimeout() != 0:
  495. raise ValueError("the socket must be non-blocking")
  496. fut = self.create_future()
  497. self._sock_accept(fut, sock)
  498. return await fut
  499. def _sock_accept(self, fut, sock):
  500. fd = sock.fileno()
  501. try:
  502. conn, address = sock.accept()
  503. conn.setblocking(False)
  504. except (BlockingIOError, InterruptedError):
  505. self._ensure_fd_no_transport(fd)
  506. handle = self._add_reader(fd, self._sock_accept, fut, sock)
  507. fut.add_done_callback(
  508. functools.partial(self._sock_read_done, fd, handle=handle))
  509. except (SystemExit, KeyboardInterrupt):
  510. raise
  511. except BaseException as exc:
  512. fut.set_exception(exc)
  513. else:
  514. fut.set_result((conn, address))
  515. async def _sendfile_native(self, transp, file, offset, count):
  516. del self._transports[transp._sock_fd]
  517. resume_reading = transp.is_reading()
  518. transp.pause_reading()
  519. await transp._make_empty_waiter()
  520. try:
  521. return await self.sock_sendfile(transp._sock, file, offset, count,
  522. fallback=False)
  523. finally:
  524. transp._reset_empty_waiter()
  525. if resume_reading:
  526. transp.resume_reading()
  527. self._transports[transp._sock_fd] = transp
  528. def _process_events(self, event_list):
  529. for key, mask in event_list:
  530. fileobj, (reader, writer) = key.fileobj, key.data
  531. if mask & selectors.EVENT_READ and reader is not None:
  532. if reader._cancelled:
  533. self._remove_reader(fileobj)
  534. else:
  535. self._add_callback(reader)
  536. if mask & selectors.EVENT_WRITE and writer is not None:
  537. if writer._cancelled:
  538. self._remove_writer(fileobj)
  539. else:
  540. self._add_callback(writer)
  541. def _stop_serving(self, sock):
  542. self._remove_reader(sock.fileno())
  543. sock.close()
  544. class _SelectorTransport(transports._FlowControlMixin,
  545. transports.Transport):
  546. max_size = 256 * 1024 # Buffer size passed to recv().
  547. _buffer_factory = bytearray # Constructs initial value for self._buffer.
  548. # Attribute used in the destructor: it must be set even if the constructor
  549. # is not called (see _SelectorSslTransport which may start by raising an
  550. # exception)
  551. _sock = None
  552. def __init__(self, loop, sock, protocol, extra=None, server=None):
  553. super().__init__(extra, loop)
  554. self._extra['socket'] = trsock.TransportSocket(sock)
  555. try:
  556. self._extra['sockname'] = sock.getsockname()
  557. except OSError:
  558. self._extra['sockname'] = None
  559. if 'peername' not in self._extra:
  560. try:
  561. self._extra['peername'] = sock.getpeername()
  562. except socket.error:
  563. self._extra['peername'] = None
  564. self._sock = sock
  565. self._sock_fd = sock.fileno()
  566. self._protocol_connected = False
  567. self.set_protocol(protocol)
  568. self._server = server
  569. self._buffer = self._buffer_factory()
  570. self._conn_lost = 0 # Set when call to connection_lost scheduled.
  571. self._closing = False # Set when close() called.
  572. if self._server is not None:
  573. self._server._attach()
  574. loop._transports[self._sock_fd] = self
  575. def __repr__(self):
  576. info = [self.__class__.__name__]
  577. if self._sock is None:
  578. info.append('closed')
  579. elif self._closing:
  580. info.append('closing')
  581. info.append(f'fd={self._sock_fd}')
  582. # test if the transport was closed
  583. if self._loop is not None and not self._loop.is_closed():
  584. polling = _test_selector_event(self._loop._selector,
  585. self._sock_fd, selectors.EVENT_READ)
  586. if polling:
  587. info.append('read=polling')
  588. else:
  589. info.append('read=idle')
  590. polling = _test_selector_event(self._loop._selector,
  591. self._sock_fd,
  592. selectors.EVENT_WRITE)
  593. if polling:
  594. state = 'polling'
  595. else:
  596. state = 'idle'
  597. bufsize = self.get_write_buffer_size()
  598. info.append(f'write=<{state}, bufsize={bufsize}>')
  599. return '<{}>'.format(' '.join(info))
  600. def abort(self):
  601. self._force_close(None)
  602. def set_protocol(self, protocol):
  603. self._protocol = protocol
  604. self._protocol_connected = True
  605. def get_protocol(self):
  606. return self._protocol
  607. def is_closing(self):
  608. return self._closing
  609. def close(self):
  610. if self._closing:
  611. return
  612. self._closing = True
  613. self._loop._remove_reader(self._sock_fd)
  614. if not self._buffer:
  615. self._conn_lost += 1
  616. self._loop._remove_writer(self._sock_fd)
  617. self._loop.call_soon(self._call_connection_lost, None)
  618. def __del__(self, _warn=warnings.warn):
  619. if self._sock is not None:
  620. _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
  621. self._sock.close()
  622. def _fatal_error(self, exc, message='Fatal error on transport'):
  623. # Should be called from exception handler only.
  624. if isinstance(exc, OSError):
  625. if self._loop.get_debug():
  626. logger.debug("%r: %s", self, message, exc_info=True)
  627. else:
  628. self._loop.call_exception_handler({
  629. 'message': message,
  630. 'exception': exc,
  631. 'transport': self,
  632. 'protocol': self._protocol,
  633. })
  634. self._force_close(exc)
  635. def _force_close(self, exc):
  636. if self._conn_lost:
  637. return
  638. if self._buffer:
  639. self._buffer.clear()
  640. self._loop._remove_writer(self._sock_fd)
  641. if not self._closing:
  642. self._closing = True
  643. self._loop._remove_reader(self._sock_fd)
  644. self._conn_lost += 1
  645. self._loop.call_soon(self._call_connection_lost, exc)
  646. def _call_connection_lost(self, exc):
  647. try:
  648. if self._protocol_connected:
  649. self._protocol.connection_lost(exc)
  650. finally:
  651. self._sock.close()
  652. self._sock = None
  653. self._protocol = None
  654. self._loop = None
  655. server = self._server
  656. if server is not None:
  657. server._detach()
  658. self._server = None
  659. def get_write_buffer_size(self):
  660. return len(self._buffer)
  661. def _add_reader(self, fd, callback, *args):
  662. if self._closing:
  663. return
  664. self._loop._add_reader(fd, callback, *args)
  665. class _SelectorSocketTransport(_SelectorTransport):
  666. _start_tls_compatible = True
  667. _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
  668. def __init__(self, loop, sock, protocol, waiter=None,
  669. extra=None, server=None):
  670. self._read_ready_cb = None
  671. super().__init__(loop, sock, protocol, extra, server)
  672. self._eof = False
  673. self._paused = False
  674. self._empty_waiter = None
  675. # Disable the Nagle algorithm -- small writes will be
  676. # sent without waiting for the TCP ACK. This generally
  677. # decreases the latency (in some cases significantly.)
  678. base_events._set_nodelay(self._sock)
  679. self._loop.call_soon(self._protocol.connection_made, self)
  680. # only start reading when connection_made() has been called
  681. self._loop.call_soon(self._add_reader,
  682. self._sock_fd, self._read_ready)
  683. if waiter is not None:
  684. # only wake up the waiter when connection_made() has been called
  685. self._loop.call_soon(futures._set_result_unless_cancelled,
  686. waiter, None)
  687. def set_protocol(self, protocol):
  688. if isinstance(protocol, protocols.BufferedProtocol):
  689. self._read_ready_cb = self._read_ready__get_buffer
  690. else:
  691. self._read_ready_cb = self._read_ready__data_received
  692. super().set_protocol(protocol)
  693. def is_reading(self):
  694. return not self._paused and not self._closing
  695. def pause_reading(self):
  696. if self._closing or self._paused:
  697. return
  698. self._paused = True
  699. self._loop._remove_reader(self._sock_fd)
  700. if self._loop.get_debug():
  701. logger.debug("%r pauses reading", self)
  702. def resume_reading(self):
  703. if self._closing or not self._paused:
  704. return
  705. self._paused = False
  706. self._add_reader(self._sock_fd, self._read_ready)
  707. if self._loop.get_debug():
  708. logger.debug("%r resumes reading", self)
  709. def _read_ready(self):
  710. self._read_ready_cb()
  711. def _read_ready__get_buffer(self):
  712. if self._conn_lost:
  713. return
  714. try:
  715. buf = self._protocol.get_buffer(-1)
  716. if not len(buf):
  717. raise RuntimeError('get_buffer() returned an empty buffer')
  718. except (SystemExit, KeyboardInterrupt):
  719. raise
  720. except BaseException as exc:
  721. self._fatal_error(
  722. exc, 'Fatal error: protocol.get_buffer() call failed.')
  723. return
  724. try:
  725. nbytes = self._sock.recv_into(buf)
  726. except (BlockingIOError, InterruptedError):
  727. return
  728. except (SystemExit, KeyboardInterrupt):
  729. raise
  730. except BaseException as exc:
  731. self._fatal_error(exc, 'Fatal read error on socket transport')
  732. return
  733. if not nbytes:
  734. self._read_ready__on_eof()
  735. return
  736. try:
  737. self._protocol.buffer_updated(nbytes)
  738. except (SystemExit, KeyboardInterrupt):
  739. raise
  740. except BaseException as exc:
  741. self._fatal_error(
  742. exc, 'Fatal error: protocol.buffer_updated() call failed.')
  743. def _read_ready__data_received(self):
  744. if self._conn_lost:
  745. return
  746. try:
  747. data = self._sock.recv(self.max_size)
  748. except (BlockingIOError, InterruptedError):
  749. return
  750. except (SystemExit, KeyboardInterrupt):
  751. raise
  752. except BaseException as exc:
  753. self._fatal_error(exc, 'Fatal read error on socket transport')
  754. return
  755. if not data:
  756. self._read_ready__on_eof()
  757. return
  758. try:
  759. self._protocol.data_received(data)
  760. except (SystemExit, KeyboardInterrupt):
  761. raise
  762. except BaseException as exc:
  763. self._fatal_error(
  764. exc, 'Fatal error: protocol.data_received() call failed.')
  765. def _read_ready__on_eof(self):
  766. if self._loop.get_debug():
  767. logger.debug("%r received EOF", self)
  768. try:
  769. keep_open = self._protocol.eof_received()
  770. except (SystemExit, KeyboardInterrupt):
  771. raise
  772. except BaseException as exc:
  773. self._fatal_error(
  774. exc, 'Fatal error: protocol.eof_received() call failed.')
  775. return
  776. if keep_open:
  777. # We're keeping the connection open so the
  778. # protocol can write more, but we still can't
  779. # receive more, so remove the reader callback.
  780. self._loop._remove_reader(self._sock_fd)
  781. else:
  782. self.close()
  783. def write(self, data):
  784. if not isinstance(data, (bytes, bytearray, memoryview)):
  785. raise TypeError(f'data argument must be a bytes-like object, '
  786. f'not {type(data).__name__!r}')
  787. if self._eof:
  788. raise RuntimeError('Cannot call write() after write_eof()')
  789. if self._empty_waiter is not None:
  790. raise RuntimeError('unable to write; sendfile is in progress')
  791. if not data:
  792. return
  793. if self._conn_lost:
  794. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  795. logger.warning('socket.send() raised exception.')
  796. self._conn_lost += 1
  797. return
  798. if not self._buffer:
  799. # Optimization: try to send now.
  800. try:
  801. n = self._sock.send(data)
  802. except (BlockingIOError, InterruptedError):
  803. pass
  804. except (SystemExit, KeyboardInterrupt):
  805. raise
  806. except BaseException as exc:
  807. self._fatal_error(exc, 'Fatal write error on socket transport')
  808. return
  809. else:
  810. data = data[n:]
  811. if not data:
  812. return
  813. # Not all was written; register write handler.
  814. self._loop._add_writer(self._sock_fd, self._write_ready)
  815. # Add it to the buffer.
  816. self._buffer.extend(data)
  817. self._maybe_pause_protocol()
  818. def _write_ready(self):
  819. assert self._buffer, 'Data should not be empty'
  820. if self._conn_lost:
  821. return
  822. try:
  823. n = self._sock.send(self._buffer)
  824. except (BlockingIOError, InterruptedError):
  825. pass
  826. except (SystemExit, KeyboardInterrupt):
  827. raise
  828. except BaseException as exc:
  829. self._loop._remove_writer(self._sock_fd)
  830. self._buffer.clear()
  831. self._fatal_error(exc, 'Fatal write error on socket transport')
  832. if self._empty_waiter is not None:
  833. self._empty_waiter.set_exception(exc)
  834. else:
  835. if n:
  836. del self._buffer[:n]
  837. self._maybe_resume_protocol() # May append to buffer.
  838. if not self._buffer:
  839. self._loop._remove_writer(self._sock_fd)
  840. if self._empty_waiter is not None:
  841. self._empty_waiter.set_result(None)
  842. if self._closing:
  843. self._call_connection_lost(None)
  844. elif self._eof:
  845. self._sock.shutdown(socket.SHUT_WR)
  846. def write_eof(self):
  847. if self._closing or self._eof:
  848. return
  849. self._eof = True
  850. if not self._buffer:
  851. self._sock.shutdown(socket.SHUT_WR)
  852. def can_write_eof(self):
  853. return True
  854. def _call_connection_lost(self, exc):
  855. super()._call_connection_lost(exc)
  856. if self._empty_waiter is not None:
  857. self._empty_waiter.set_exception(
  858. ConnectionError("Connection is closed by peer"))
  859. def _make_empty_waiter(self):
  860. if self._empty_waiter is not None:
  861. raise RuntimeError("Empty waiter is already set")
  862. self._empty_waiter = self._loop.create_future()
  863. if not self._buffer:
  864. self._empty_waiter.set_result(None)
  865. return self._empty_waiter
  866. def _reset_empty_waiter(self):
  867. self._empty_waiter = None
  868. class _SelectorDatagramTransport(_SelectorTransport):
  869. _buffer_factory = collections.deque
  870. def __init__(self, loop, sock, protocol, address=None,
  871. waiter=None, extra=None):
  872. super().__init__(loop, sock, protocol, extra)
  873. self._address = address
  874. self._loop.call_soon(self._protocol.connection_made, self)
  875. # only start reading when connection_made() has been called
  876. self._loop.call_soon(self._add_reader,
  877. self._sock_fd, self._read_ready)
  878. if waiter is not None:
  879. # only wake up the waiter when connection_made() has been called
  880. self._loop.call_soon(futures._set_result_unless_cancelled,
  881. waiter, None)
  882. def get_write_buffer_size(self):
  883. return sum(len(data) for data, _ in self._buffer)
  884. def _read_ready(self):
  885. if self._conn_lost:
  886. return
  887. try:
  888. data, addr = self._sock.recvfrom(self.max_size)
  889. except (BlockingIOError, InterruptedError):
  890. pass
  891. except OSError as exc:
  892. self._protocol.error_received(exc)
  893. except (SystemExit, KeyboardInterrupt):
  894. raise
  895. except BaseException as exc:
  896. self._fatal_error(exc, 'Fatal read error on datagram transport')
  897. else:
  898. self._protocol.datagram_received(data, addr)
  899. def sendto(self, data, addr=None):
  900. if not isinstance(data, (bytes, bytearray, memoryview)):
  901. raise TypeError(f'data argument must be a bytes-like object, '
  902. f'not {type(data).__name__!r}')
  903. if not data:
  904. return
  905. if self._address:
  906. if addr not in (None, self._address):
  907. raise ValueError(
  908. f'Invalid address: must be None or {self._address}')
  909. addr = self._address
  910. if self._conn_lost and self._address:
  911. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  912. logger.warning('socket.send() raised exception.')
  913. self._conn_lost += 1
  914. return
  915. if not self._buffer:
  916. # Attempt to send it right away first.
  917. try:
  918. if self._extra['peername']:
  919. self._sock.send(data)
  920. else:
  921. self._sock.sendto(data, addr)
  922. return
  923. except (BlockingIOError, InterruptedError):
  924. self._loop._add_writer(self._sock_fd, self._sendto_ready)
  925. except OSError as exc:
  926. self._protocol.error_received(exc)
  927. return
  928. except (SystemExit, KeyboardInterrupt):
  929. raise
  930. except BaseException as exc:
  931. self._fatal_error(
  932. exc, 'Fatal write error on datagram transport')
  933. return
  934. # Ensure that what we buffer is immutable.
  935. self._buffer.append((bytes(data), addr))
  936. self._maybe_pause_protocol()
  937. def _sendto_ready(self):
  938. while self._buffer:
  939. data, addr = self._buffer.popleft()
  940. try:
  941. if self._extra['peername']:
  942. self._sock.send(data)
  943. else:
  944. self._sock.sendto(data, addr)
  945. except (BlockingIOError, InterruptedError):
  946. self._buffer.appendleft((data, addr)) # Try again later.
  947. break
  948. except OSError as exc:
  949. self._protocol.error_received(exc)
  950. return
  951. except (SystemExit, KeyboardInterrupt):
  952. raise
  953. except BaseException as exc:
  954. self._fatal_error(
  955. exc, 'Fatal write error on datagram transport')
  956. return
  957. self._maybe_resume_protocol() # May append to buffer.
  958. if not self._buffer:
  959. self._loop._remove_writer(self._sock_fd)
  960. if self._closing:
  961. self._call_connection_lost(None)