logo

oasis-root

Compiled tree of Oasis Linux based on own branch at <https://hacktivis.me/git/oasis/> git clone https://anongit.hacktivis.me/git/oasis-root.git

asynchat.py (11485B)


  1. # -*- Mode: Python; tab-width: 4 -*-
  2. # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
  3. # Author: Sam Rushing <rushing@nightmare.com>
  4. # ======================================================================
  5. # Copyright 1996 by Sam Rushing
  6. #
  7. # All Rights Reserved
  8. #
  9. # Permission to use, copy, modify, and distribute this software and
  10. # its documentation for any purpose and without fee is hereby
  11. # granted, provided that the above copyright notice appear in all
  12. # copies and that both that copyright notice and this permission
  13. # notice appear in supporting documentation, and that the name of Sam
  14. # Rushing not be used in advertising or publicity pertaining to
  15. # distribution of the software without specific, written prior
  16. # permission.
  17. #
  18. # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
  19. # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
  20. # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
  21. # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
  22. # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  23. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
  24. # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  25. # ======================================================================
  26. r"""A class supporting chat-style (command/response) protocols.
  27. This class adds support for 'chat' style protocols - where one side
  28. sends a 'command', and the other sends a response (examples would be
  29. the common internet protocols - smtp, nntp, ftp, etc..).
  30. The handle_read() method looks at the input stream for the current
  31. 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
  32. for multi-line output), calling self.found_terminator() on its
  33. receipt.
  34. for example:
  35. Say you build an async nntp client using this class. At the start
  36. of the connection, you'll have self.terminator set to '\r\n', in
  37. order to process the single-line greeting. Just before issuing a
  38. 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
  39. command will be accumulated (using your own 'collect_incoming_data'
  40. method) up to the terminator, and then control will be returned to
  41. you - by calling your self.found_terminator() method.
  42. """
  43. import asyncore
  44. from collections import deque
  45. from warnings import warn
  46. warn(
  47. 'The asynchat module is deprecated. '
  48. 'The recommended replacement is asyncio',
  49. DeprecationWarning,
  50. stacklevel=2)
  51. class async_chat(asyncore.dispatcher):
  52. """This is an abstract class. You must derive from this class, and add
  53. the two methods collect_incoming_data() and found_terminator()"""
  54. # these are overridable defaults
  55. ac_in_buffer_size = 65536
  56. ac_out_buffer_size = 65536
  57. # we don't want to enable the use of encoding by default, because that is a
  58. # sign of an application bug that we don't want to pass silently
  59. use_encoding = 0
  60. encoding = 'latin-1'
  61. def __init__(self, sock=None, map=None):
  62. # for string terminator matching
  63. self.ac_in_buffer = b''
  64. # we use a list here rather than io.BytesIO for a few reasons...
  65. # del lst[:] is faster than bio.truncate(0)
  66. # lst = [] is faster than bio.truncate(0)
  67. self.incoming = []
  68. # we toss the use of the "simple producer" and replace it with
  69. # a pure deque, which the original fifo was a wrapping of
  70. self.producer_fifo = deque()
  71. asyncore.dispatcher.__init__(self, sock, map)
  72. def collect_incoming_data(self, data):
  73. raise NotImplementedError("must be implemented in subclass")
  74. def _collect_incoming_data(self, data):
  75. self.incoming.append(data)
  76. def _get_data(self):
  77. d = b''.join(self.incoming)
  78. del self.incoming[:]
  79. return d
  80. def found_terminator(self):
  81. raise NotImplementedError("must be implemented in subclass")
  82. def set_terminator(self, term):
  83. """Set the input delimiter.
  84. Can be a fixed string of any length, an integer, or None.
  85. """
  86. if isinstance(term, str) and self.use_encoding:
  87. term = bytes(term, self.encoding)
  88. elif isinstance(term, int) and term < 0:
  89. raise ValueError('the number of received bytes must be positive')
  90. self.terminator = term
  91. def get_terminator(self):
  92. return self.terminator
  93. # grab some more data from the socket,
  94. # throw it to the collector method,
  95. # check for the terminator,
  96. # if found, transition to the next state.
  97. def handle_read(self):
  98. try:
  99. data = self.recv(self.ac_in_buffer_size)
  100. except BlockingIOError:
  101. return
  102. except OSError:
  103. self.handle_error()
  104. return
  105. if isinstance(data, str) and self.use_encoding:
  106. data = bytes(str, self.encoding)
  107. self.ac_in_buffer = self.ac_in_buffer + data
  108. # Continue to search for self.terminator in self.ac_in_buffer,
  109. # while calling self.collect_incoming_data. The while loop
  110. # is necessary because we might read several data+terminator
  111. # combos with a single recv(4096).
  112. while self.ac_in_buffer:
  113. lb = len(self.ac_in_buffer)
  114. terminator = self.get_terminator()
  115. if not terminator:
  116. # no terminator, collect it all
  117. self.collect_incoming_data(self.ac_in_buffer)
  118. self.ac_in_buffer = b''
  119. elif isinstance(terminator, int):
  120. # numeric terminator
  121. n = terminator
  122. if lb < n:
  123. self.collect_incoming_data(self.ac_in_buffer)
  124. self.ac_in_buffer = b''
  125. self.terminator = self.terminator - lb
  126. else:
  127. self.collect_incoming_data(self.ac_in_buffer[:n])
  128. self.ac_in_buffer = self.ac_in_buffer[n:]
  129. self.terminator = 0
  130. self.found_terminator()
  131. else:
  132. # 3 cases:
  133. # 1) end of buffer matches terminator exactly:
  134. # collect data, transition
  135. # 2) end of buffer matches some prefix:
  136. # collect data to the prefix
  137. # 3) end of buffer does not match any prefix:
  138. # collect data
  139. terminator_len = len(terminator)
  140. index = self.ac_in_buffer.find(terminator)
  141. if index != -1:
  142. # we found the terminator
  143. if index > 0:
  144. # don't bother reporting the empty string
  145. # (source of subtle bugs)
  146. self.collect_incoming_data(self.ac_in_buffer[:index])
  147. self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
  148. # This does the Right Thing if the terminator
  149. # is changed here.
  150. self.found_terminator()
  151. else:
  152. # check for a prefix of the terminator
  153. index = find_prefix_at_end(self.ac_in_buffer, terminator)
  154. if index:
  155. if index != lb:
  156. # we found a prefix, collect up to the prefix
  157. self.collect_incoming_data(self.ac_in_buffer[:-index])
  158. self.ac_in_buffer = self.ac_in_buffer[-index:]
  159. break
  160. else:
  161. # no prefix, collect it all
  162. self.collect_incoming_data(self.ac_in_buffer)
  163. self.ac_in_buffer = b''
  164. def handle_write(self):
  165. self.initiate_send()
  166. def handle_close(self):
  167. self.close()
  168. def push(self, data):
  169. if not isinstance(data, (bytes, bytearray, memoryview)):
  170. raise TypeError('data argument must be byte-ish (%r)',
  171. type(data))
  172. sabs = self.ac_out_buffer_size
  173. if len(data) > sabs:
  174. for i in range(0, len(data), sabs):
  175. self.producer_fifo.append(data[i:i+sabs])
  176. else:
  177. self.producer_fifo.append(data)
  178. self.initiate_send()
  179. def push_with_producer(self, producer):
  180. self.producer_fifo.append(producer)
  181. self.initiate_send()
  182. def readable(self):
  183. "predicate for inclusion in the readable for select()"
  184. # cannot use the old predicate, it violates the claim of the
  185. # set_terminator method.
  186. # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
  187. return 1
  188. def writable(self):
  189. "predicate for inclusion in the writable for select()"
  190. return self.producer_fifo or (not self.connected)
  191. def close_when_done(self):
  192. "automatically close this channel once the outgoing queue is empty"
  193. self.producer_fifo.append(None)
  194. def initiate_send(self):
  195. while self.producer_fifo and self.connected:
  196. first = self.producer_fifo[0]
  197. # handle empty string/buffer or None entry
  198. if not first:
  199. del self.producer_fifo[0]
  200. if first is None:
  201. self.handle_close()
  202. return
  203. # handle classic producer behavior
  204. obs = self.ac_out_buffer_size
  205. try:
  206. data = first[:obs]
  207. except TypeError:
  208. data = first.more()
  209. if data:
  210. self.producer_fifo.appendleft(data)
  211. else:
  212. del self.producer_fifo[0]
  213. continue
  214. if isinstance(data, str) and self.use_encoding:
  215. data = bytes(data, self.encoding)
  216. # send the data
  217. try:
  218. num_sent = self.send(data)
  219. except OSError:
  220. self.handle_error()
  221. return
  222. if num_sent:
  223. if num_sent < len(data) or obs < len(first):
  224. self.producer_fifo[0] = first[num_sent:]
  225. else:
  226. del self.producer_fifo[0]
  227. # we tried to send some actual data
  228. return
  229. def discard_buffers(self):
  230. # Emergencies only!
  231. self.ac_in_buffer = b''
  232. del self.incoming[:]
  233. self.producer_fifo.clear()
  234. class simple_producer:
  235. def __init__(self, data, buffer_size=512):
  236. self.data = data
  237. self.buffer_size = buffer_size
  238. def more(self):
  239. if len(self.data) > self.buffer_size:
  240. result = self.data[:self.buffer_size]
  241. self.data = self.data[self.buffer_size:]
  242. return result
  243. else:
  244. result = self.data
  245. self.data = b''
  246. return result
  247. # Given 'haystack', see if any prefix of 'needle' is at its end. This
  248. # assumes an exact match has already been checked. Return the number of
  249. # characters matched.
  250. # for example:
  251. # f_p_a_e("qwerty\r", "\r\n") => 1
  252. # f_p_a_e("qwertydkjf", "\r\n") => 0
  253. # f_p_a_e("qwerty\r\n", "\r\n") => <undefined>
  254. # this could maybe be made faster with a computed regex?
  255. # [answer: no; circa Python-2.0, Jan 2001]
  256. # new python: 28961/s
  257. # old python: 18307/s
  258. # re: 12820/s
  259. # regex: 14035/s
  260. def find_prefix_at_end(haystack, needle):
  261. l = len(needle) - 1
  262. while l and not haystack.endswith(needle[:l]):
  263. l -= 1
  264. return l