logo

youtube-dl

[mirror] Download/Watch videos from video hosters
git clone https://hacktivis.me/git/mirror/youtube-dl.git

test_downloader_external.py (8635B)


  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. from __future__ import unicode_literals
  4. # Allow direct execution
  5. import os
  6. import re
  7. import sys
  8. import subprocess
  9. import unittest
  10. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  11. from test.helper import (
  12. FakeLogger,
  13. FakeYDL,
  14. http_server_port,
  15. try_rm,
  16. )
  17. from youtube_dl import YoutubeDL
  18. from youtube_dl.compat import (
  19. compat_http_cookiejar_Cookie,
  20. compat_http_server,
  21. compat_kwargs,
  22. )
  23. from youtube_dl.utils import (
  24. encodeFilename,
  25. join_nonempty,
  26. )
  27. from youtube_dl.downloader.external import (
  28. Aria2cFD,
  29. Aria2pFD,
  30. AxelFD,
  31. CurlFD,
  32. FFmpegFD,
  33. HttpieFD,
  34. WgetFD,
  35. )
  36. import threading
  37. TEST_SIZE = 10 * 1024
  38. TEST_COOKIE = {
  39. 'version': 0,
  40. 'name': 'test',
  41. 'value': 'ytdlp',
  42. 'port': None,
  43. 'port_specified': False,
  44. 'domain': '.example.com',
  45. 'domain_specified': True,
  46. 'domain_initial_dot': False,
  47. 'path': '/',
  48. 'path_specified': True,
  49. 'secure': False,
  50. 'expires': None,
  51. 'discard': False,
  52. 'comment': None,
  53. 'comment_url': None,
  54. 'rest': {},
  55. }
  56. TEST_COOKIE_VALUE = join_nonempty('name', 'value', delim='=', from_dict=TEST_COOKIE)
  57. TEST_INFO = {'url': 'http://www.example.com/'}
  58. def cookiejar_Cookie(**cookie_args):
  59. return compat_http_cookiejar_Cookie(**compat_kwargs(cookie_args))
  60. def ifExternalFDAvailable(externalFD):
  61. return unittest.skipUnless(externalFD.available(),
  62. externalFD.get_basename() + ' not found')
  63. class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
  64. def log_message(self, format, *args):
  65. pass
  66. def send_content_range(self, total=None):
  67. range_header = self.headers.get('Range')
  68. start = end = None
  69. if range_header:
  70. mobj = re.match(r'bytes=(\d+)-(\d+)', range_header)
  71. if mobj:
  72. start, end = (int(mobj.group(i)) for i in (1, 2))
  73. valid_range = start is not None and end is not None
  74. if valid_range:
  75. content_range = 'bytes %d-%d' % (start, end)
  76. if total:
  77. content_range += '/%d' % total
  78. self.send_header('Content-Range', content_range)
  79. return (end - start + 1) if valid_range else total
  80. def serve(self, range=True, content_length=True):
  81. self.send_response(200)
  82. self.send_header('Content-Type', 'video/mp4')
  83. size = TEST_SIZE
  84. if range:
  85. size = self.send_content_range(TEST_SIZE)
  86. if content_length:
  87. self.send_header('Content-Length', size)
  88. self.end_headers()
  89. self.wfile.write(b'#' * size)
  90. def do_GET(self):
  91. if self.path == '/regular':
  92. self.serve()
  93. elif self.path == '/no-content-length':
  94. self.serve(content_length=False)
  95. elif self.path == '/no-range':
  96. self.serve(range=False)
  97. elif self.path == '/no-range-no-content-length':
  98. self.serve(range=False, content_length=False)
  99. else:
  100. assert False, 'unrecognised server path'
  101. @ifExternalFDAvailable(Aria2pFD)
  102. class TestAria2pFD(unittest.TestCase):
  103. def setUp(self):
  104. self.httpd = compat_http_server.HTTPServer(
  105. ('127.0.0.1', 0), HTTPTestRequestHandler)
  106. self.port = http_server_port(self.httpd)
  107. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  108. self.server_thread.daemon = True
  109. self.server_thread.start()
  110. def download(self, params, ep):
  111. with subprocess.Popen(
  112. ['aria2c', '--enable-rpc'],
  113. stdout=subprocess.DEVNULL,
  114. stderr=subprocess.DEVNULL
  115. ) as process:
  116. if not process.poll():
  117. filename = 'testfile.mp4'
  118. params['logger'] = FakeLogger()
  119. params['outtmpl'] = filename
  120. ydl = YoutubeDL(params)
  121. try_rm(encodeFilename(filename))
  122. self.assertEqual(ydl.download(['http://127.0.0.1:%d/%s' % (self.port, ep)]), 0)
  123. self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
  124. try_rm(encodeFilename(filename))
  125. process.kill()
  126. def download_all(self, params):
  127. for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
  128. self.download(params, ep)
  129. def test_regular(self):
  130. self.download_all({'external_downloader': 'aria2p'})
  131. def test_chunked(self):
  132. self.download_all({
  133. 'external_downloader': 'aria2p',
  134. 'http_chunk_size': 1000,
  135. })
  136. @ifExternalFDAvailable(HttpieFD)
  137. class TestHttpieFD(unittest.TestCase):
  138. def test_make_cmd(self):
  139. with FakeYDL() as ydl:
  140. downloader = HttpieFD(ydl, {})
  141. self.assertEqual(
  142. downloader._make_cmd('test', TEST_INFO),
  143. ['http', '--download', '--output', 'test', 'http://www.example.com/'])
  144. # Test cookie header is added
  145. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  146. self.assertEqual(
  147. downloader._make_cmd('test', TEST_INFO),
  148. ['http', '--download', '--output', 'test',
  149. 'http://www.example.com/', 'Cookie:' + TEST_COOKIE_VALUE])
  150. @ifExternalFDAvailable(AxelFD)
  151. class TestAxelFD(unittest.TestCase):
  152. def test_make_cmd(self):
  153. with FakeYDL() as ydl:
  154. downloader = AxelFD(ydl, {})
  155. self.assertEqual(
  156. downloader._make_cmd('test', TEST_INFO),
  157. ['axel', '-o', 'test', '--', 'http://www.example.com/'])
  158. # Test cookie header is added
  159. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  160. self.assertEqual(
  161. downloader._make_cmd('test', TEST_INFO),
  162. ['axel', '-o', 'test', '-H', 'Cookie: ' + TEST_COOKIE_VALUE,
  163. '--max-redirect=0', '--', 'http://www.example.com/'])
  164. @ifExternalFDAvailable(WgetFD)
  165. class TestWgetFD(unittest.TestCase):
  166. def test_make_cmd(self):
  167. with FakeYDL() as ydl:
  168. downloader = WgetFD(ydl, {})
  169. self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
  170. # Test cookiejar tempfile arg is added
  171. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  172. self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
  173. @ifExternalFDAvailable(CurlFD)
  174. class TestCurlFD(unittest.TestCase):
  175. def test_make_cmd(self):
  176. with FakeYDL() as ydl:
  177. downloader = CurlFD(ydl, {})
  178. self.assertNotIn('--cookie', downloader._make_cmd('test', TEST_INFO))
  179. # Test cookie header is added
  180. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  181. self.assertIn('--cookie', downloader._make_cmd('test', TEST_INFO))
  182. self.assertIn(TEST_COOKIE_VALUE, downloader._make_cmd('test', TEST_INFO))
  183. @ifExternalFDAvailable(Aria2cFD)
  184. class TestAria2cFD(unittest.TestCase):
  185. def test_make_cmd(self):
  186. with FakeYDL() as ydl:
  187. downloader = Aria2cFD(ydl, {})
  188. downloader._make_cmd('test', TEST_INFO)
  189. self.assertFalse(hasattr(downloader, '_cookies_tempfile'))
  190. # Test cookiejar tempfile arg is added
  191. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  192. cmd = downloader._make_cmd('test', TEST_INFO)
  193. self.assertIn('--load-cookies=%s' % downloader._cookies_tempfile, cmd)
  194. @ifExternalFDAvailable(FFmpegFD)
  195. class TestFFmpegFD(unittest.TestCase):
  196. _args = []
  197. def _test_cmd(self, args):
  198. self._args = args
  199. def test_make_cmd(self):
  200. with FakeYDL() as ydl:
  201. downloader = FFmpegFD(ydl, {})
  202. downloader._debug_cmd = self._test_cmd
  203. info_dict = TEST_INFO.copy()
  204. info_dict['ext'] = 'mp4'
  205. downloader._call_downloader('test', info_dict)
  206. self.assertEqual(self._args, [
  207. 'ffmpeg', '-y', '-i', 'http://www.example.com/',
  208. '-c', 'copy', '-f', 'mp4', 'file:test'])
  209. # Test cookies arg is added
  210. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  211. downloader._call_downloader('test', info_dict)
  212. self.assertEqual(self._args, [
  213. 'ffmpeg', '-y', '-cookies', TEST_COOKIE_VALUE + '; path=/; domain=.example.com;\r\n',
  214. '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test'])
  215. if __name__ == '__main__':
  216. unittest.main()