
searx

My custom branch(es) on searx, a meta-search engine. Clone with: git clone https://hacktivis.me/git/searx.git

utils.py (11666B)


import csv
import hashlib
import hmac
import os
import re

from babel.core import get_global
from babel.dates import format_date
from codecs import getincrementalencoder
from imp import load_source
from numbers import Number
from os.path import splitext, join
from io import open
from random import choice
import sys
import json

from searx import settings
from searx.version import VERSION_STRING
from searx.languages import language_codes
from searx import logger

try:
    from cStringIO import StringIO
except:
    from io import StringIO

try:
    from HTMLParser import HTMLParser
except:
    from html.parser import HTMLParser

if sys.version_info[0] == 3:
    unichr = chr
    unicode = str
    IS_PY2 = False
    basestring = str
else:
    IS_PY2 = True

logger = logger.getChild('utils')

blocked_tags = ('script',
                'style')

useragents = json.loads(open(os.path.dirname(os.path.realpath(__file__))
                             + "/data/useragents.json", 'r', encoding='utf-8').read())


def searx_useragent():
    return 'searx/{searx_version} {suffix}'.format(
        searx_version=VERSION_STRING,
        suffix=settings['outgoing'].get('useragent_suffix', ''))


def gen_useragent(os=None):
    return str(useragents['ua'].format(os=os or choice(useragents['os']), version=choice(useragents['versions'])))
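# Usage sketch (illustrative, not part of the original module):
#   searx_useragent() builds the User-Agent for outgoing requests, e.g.
#   "searx/<version> <suffix>", with the suffix taken from
#   settings['outgoing']['useragent_suffix'].
#   gen_useragent() instead fakes a browser User-Agent from the templates in
#   data/useragents.json; gen_useragent(os='Windows NT 10.0; WOW64') (the OS
#   string here is an assumed example) picks a random browser version for that
#   OS, and with no argument the OS string is chosen at random too.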
def highlight_content(content, query):
    if not content:
        return None
    # ignoring html contents
    # TODO better html content detection
    if content.find('<') != -1:
        return content

    query = query.decode('utf-8')
    if content.lower().find(query.lower()) > -1:
        query_regex = u'({0})'.format(re.escape(query))
        content = re.sub(query_regex, '<span class="highlight">\\1</span>',
                         content, flags=re.I | re.U)
    else:
        regex_parts = []
        for chunk in query.split():
            if len(chunk) == 1:
                regex_parts.append(u'\\W+{0}\\W+'.format(re.escape(chunk)))
            else:
                regex_parts.append(u'{0}'.format(re.escape(chunk)))
        query_regex = u'({0})'.format('|'.join(regex_parts))
        content = re.sub(query_regex, '<span class="highlight">\\1</span>',
                         content, flags=re.I | re.U)

    return content
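# Example (a sketch; the caller is assumed to pass the query as UTF-8 bytes,
# since the function calls query.decode('utf-8')):
#   highlight_content('free software search', b'free')
#   -> 'free software search' with "free" wrapped in <span class="highlight">...</span>
# Content that already contains '<' is returned untouched, and when the full
# query does not occur verbatim, each whitespace-separated chunk of the query
# is highlighted separately.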
class HTMLTextExtractor(HTMLParser):
    def __init__(self):
        HTMLParser.__init__(self)
        self.result = []
        self.tags = []

    def handle_starttag(self, tag, attrs):
        self.tags.append(tag)

    def handle_endtag(self, tag):
        if not self.tags:
            return

        if tag != self.tags[-1]:
            raise Exception("invalid html")

        self.tags.pop()

    def is_valid_tag(self):
        return not self.tags or self.tags[-1] not in blocked_tags

    def handle_data(self, d):
        if not self.is_valid_tag():
            return
        self.result.append(d)

    def handle_charref(self, number):
        if not self.is_valid_tag():
            return
        if number[0] in (u'x', u'X'):
            codepoint = int(number[1:], 16)
        else:
            codepoint = int(number)
        self.result.append(unichr(codepoint))

    def handle_entityref(self, name):
        if not self.is_valid_tag():
            return
        # codepoint = htmlentitydefs.name2codepoint[name]
        # self.result.append(unichr(codepoint))
        self.result.append(name)

    def get_text(self):
        return u''.join(self.result).strip()


def html_to_text(html):
    html = html.replace('\n', ' ')
    html = ' '.join(html.split())
    s = HTMLTextExtractor()
    s.feed(html)
    return s.get_text()
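# Example (illustrative): html_to_text(u'<p>Hello <b>world</b></p>') returns
# u'Hello world'.  Text inside <script> and <style> is dropped, whitespace is
# collapsed, and mismatched end tags raise Exception("invalid html") from
# handle_endtag().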
class UnicodeWriter:
    """
    A CSV writer which will write rows to CSV file "f",
    which is encoded in the given encoding.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = getincrementalencoder(encoding)()

    def writerow(self, row):
        if IS_PY2:
            row = [s.encode("utf-8") if hasattr(s, 'encode') else s for s in row]
        self.writer.writerow(row)
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        if IS_PY2:
            data = data.decode("utf-8")
        else:
            data = data.strip('\x00')
        # ... and reencode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        if IS_PY2:
            self.stream.write(data)
        else:
            self.stream.write(data.decode("utf-8"))
        # empty queue
        self.queue.truncate(0)

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)
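# Usage sketch (assuming a file-like object `csvfile` opened by the caller):
#   writer = UnicodeWriter(csvfile)
#   writer.writerow([u'title', u'url'])
#   writer.writerows(rows)
# Each row is first serialised into an in-memory StringIO queue and then
# re-encoded into the target encoding, which keeps csv.writer usable with
# non-ASCII values on both Python 2 and Python 3.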
def get_resources_directory(searx_directory, subdirectory, resources_directory):
    if not resources_directory:
        resources_directory = os.path.join(searx_directory, subdirectory)
    if not os.path.isdir(resources_directory):
        raise Exception(resources_directory + " is not a directory")
    return resources_directory


def get_themes(templates_path):
    """Returns available themes list."""
    themes = os.listdir(templates_path)
    if '__common__' in themes:
        themes.remove('__common__')
    return themes


def get_static_files(static_path):
    static_files = set()
    static_path_length = len(static_path) + 1
    for directory, _, files in os.walk(static_path):
        for filename in files:
            f = os.path.join(directory[static_path_length:], filename)
            static_files.add(f)
    return static_files


def get_result_templates(templates_path):
    result_templates = set()
    templates_path_length = len(templates_path) + 1
    for directory, _, files in os.walk(templates_path):
        if directory.endswith('result_templates'):
            for filename in files:
                f = os.path.join(directory[templates_path_length:], filename)
                result_templates.add(f)
    return result_templates


def format_date_by_locale(date, locale_string):
    # strftime works only on dates after 1900
    if date.year <= 1900:
        return date.isoformat().split('T')[0]

    if locale_string == 'all':
        locale_string = settings['ui']['default_locale'] or 'en_US'

    # to avoid crashing if locale is not supported by babel
    try:
        formatted_date = format_date(date, locale=locale_string)
    except:
        formatted_date = format_date(date, "YYYY-MM-dd")

    return formatted_date


def dict_subset(d, properties):
    result = {}
    for k in properties:
        if k in d:
            result[k] = d[k]
    return result


def prettify_url(url, max_length=74):
    if len(url) > max_length:
        chunk_len = int(max_length / 2 + 1)
        return u'{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
    else:
        return url
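# Examples (illustrative):
#   dict_subset({'a': 1, 'b': 2}, ['a', 'c'])  -> {'a': 1}
#   prettify_url('https://example.com/' + 'x' * 100)
#       -> first 38 and last 38 characters joined by '[...]'
# prettify_url() only shortens URLs longer than max_length (74 by default).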
# get element in list or default value
def list_get(a_list, index, default=None):
    if len(a_list) > index:
        return a_list[index]
    else:
        return default


def get_torrent_size(filesize, filesize_multiplier):
    try:
        filesize = float(filesize)

        if filesize_multiplier == 'TB':
            filesize = int(filesize * 1024 * 1024 * 1024 * 1024)
        elif filesize_multiplier == 'GB':
            filesize = int(filesize * 1024 * 1024 * 1024)
        elif filesize_multiplier == 'MB':
            filesize = int(filesize * 1024 * 1024)
        elif filesize_multiplier == 'KB':
            filesize = int(filesize * 1024)
        elif filesize_multiplier == 'TiB':
            filesize = int(filesize * 1000 * 1000 * 1000 * 1000)
        elif filesize_multiplier == 'GiB':
            filesize = int(filesize * 1000 * 1000 * 1000)
        elif filesize_multiplier == 'MiB':
            filesize = int(filesize * 1000 * 1000)
        elif filesize_multiplier == 'KiB':
            filesize = int(filesize * 1000)
    except:
        filesize = None

    return filesize
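# Example (illustrative): get_torrent_size('1.5', 'GB') -> 1610612736.
# Note that as written, 'KB'..'TB' are treated as powers of 1024 and
# 'KiB'..'TiB' as powers of 1000; unparsable input yields None.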
def convert_str_to_int(number_str):
    if number_str.isdigit():
        return int(number_str)
    else:
        return 0


# convert a variable to integer or return 0 if it's not a number
def int_or_zero(num):
    if isinstance(num, list):
        if len(num) < 1:
            return 0
        num = num[0]
    return convert_str_to_int(num)
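# Examples (illustrative):
#   convert_str_to_int('42')   -> 42
#   convert_str_to_int('4.2')  -> 0  (not all characters are digits)
#   int_or_zero(['13', '37'])  -> 13 (only the first list element is used)
#   int_or_zero([])            -> 0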
def is_valid_lang(lang):
    is_abbr = (len(lang) == 2)
    if is_abbr:
        for l in language_codes:
            if l[0][:2] == lang.lower():
                return (True, l[0][:2], l[3].lower())
        return False
    else:
        for l in language_codes:
            if l[1].lower() == lang.lower():
                return (True, l[0][:2], l[3].lower())
        return False


# auxiliary function to match lang_code in lang_list
def _match_language(lang_code, lang_list=[], custom_aliases={}):
    # replace language code with a custom alias if necessary
    if lang_code in custom_aliases:
        lang_code = custom_aliases[lang_code]

    if lang_code in lang_list:
        return lang_code

    # try to get the most likely country for this language
    subtags = get_global('likely_subtags').get(lang_code)
    if subtags:
        subtag_parts = subtags.split('_')
        new_code = subtag_parts[0] + '-' + subtag_parts[-1]
        if new_code in custom_aliases:
            new_code = custom_aliases[new_code]
        if new_code in lang_list:
            return new_code

    # try to get any supported country for this language
    for lc in lang_list:
        if lang_code == lc.split('-')[0]:
            return lc

    return None


# get the language code from lang_list that best matches locale_code
def match_language(locale_code, lang_list=[], custom_aliases={}, fallback='en-US'):
    # try to get language from given locale_code
    language = _match_language(locale_code, lang_list, custom_aliases)
    if language:
        return language

    locale_parts = locale_code.split('-')
    lang_code = locale_parts[0]

    # try to get language using an equivalent country code
    if len(locale_parts) > 1:
        country_alias = get_global('territory_aliases').get(locale_parts[-1])
        if country_alias:
            language = _match_language(lang_code + '-' + country_alias[0], lang_list, custom_aliases)
            if language:
                return language

    # try to get language using an equivalent language code
    alias = get_global('language_aliases').get(lang_code)
    if alias:
        language = _match_language(alias, lang_list, custom_aliases)
        if language:
            return language

    if lang_code != locale_code:
        # try to get language from given language without giving the country
        language = _match_language(lang_code, lang_list, custom_aliases)

    return language or fallback
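# Usage sketch (language codes follow searx's "xx-XX" convention; results for
# real codes ultimately depend on babel's locale data):
#   match_language('pt-BR', ['fr-FR', 'pt-PT', 'pt-BR'])  -> 'pt-BR' (exact match)
#   match_language('pt', ['fr-FR', 'pt-PT'])              -> 'pt-PT' (same language,
#                                                             different country)
#   match_language('unknown', ['fr-FR'])                  -> 'en-US' (fallback)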
def load_module(filename, module_dir):
    modname = splitext(filename)[0]
    if modname in sys.modules:
        del sys.modules[modname]
    filepath = join(module_dir, filename)
    module = load_source(modname, filepath)
    module.name = modname
    return module


def new_hmac(secret_key, url):
    if sys.version_info[0] == 2:
        return hmac.new(bytes(secret_key), url, hashlib.sha256).hexdigest()
    else:
        return hmac.new(bytes(secret_key, 'utf-8'), url, hashlib.sha256).hexdigest()
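# Example (illustrative; `url` must already be a bytes object):
#   new_hmac(secret_key, b'https://example.com/image.png')
# returns a hex-encoded SHA-256 HMAC of the URL keyed with the instance
# secret, used e.g. to sign proxied image URLs.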
def to_string(obj):
    if isinstance(obj, basestring):
        return obj
    if isinstance(obj, Number):
        return unicode(obj)
    if hasattr(obj, '__str__'):
        return obj.__str__()
    if hasattr(obj, '__repr__'):
        return obj.__repr__()
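# Examples (illustrative):
#   to_string(u'abc') -> u'abc'
#   to_string(3.14)   -> u'3.14'
#   to_string(None)   -> 'None'  (falls back to __str__/__repr__)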