commit: abef53466da1f7d2e79f5644718a2cf7524abc49
parent e7926ae9f4e5fa258696551a39295402819280c9
Author: dirkf <fieldhouse@gmx.net>
Date: Fri, 28 Jul 2023 06:19:15 +0100
[utils] Rework URL path munging for ., .. components
* move processing to YoutubeDLHandler
* also process `Location` header for redirect
* use tests from https://github.com/yt-dlp/yt-dlp/pull/7662
Diffstat:
4 files changed, 95 insertions(+), 41 deletions(-)
diff --git a/test/test_http.py b/test/test_http.py
@@ -180,6 +180,12 @@ class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
respond()
elif self.path == '/%c7%9f':
respond()
+ elif self.path == '/redirect_dotsegments':
+ self.send_response(301)
+ # redirect to /headers but with dot segments before
+ self.send_header('Location', '/a/b/./../../headers')
+ self.send_header('Content-Length', '0')
+ self.end_headers()
elif self.path.startswith('/redirect_'):
self._redirect()
elif self.path.startswith('/method'):
@@ -489,6 +495,14 @@ class TestHTTP(unittest.TestCase):
self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
self.assertEqual(res.read(), b'raw')
+ def test_remove_dot_segments(self):
+ with FakeYDL() as ydl:
+ res = ydl.urlopen(sanitized_Request(self._test_url('a/b/./../../headers')))
+ self.assertEqual(compat_urllib_parse.urlparse(res.geturl()).path, '/headers')
+
+ res = ydl.urlopen(sanitized_Request(self._test_url('redirect_dotsegments')))
+ self.assertEqual(compat_urllib_parse.urlparse(res.geturl()).path, '/headers')
+
def _build_proxy_handler(name):
class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
diff --git a/test/test_utils.py b/test/test_utils.py
@@ -64,6 +64,7 @@ from youtube_dl.utils import (
parse_age_limit,
parse_duration,
parse_filesize,
+ parse_codecs,
parse_count,
parse_iso8601,
parse_resolution,
@@ -114,7 +115,7 @@ from youtube_dl.utils import (
cli_option,
cli_valueless_option,
cli_bool_option,
- parse_codecs,
+ YoutubeDLHandler,
)
from youtube_dl.compat import (
compat_chr,
@@ -905,6 +906,32 @@ class TestUtil(unittest.TestCase):
)
self.assertEqual(escape_url('http://vimeo.com/56015672#at=0'), 'http://vimeo.com/56015672#at=0')
+ def test_remove_dot_segments(self):
+
+ def remove_dot_segments(p):
+ q = '' if p.startswith('/') else '/'
+ p = 'http://example.com' + q + p
+ p = compat_urlparse.urlsplit(YoutubeDLHandler._fix_path(p)).path
+ return p[1:] if q else p
+
+ self.assertEqual(remove_dot_segments('/a/b/c/./../../g'), '/a/g')
+ self.assertEqual(remove_dot_segments('mid/content=5/../6'), 'mid/6')
+ self.assertEqual(remove_dot_segments('/ad/../cd'), '/cd')
+ self.assertEqual(remove_dot_segments('/ad/../cd/'), '/cd/')
+ self.assertEqual(remove_dot_segments('/..'), '/')
+ self.assertEqual(remove_dot_segments('/./'), '/')
+ self.assertEqual(remove_dot_segments('/./a'), '/a')
+ self.assertEqual(remove_dot_segments('/abc/./.././d/././e/.././f/./../../ghi'), '/ghi')
+ self.assertEqual(remove_dot_segments('/'), '/')
+ self.assertEqual(remove_dot_segments('/t'), '/t')
+ self.assertEqual(remove_dot_segments('t'), 't')
+ self.assertEqual(remove_dot_segments(''), '')
+ self.assertEqual(remove_dot_segments('/../a/b/c'), '/a/b/c')
+ self.assertEqual(remove_dot_segments('../a'), 'a')
+ self.assertEqual(remove_dot_segments('./a'), 'a')
+ self.assertEqual(remove_dot_segments('.'), '')
+ self.assertEqual(remove_dot_segments('////'), '////')
+
def test_js_to_json_vars_strings(self):
self.assertDictEqual(
json.loads(js_to_json(
diff --git a/youtube_dl/YoutubeDL.py b/youtube_dl/YoutubeDL.py
@@ -71,7 +71,6 @@ from .utils import (
format_bytes,
formatSeconds,
GeoRestrictedError,
- HEADRequest,
int_or_none,
ISO3166Utils,
join_nonempty,
@@ -88,7 +87,6 @@ from .utils import (
preferredencoding,
prepend_extension,
process_communicate_or_kill,
- PUTRequest,
register_socks_protocols,
render_table,
replace_extension,
@@ -2460,27 +2458,6 @@ class YoutubeDL(object):
""" Start an HTTP download """
if isinstance(req, compat_basestring):
req = sanitized_Request(req)
- # an embedded /../ sequence is not automatically handled by urllib2
- # see https://github.com/yt-dlp/yt-dlp/issues/3355
- url = req.get_full_url()
- parts = url.partition('/../')
- if parts[1]:
- url = compat_urllib_parse.urljoin(parts[0] + parts[1][:1], parts[1][1:] + parts[2])
- if url:
- # worse, URL path may have initial /../ against RFCs: work-around
- # by stripping such prefixes, like eg Firefox
- parts = compat_urllib_parse.urlsplit(url)
- path = parts.path
- while path.startswith('/../'):
- path = path[3:]
- url = parts._replace(path=path).geturl()
- # get a new Request with the munged URL
- if url != req.get_full_url():
- req_type = {'HEAD': HEADRequest, 'PUT': PUTRequest}.get(
- req.get_method(), compat_urllib_request.Request)
- req = req_type(
- url, data=req.data, headers=dict(req.header_items()),
- origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)
return self._opener.open(req, timeout=self._socket_timeout)
def print_debug_header(self):
diff --git a/youtube_dl/utils.py b/youtube_dl/utils.py
@@ -2678,17 +2678,52 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
def compress(data):
return data and ncompress.decompress(data)
+ @staticmethod
+ def _fix_path(url):
+ # an embedded /../ or /./ sequence is not automatically handled by urllib2
+ # see https://github.com/yt-dlp/yt-dlp/issues/3355
+ parsed_url = compat_urllib_parse.urlsplit(url)
+ path = parsed_url.path
+ if not path.endswith('/'):
+ path += '/'
+ parts = path.partition('/./')
+ if not parts[1]:
+ parts = path.partition('/../')
+ if parts[1]:
+ path = compat_urllib_parse.urljoin(
+ parts[0] + parts[1][:1],
+ parts[1][1:] + (parts[2] if parsed_url.path.endswith('/') else parts[2][:-1]))
+ url = parsed_url._replace(path=path).geturl()
+ if '/.' in url:
+ # worse, URL path may have initial /../ against RFCs: work-around
+ # by stripping such prefixes, like eg Firefox
+ path = parsed_url.path + '/'
+ while path.startswith('/.'):
+ if path.startswith('/../'):
+ path = path[3:]
+ elif path.startswith('/./'):
+ path = path[2:]
+ else:
+ break
+ path = path[:-1]
+ if not path.startswith('/') and parsed_url.path.startswith('/'):
+ path = '/' + path
+ url = parsed_url._replace(path=path).geturl()
+ return url
+
def http_request(self, req):
- # According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
- # always respected by websites, some tend to give out URLs with non percent-encoded
+ url = req.get_full_url()
+ # resolve embedded . and ..
+ url_fixed = self._fix_path(url)
+ # According to RFC 3986, URLs can not contain non-ASCII characters; however this is not
+ # always respected by websites: some tend to give out URLs with non percent-encoded
# non-ASCII characters (see telemb.py, ard.py [#3412])
# urllib chokes on URLs with non-ASCII characters (see http://bugs.python.org/issue3991)
# To work around aforementioned issue we will replace request's original URL with
# percent-encoded one
# Since redirects are also affected (e.g. http://www.southpark.de/alle-episoden/s18e09)
# the code of this workaround has been moved here from YoutubeDL.urlopen()
- url = req.get_full_url()
- url_escaped = escape_url(url)
+ url_escaped = escape_url(url_fixed)
# Substitute URL if any change after escaping
if url != url_escaped:
@@ -2702,10 +2737,13 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
req.headers = handle_youtubedl_headers(req.headers)
- if sys.version_info < (2, 7) and '#' in req.get_full_url():
- # Python 2.6 is brain-dead when it comes to fragments
- req._Request__original = req._Request__original.partition('#')[0]
- req._Request__r_type = req._Request__r_type.partition('#')[0]
+ if sys.version_info < (2, 7):
+ # avoid possible race where __r_type may be unset
+ req.get_type()
+ if '#' in req.get_full_url():
+ # Python 2.6 is brain-dead when it comes to fragments
+ req._Request__original = req._Request__original.partition('#')[0]
+ req._Request__r_type = req._Request__r_type.partition('#')[0]
# Use the totally undocumented AbstractHTTPHandler per
# https://github.com/yt-dlp/yt-dlp/pull/4158
@@ -2775,10 +2813,13 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
if sys.version_info >= (3, 0):
location = location.encode('iso-8859-1')
location = location.decode('utf-8')
- location_escaped = escape_url(location)
+ # resolve embedded . and ..
+ location_fixed = self._fix_path(location)
+ location_escaped = escape_url(location_fixed)
if location != location_escaped:
del resp.headers['Location']
- if sys.version_info < (3, 0):
+ # if sys.version_info < (3, 0):
+ if not isinstance(location_escaped, str):
location_escaped = location_escaped.encode('utf-8')
resp.headers['Location'] = location_escaped
return resp
@@ -4248,13 +4289,8 @@ def update_Request(req, url=None, data=None, headers={}, query={}):
req_headers.update(headers)
req_data = data if data is not None else req.data
req_url = update_url_query(url or req.get_full_url(), query)
- req_get_method = req.get_method()
- if req_get_method == 'HEAD':
- req_type = HEADRequest
- elif req_get_method == 'PUT':
- req_type = PUTRequest
- else:
- req_type = compat_urllib_request.Request
+ req_type = {'HEAD': HEADRequest, 'PUT': PUTRequest}.get(
+ req.get_method(), compat_urllib_request.Request)
new_req = req_type(
req_url, data=req_data, headers=req_headers,
origin_req_host=req.origin_req_host, unverifiable=req.unverifiable)