From 976a4010aa4e450855dce5fa4c865bcbdc86cccd Mon Sep 17 00:00:00 2001
From: Charalampos Stratakis <cstratak@redhat.com>
Date: Fri, 16 Apr 2021 18:02:00 +0200
Subject: [PATCH] CVE-2021-23336: Add `separator` argument to parse_qs; warn
 with default
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Partially backports https://bugs.python.org/issue42967 : [security] Address a web cache-poisoning issue reported in urllib.parse.parse_qsl().

Backported from the python3 branch.
However, this solution is different from the upstream solution in Python 3.

Based on the downstream solution for python 3.6.13 by Petr Viktorin.

An optional argument, separator, is added to specify the separator.
It is recommended to set it to '&' or ';' to match the application or proxy in use.
The default can be set with an env variable or a config file.
If neither the argument, the env var, nor the config file specifies a separator, "&" is used,
but a warning is raised if parse_qs is used on input that contains ';'.
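
A minimal usage sketch (Python 2, assuming this patch is applied; the
results match the test cases added below):

    import urlparse

    # Explicit separator: split only on ';', no warning is emitted.
    urlparse.parse_qs('a=1;b=2', separator=';')
    # -> {'a': ['1'], 'b': ['2']}

    # Default separator '&': ';' is no longer treated as a field separator,
    # and a _QueryStringSeparatorWarning is raised because the input
    # contains ';'.
    urlparse.parse_qs('a=1;b=2')
    # -> {'a': ['1;b=2']}

The default can also come from the PYTHON_URLLIB_QS_SEPARATOR environment
variable or from /etc/python/urllib.cfg, which (per the tests below) looks
like:

    [parse_qs]
    PYTHON_URLLIB_QS_SEPARATOR = ;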

Co-authors of the downstream change:
Co-authored-by: Petr Viktorin <pviktori@redhat.com>
Co-authors of the upstream change (who do not necessarily agree with this):
Co-authored-by: Adam Goldschmidt <adamgold7@gmail.com>
Co-authored-by: Ken Jin <28750310+Fidget-Spinner@users.noreply.github.com>
Co-authored-by: Éric Araujo <merwok@netwok.org>
---
 Doc/library/cgi.rst       |   5 +-
 Doc/library/urlparse.rst  |  15 ++-
 Lib/cgi.py                |  34 +++---
 Lib/test/test_cgi.py      |  59 ++++++++++-
 Lib/test/test_urlparse.py | 210 +++++++++++++++++++++++++++++++++++++-
 Lib/urlparse.py           |  78 +++++++++++++-
 6 files changed, 369 insertions(+), 32 deletions(-)

diff --git a/Doc/library/cgi.rst b/Doc/library/cgi.rst
index ecd62c8c019..a96cd38717b 100644
--- a/Doc/library/cgi.rst
+++ b/Doc/library/cgi.rst
@@ -285,10 +285,10 @@ These are useful if you want more control, or if you want to employ some of the
 algorithms implemented in this module in other circumstances.
 
 
-.. function:: parse(fp[, environ[, keep_blank_values[, strict_parsing]]])
+.. function:: parse(fp[, environ[, keep_blank_values[, strict_parsing[, separator]]]])
 
    Parse a query in the environment or from a file (the file defaults to
-   ``sys.stdin`` and environment defaults to ``os.environ``).  The *keep_blank_values* and *strict_parsing* parameters are
+   ``sys.stdin`` and environment defaults to ``os.environ``).  The *keep_blank_values*, *strict_parsing* and *separator* parameters are
    passed to :func:`urlparse.parse_qs` unchanged.
 
 
@@ -316,7 +316,6 @@ algorithms implemented in this module in other circumstances.
    Note that this does not parse nested multipart parts --- use
    :class:`FieldStorage` for that.
 
-
 .. function:: parse_header(string)
 
    Parse a MIME header (such as :mailheader:`Content-Type`) into a main value and a
diff --git a/Doc/library/urlparse.rst b/Doc/library/urlparse.rst
index 0989c88c302..97d1119257c 100644
--- a/Doc/library/urlparse.rst
+++ b/Doc/library/urlparse.rst
@@ -136,7 +136,7 @@ The :mod:`urlparse` module defines the following functions:
       now raise :exc:`ValueError`.
 
 
-.. function:: parse_qs(qs[, keep_blank_values[, strict_parsing[, max_num_fields]]])
+.. function:: parse_qs(qs[, keep_blank_values[, strict_parsing[, max_num_fields[, separator]]]])
 
    Parse a query string given as a string argument (data of type
    :mimetype:`application/x-www-form-urlencoded`).  Data are returned as a
@@ -157,6 +157,15 @@ The :mod:`urlparse` module defines the following functions:
    read. If set, then throws a :exc:`ValueError` if there are more than
    *max_num_fields* fields read.
 
+   The optional argument *separator* is the symbol to use for separating the
+   query arguments. It is recommended to set it to ``'&'`` or ``';'``.
+   It defaults to ``'&'``; a warning is raised if this default is used on input that contains ``';'``.
+   This default may be changed with the following environment variable settings:
+
+   - ``PYTHON_URLLIB_QS_SEPARATOR='&'``: use only ``&`` as separator, without warning (as in Python 3.6.13+ or 3.10)
+   - ``PYTHON_URLLIB_QS_SEPARATOR=';'``: use only ``;`` as separator
+   - ``PYTHON_URLLIB_QS_SEPARATOR=legacy``: use both ``&`` and ``;`` (as in previous versions of Python)
+
    Use the :func:`urllib.urlencode` function to convert such dictionaries into
    query strings.
 
@@ -186,6 +195,9 @@ The :mod:`urlparse` module defines the following functions:
    read. If set, then throws a :exc:`ValueError` if there are more than
    *max_num_fields* fields read.
 
+   The optional argument *separator* is the symbol to use for separating the
+   query arguments. It works as in :py:func:`parse_qs`.
+
    Use the :func:`urllib.urlencode` function to convert such lists of pairs into
    query strings.
 
@@ -195,6 +207,7 @@ The :mod:`urlparse` module defines the following functions:
    .. versionchanged:: 2.7.16
       Added *max_num_fields* parameter.
 
+
 .. function:: urlunparse(parts)
 
    Construct a URL from a tuple as returned by ``urlparse()``. The *parts* argument
diff --git a/Lib/cgi.py b/Lib/cgi.py
index 5b903e03477..1421f2d90e0 100755
--- a/Lib/cgi.py
+++ b/Lib/cgi.py
@@ -121,7 +121,8 @@ log = initlog           # The current logging function
 # 0 ==> unlimited input
 maxlen = 0
 
-def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
+def parse(fp=None, environ=os.environ, keep_blank_values=0,
+          strict_parsing=0, separator=None):
     """Parse a query in the environment or from a file (default stdin)
 
         Arguments, all optional:
@@ -140,6 +141,8 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
         strict_parsing: flag indicating what to do with parsing errors.
             If false (the default), errors are silently ignored.
             If true, errors raise a ValueError exception.
+
+        separator: str. The symbol to use for separating the query arguments.
     """
     if fp is None:
         fp = sys.stdin
@@ -171,25 +174,26 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
         else:
             qs = ""
         environ['QUERY_STRING'] = qs    # XXX Shouldn't, really
-    return urlparse.parse_qs(qs, keep_blank_values, strict_parsing)
+    return urlparse.parse_qs(qs, keep_blank_values, strict_parsing, separator=separator)
 
 
 # parse query string function called from urlparse,
 # this is done in order to maintain backward compatibility.
 
-def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
+def parse_qs(qs, keep_blank_values=0, strict_parsing=0, separator=None):
     """Parse a query given as a string argument."""
     warn("cgi.parse_qs is deprecated, use urlparse.parse_qs instead",
          PendingDeprecationWarning, 2)
-    return urlparse.parse_qs(qs, keep_blank_values, strict_parsing)
+    return urlparse.parse_qs(qs, keep_blank_values, strict_parsing,
+                             separator=separator)
 
 
-def parse_qsl(qs, keep_blank_values=0, strict_parsing=0, max_num_fields=None):
+def parse_qsl(qs, keep_blank_values=0, strict_parsing=0, max_num_fields=None, separator=None):
     """Parse a query given as a string argument."""
     warn("cgi.parse_qsl is deprecated, use urlparse.parse_qsl instead",
          PendingDeprecationWarning, 2)
     return urlparse.parse_qsl(qs, keep_blank_values, strict_parsing,
-                              max_num_fields)
+                              max_num_fields, separator=separator)
 
 def parse_multipart(fp, pdict):
     """Parse multipart input.
@@ -288,7 +292,6 @@ def parse_multipart(fp, pdict):
 
     return partdict
 
-
 def _parseparam(s):
     while s[:1] == ';':
         s = s[1:]
@@ -395,7 +398,7 @@ class FieldStorage:
 
     def __init__(self, fp=None, headers=None, outerboundary="",
                  environ=os.environ, keep_blank_values=0, strict_parsing=0,
-                 max_num_fields=None):
+                 max_num_fields=None, separator=None):
         """Constructor.  Read multipart/* until last part.
 
         Arguments, all optional:
@@ -430,6 +433,7 @@ class FieldStorage:
         self.keep_blank_values = keep_blank_values
         self.strict_parsing = strict_parsing
         self.max_num_fields = max_num_fields
+        self.separator = separator
         if 'REQUEST_METHOD' in environ:
             method = environ['REQUEST_METHOD'].upper()
         self.qs_on_post = None
@@ -613,7 +617,8 @@ class FieldStorage:
         if self.qs_on_post:
             qs += '&' + self.qs_on_post
         query = urlparse.parse_qsl(qs, self.keep_blank_values,
-                                   self.strict_parsing, self.max_num_fields)
+                                   self.strict_parsing, self.max_num_fields,
+                                   self.separator)
         self.list = [MiniFieldStorage(key, value) for key, value in query]
         self.skip_lines()
 
@@ -629,7 +634,8 @@ class FieldStorage:
             query = urlparse.parse_qsl(self.qs_on_post,
                                        self.keep_blank_values,
                                        self.strict_parsing,
-                                       self.max_num_fields)
+                                       self.max_num_fields,
+                                       self.separator)
             self.list.extend(MiniFieldStorage(key, value)
                              for key, value in query)
             FieldStorageClass = None
@@ -649,7 +655,8 @@ class FieldStorage:
             headers = rfc822.Message(self.fp)
             part = klass(self.fp, headers, ib,
                          environ, keep_blank_values, strict_parsing,
-                         max_num_fields)
+                         max_num_fields,
+                         separator=self.separator)
 
             if max_num_fields is not None:
                 max_num_fields -= 1
@@ -817,10 +824,11 @@ class FormContentDict(UserDict.UserDict):
     form.dict == {key: [val, val, ...], ...}
 
     """
-    def __init__(self, environ=os.environ, keep_blank_values=0, strict_parsing=0):
+    def __init__(self, environ=os.environ, keep_blank_values=0, strict_parsing=0, separator=None):
         self.dict = self.data = parse(environ=environ,
                                       keep_blank_values=keep_blank_values,
-                                      strict_parsing=strict_parsing)
+                                      strict_parsing=strict_parsing,
+                                      separator=separator)
         self.query_string = environ['QUERY_STRING']
 
 
diff --git a/Lib/test/test_cgi.py b/Lib/test/test_cgi.py
index 743c2afbd4c..9956ea9d4e8 100644
--- a/Lib/test/test_cgi.py
+++ b/Lib/test/test_cgi.py
@@ -61,12 +61,9 @@ parse_strict_test_cases = [
     ("", ValueError("bad query field: ''")),
     ("&", ValueError("bad query field: ''")),
     ("&&", ValueError("bad query field: ''")),
-    (";", ValueError("bad query field: ''")),
-    (";&;", ValueError("bad query field: ''")),
     # Should the next few really be valid?
     ("=", {}),
     ("=&=", {}),
-    ("=;=", {}),
     # This rest seem to make sense
     ("=a", {'': ['a']}),
     ("&=a", ValueError("bad query field: ''")),
@@ -81,8 +78,6 @@ parse_strict_test_cases = [
     ("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}),
     ("a=a+b&a=b+a", {'a': ['a b', 'b a']}),
     ("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
-    ("x=1;y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
-    ("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
     ("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b&expire=964546263&lobale=en-US&kid=130003.300038&ss=env",
      {'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'],
       'cuyer': ['r'],
@@ -177,6 +172,60 @@ class CgiTests(unittest.TestCase):
                         self.assertItemsEqual(sd.items(),
                                                 first_second_elts(expect.items()))
 
+    def test_separator(self):
+        parse_semicolon = [
+            ("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}),
+            ("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
+            (";", ValueError("bad query field: ''")),
+            (";;", ValueError("bad query field: ''")),
+            ("=;a", ValueError("bad query field: 'a'")),
+            (";b=a", ValueError("bad query field: ''")),
+            ("b;=a", ValueError("bad query field: 'b'")),
+            ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
+            ("a=a+b;a=b+a", {'a': ['a b', 'b a']}),
+        ]
+        for orig, expect in parse_semicolon:
+            env = {'QUERY_STRING': orig}
+            fcd = cgi.FormContentDict(env, separator=';')
+            sd = cgi.SvFormContentDict(env, separator=';')
+            fs = cgi.FieldStorage(environ=env, separator=';')
+            if isinstance(expect, dict):
+                # test dict interface
+                self.assertEqual(len(expect), len(fcd))
+                self.assertItemsEqual(expect.keys(), fcd.keys())
+                self.assertItemsEqual(expect.values(), fcd.values())
+                self.assertItemsEqual(expect.items(), fcd.items())
+                self.assertEqual(fcd.get("nonexistent field", "default"), "default")
+                self.assertEqual(len(sd), len(fs))
+                self.assertItemsEqual(sd.keys(), fs.keys())
+                self.assertEqual(fs.getvalue("nonexistent field", "default"), "default")
+                # test individual fields
+                for key in expect.keys():
+                    expect_val = expect[key]
+                    self.assertTrue(fcd.has_key(key))
+                    self.assertItemsEqual(fcd[key], expect[key])
+                    self.assertEqual(fcd.get(key, "default"), fcd[key])
+                    self.assertTrue(fs.has_key(key))
+                    if len(expect_val) > 1:
+                        single_value = 0
+                    else:
+                        single_value = 1
+                    try:
+                        val = sd[key]
+                    except IndexError:
+                        self.assertFalse(single_value)
+                        self.assertEqual(fs.getvalue(key), expect_val)
+                    else:
+                        self.assertTrue(single_value)
+                        self.assertEqual(val, expect_val[0])
+                        self.assertEqual(fs.getvalue(key), expect_val[0])
+                    self.assertItemsEqual(sd.getlist(key), expect_val)
+                    if single_value:
+                        self.assertItemsEqual(sd.values(),
+                                                first_elts(expect.values()))
+                        self.assertItemsEqual(sd.items(),
+                                                first_second_elts(expect.items()))
+
     def test_weird_formcontentdict(self):
         # Test the weird FormContentDict classes
         env = {'QUERY_STRING': "x=1&y=2.0&z=2-3.%2b0&1=1abc"}
diff --git a/Lib/test/test_urlparse.py b/Lib/test/test_urlparse.py
index 86c4a0595c4..21875bb2991 100644
--- a/Lib/test/test_urlparse.py
+++ b/Lib/test/test_urlparse.py
@@ -3,6 +3,12 @@ import sys
 import unicodedata
 import unittest
 import urlparse
+from test.support import EnvironmentVarGuard
+from warnings import catch_warnings, filterwarnings
+import tempfile
+import contextlib
+import os.path
+import shutil
 
 RFC1808_BASE = "http://a/b/c/d;p?q#f"
 RFC2396_BASE = "http://a/b/c/d;p?q"
@@ -24,16 +30,29 @@ parse_qsl_test_cases = [
     ("&a=b", [('a', 'b')]),
     ("a=a+b&b=b+c", [('a', 'a b'), ('b', 'b c')]),
     ("a=1&a=2", [('a', '1'), ('a', '2')]),
+]
+
+parse_qsl_test_cases_semicolon = [
     (";", []),
     (";;", []),
     (";a=b", [('a', 'b')]),
     ("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]),
     ("a=1;a=2", [('a', '1'), ('a', '2')]),
-    (b";", []),
-    (b";;", []),
-    (b";a=b", [(b'a', b'b')]),
-    (b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
-    (b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]),
+]
+
+parse_qsl_test_cases_legacy = [
+    ("a=1;a=2&a=3", [('a', '1'), ('a', '2'), ('a', '3')]),
+    ("a=1;b=2&c=3", [('a', '1'), ('b', '2'), ('c', '3')]),
+    ("a=1&b=2&c=3;", [('a', '1'), ('b', '2'), ('c', '3')]),
+]
+
+parse_qsl_test_cases_warn = [
+    (";a=b", [(';a', 'b')]),
+    ("a=a+b;b=b+c", [('a', 'a b;b=b c')]),
+    (b";a=b", [(b';a', b'b')]),
+    (b"a=a+b;b=b+c", [(b'a', b'a b;b=b c')]),
+    ("a=1;a=2&a=3", [('a', '1;a=2'), ('a', '3')]),
+    (b"a=1;a=2&a=3", [(b'a', b'1;a=2'), (b'a', b'3')]),
 ]
 
 parse_qs_test_cases = [
@@ -57,6 +76,9 @@ parse_qs_test_cases = [
     (b"&a=b", {b'a': [b'b']}),
     (b"a=a+b&b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
     (b"a=1&a=2", {b'a': [b'1', b'2']}),
+]
+
+parse_qs_test_cases_semicolon = [
     (";", {}),
     (";;", {}),
     (";a=b", {'a': ['b']}),
@@ -69,6 +91,24 @@ parse_qs_test_cases = [
     (b"a=1;a=2", {b'a': [b'1', b'2']}),
 ]
 
+parse_qs_test_cases_legacy = [
+    ("a=1;a=2&a=3", {'a': ['1', '2', '3']}),
+    ("a=1;b=2&c=3", {'a': ['1'], 'b': ['2'], 'c': ['3']}),
+    ("a=1&b=2&c=3;", {'a': ['1'], 'b': ['2'], 'c': ['3']}),
+    (b"a=1;a=2&a=3", {b'a': [b'1', b'2', b'3']}),
+    (b"a=1;b=2&c=3", {b'a': [b'1'], b'b': [b'2'], b'c': [b'3']}),
+    (b"a=1&b=2&c=3;", {b'a': [b'1'], b'b': [b'2'], b'c': [b'3']}),
+]
+
+parse_qs_test_cases_warn = [
+    (";a=b", {';a': ['b']}),
+    ("a=a+b;b=b+c", {'a': ['a b;b=b c']}),
+    (b";a=b", {b';a': [b'b']}),
+    (b"a=a+b;b=b+c", {b'a':[ b'a b;b=b c']}),
+    ("a=1;a=2&a=3", {'a': ['1;a=2', '3']}),
+    (b"a=1;a=2&a=3", {b'a': [b'1;a=2', b'3']}),
+]
+
 class UrlParseTestCase(unittest.TestCase):
 
     def checkRoundtrips(self, url, parsed, split):
@@ -141,6 +181,40 @@ class UrlParseTestCase(unittest.TestCase):
             self.assertEqual(result, expect_without_blanks,
                     "Error parsing %r" % orig)
 
+    def test_qs_default_warn(self):
+        for orig, expect in parse_qs_test_cases_warn:
+            with catch_warnings(record=True) as w:
+                filterwarnings(action='always',
+                                        category=urlparse._QueryStringSeparatorWarning)
+                result = urlparse.parse_qs(orig, keep_blank_values=True)
+                self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 1)
+            self.assertEqual(w[0].category, urlparse._QueryStringSeparatorWarning)
+
+    def test_qsl_default_warn(self):
+        for orig, expect in parse_qsl_test_cases_warn:
+            with catch_warnings(record=True) as w:
+                filterwarnings(action='always',
+                               category=urlparse._QueryStringSeparatorWarning)
+                result = urlparse.parse_qsl(orig, keep_blank_values=True)
+                self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 1)
+            self.assertEqual(w[0].category, urlparse._QueryStringSeparatorWarning)
+
+    def test_default_qs_no_warnings(self):
+        for orig, expect in parse_qs_test_cases:
+            with catch_warnings(record=True) as w:
+                result = urlparse.parse_qs(orig, keep_blank_values=True)
+                self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+
+    def test_default_qsl_no_warnings(self):
+        for orig, expect in parse_qsl_test_cases:
+            with catch_warnings(record=True) as w:
+                result = urlparse.parse_qsl(orig, keep_blank_values=True)
+                self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+
     def test_roundtrips(self):
         testcases = [
             ('file:///tmp/junk.txt',
@@ -626,6 +700,132 @@ class UrlParseTestCase(unittest.TestCase):
         self.assertEqual(urlparse.urlparse("http://www.python.org:80"),
                 ('http','www.python.org:80','','','',''))
 
+    def test_parse_qs_separator_bytes(self):
+        expected = {b'a': [b'1'], b'b': [b'2']}
+
+        result = urlparse.parse_qs(b'a=1;b=2', separator=b';')
+        self.assertEqual(result, expected)
+        result = urlparse.parse_qs(b'a=1;b=2', separator=';')
+        self.assertEqual(result, expected)
+        result = urlparse.parse_qs('a=1;b=2', separator=';')
+        self.assertEqual(result, {'a': ['1'], 'b': ['2']})
+
+    @contextlib.contextmanager
+    def _qsl_sep_config(self, sep):
+        """Context for the given parse_qsl default separator configured in config file"""
+        old_filename = urlparse._QS_SEPARATOR_CONFIG_FILENAME
+        urlparse._default_qs_separator = None
+        try:
+            tmpdirname = tempfile.mkdtemp()
+            filename = os.path.join(tmpdirname, 'conf.cfg')
+            with open(filename, 'w') as file:
+                file.write('[parse_qs]\n')
+                file.write('PYTHON_URLLIB_QS_SEPARATOR = {}'.format(sep))
+            urlparse._QS_SEPARATOR_CONFIG_FILENAME = filename
+            yield
+        finally:
+            urlparse._QS_SEPARATOR_CONFIG_FILENAME = old_filename
+            urlparse._default_qs_separator = None
+            shutil.rmtree(tmpdirname)
+
+    def test_parse_qs_separator_semicolon(self):
+        for orig, expect in parse_qs_test_cases_semicolon:
+            result = urlparse.parse_qs(orig, separator=';')
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            with EnvironmentVarGuard() as environ, catch_warnings(record=True) as w:
+                environ['PYTHON_URLLIB_QS_SEPARATOR'] = ';'
+                result = urlparse.parse_qs(orig)
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+            with self._qsl_sep_config(';'), catch_warnings(record=True) as w:
+                result = urlparse.parse_qs(orig)
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+
+    def test_parse_qsl_separator_semicolon(self):
+        for orig, expect in parse_qsl_test_cases_semicolon:
+            result = urlparse.parse_qsl(orig, separator=';')
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            with EnvironmentVarGuard() as environ, catch_warnings(record=True) as w:
+                environ['PYTHON_URLLIB_QS_SEPARATOR'] = ';'
+                result = urlparse.parse_qsl(orig)
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+            with self._qsl_sep_config(';'), catch_warnings(record=True) as w:
+                result = urlparse.parse_qsl(orig)
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+
+    def test_parse_qs_separator_legacy(self):
+        for orig, expect in parse_qs_test_cases_legacy:
+            with EnvironmentVarGuard() as environ, catch_warnings(record=True) as w:
+                environ['PYTHON_URLLIB_QS_SEPARATOR'] = 'legacy'
+                result = urlparse.parse_qs(orig)
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+            with self._qsl_sep_config('legacy'), catch_warnings(record=True) as w:
+                result = urlparse.parse_qs(orig)
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+
+    def test_parse_qsl_separator_legacy(self):
+        for orig, expect in parse_qsl_test_cases_legacy:
+            with EnvironmentVarGuard() as environ, catch_warnings(record=True) as w:
+                environ['PYTHON_URLLIB_QS_SEPARATOR'] = 'legacy'
+                result = urlparse.parse_qsl(orig)
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+            with self._qsl_sep_config('legacy'), catch_warnings(record=True) as w:
+                result = urlparse.parse_qsl(orig)
+            self.assertEqual(result, expect, "Error parsing %r" % orig)
+            self.assertEqual(len(w), 0)
+
+    def test_parse_qs_separator_bad_value_env_or_config(self):
+        for bad_sep in '', 'abc', 'safe', '&;', 'SEP':
+            with EnvironmentVarGuard() as environ, catch_warnings(record=True) as w:
+                environ['PYTHON_URLLIB_QS_SEPARATOR'] = bad_sep
+                with self.assertRaises(ValueError):
+                    urlparse.parse_qsl('a=1;b=2')
+            with self._qsl_sep_config('bad_sep'), catch_warnings(record=True) as w:
+                with self.assertRaises(ValueError):
+                    urlparse.parse_qsl('a=1;b=2')
+
+    def test_parse_qs_separator_bad_value_arg(self):
+        for bad_sep in True, {}, '':
+            with self.assertRaises(ValueError):
+                urlparse.parse_qsl('a=1;b=2', separator=bad_sep)
+
+    def test_parse_qs_separator_num_fields(self):
+        for qs, sep in (
+            ('a&b&c', '&'),
+            ('a;b;c', ';'),
+            ('a&b;c', 'legacy'),
+        ):
+            with EnvironmentVarGuard() as environ, catch_warnings(record=True) as w:
+                if sep != 'legacy':
+                    with self.assertRaises(ValueError):
+                        urlparse.parse_qsl(qs, separator=sep, max_num_fields=2)
+                if sep:
+                    environ['PYTHON_URLLIB_QS_SEPARATOR'] = sep
+                with self.assertRaises(ValueError):
+                    urlparse.parse_qsl(qs, max_num_fields=2)
+
+    def test_parse_qs_separator_priority(self):
+        # env variable trumps config file
+        with self._qsl_sep_config('~'), EnvironmentVarGuard() as environ:
+            environ['PYTHON_URLLIB_QS_SEPARATOR'] = '!'
+            result = urlparse.parse_qs('a=1!b=2~c=3')
+            self.assertEqual(result, {'a': ['1'], 'b': ['2~c=3']})
+        # argument trumps config file
+        with self._qsl_sep_config('~'):
+            result = urlparse.parse_qs('a=1$b=2~c=3', separator='$')
+            self.assertEqual(result, {'a': ['1'], 'b': ['2~c=3']})
+        # argument trumps env variable
+        with EnvironmentVarGuard() as environ:
+            environ['PYTHON_URLLIB_QS_SEPARATOR'] = '~'
+            result = urlparse.parse_qs('a=1$b=2~c=3', separator='$')
+            self.assertEqual(result, {'a': ['1'], 'b': ['2~c=3']})
+
     def test_urlsplit_normalization(self):
         # Certain characters should never occur in the netloc,
         # including under normalization.
diff --git a/Lib/urlparse.py b/Lib/urlparse.py
index 798b467b605..69504d8fd93 100644
--- a/Lib/urlparse.py
+++ b/Lib/urlparse.py
@@ -29,6 +29,7 @@ test_urlparse.py provides a good indicator of parsing behavior.
 """
 
 import re
+import os
 
 __all__ = ["urlparse", "urlunparse", "urljoin", "urldefrag",
            "urlsplit", "urlunsplit", "parse_qs", "parse_qsl"]
@@ -382,7 +383,8 @@ def unquote(s):
             append(item)
     return ''.join(res)
 
-def parse_qs(qs, keep_blank_values=0, strict_parsing=0, max_num_fields=None):
+def parse_qs(qs, keep_blank_values=0, strict_parsing=0, max_num_fields=None,
+             separator=None):
     """Parse a query given as a string argument.
 
         Arguments:
@@ -405,14 +407,23 @@ def parse_qs(qs, keep_blank_values=0, strict_parsing=0, max_num_fields=None):
     """
     dict = {}
     for name, value in parse_qsl(qs, keep_blank_values, strict_parsing,
-                                 max_num_fields):
+                                 max_num_fields, separator):
         if name in dict:
             dict[name].append(value)
         else:
             dict[name] = [value]
     return dict
 
-def parse_qsl(qs, keep_blank_values=0, strict_parsing=0, max_num_fields=None):
+class _QueryStringSeparatorWarning(RuntimeWarning):
+    """Warning for using default `separator` in parse_qs or parse_qsl"""
+
+# The default "separator" for parse_qsl can be specified in a config file.
+# It's cached after first read.
+_QS_SEPARATOR_CONFIG_FILENAME = '/etc/python/urllib.cfg'
+_default_qs_separator = None
+
+def parse_qsl(qs, keep_blank_values=0, strict_parsing=0, max_num_fields=None,
+              separator=None):
     """Parse a query given as a string argument.
 
     Arguments:
@@ -434,15 +445,72 @@ def parse_qsl(qs, keep_blank_values=0, strict_parsing=0, max_num_fields=None):
 
     Returns a list, as G-d intended.
     """
+
+    if (not separator or (not isinstance(separator, (str, bytes)))) and separator is not None:
+        raise ValueError("Separator must be of type string or bytes.")
+
+    # Used when both "&" and ";" act as separators. (Need a non-string value.)
+    _legacy = object()
+
+    if separator is None:
+        global _default_qs_separator
+        separator = _default_qs_separator
+        envvar_name = 'PYTHON_URLLIB_QS_SEPARATOR'
+        if separator is None:
+            # Set default separator from environment variable
+            separator = os.environ.get(envvar_name)
+            config_source = 'environment variable'
+        if separator is None:
+            # Set default separator from the configuration file
+            try:
+                file = open(_QS_SEPARATOR_CONFIG_FILENAME)
+            except EnvironmentError:
+                pass
+            else:
+                with file:
+                    import ConfigParser
+                    config = ConfigParser.ConfigParser()
+                    config.readfp(file)
+                    separator = config.get('parse_qs', envvar_name)
+                    _default_qs_separator = separator
+                config_source = _QS_SEPARATOR_CONFIG_FILENAME
+        if separator is None:
+            # The default is '&', but warn if not specified explicitly
+            if ';' in qs:
+                from warnings import warn
+                warn("The default separator of urlparse.parse_qsl and "
+                    + "parse_qs was changed to '&' to avoid a web cache "
+                    + "poisoning issue (CVE-2021-23336). "
+                    + "By default, semicolons no longer act as query field "
+                    + "separators. "
+                    + "See https://access.redhat.com/articles/5860431 for "
+                    + "more details.",
+                    _QueryStringSeparatorWarning, stacklevel=2)
+            separator = '&'
+        elif separator == 'legacy':
+            separator = _legacy
+        elif len(separator) != 1:
+            raise ValueError(
+                '{} (from {}) must contain '.format(envvar_name, config_source)
+                + '1 character, or "legacy". See '
+                + 'https://access.redhat.com/articles/5860431 for more details.'
+            )
+
     # If max_num_fields is defined then check that the number of fields
     # is less than max_num_fields. This prevents a memory exhaustion DOS
     # attack via post bodies with many fields.
     if max_num_fields is not None:
-        num_fields = 1 + qs.count('&') + qs.count(';')
+        if separator is _legacy:
+            num_fields = 1 + qs.count('&') + qs.count(';')
+        else:
+            num_fields = 1 + qs.count(separator)
         if max_num_fields < num_fields:
             raise ValueError('Max number of fields exceeded')
 
-    pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
+    if separator is _legacy:
+        pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
+    else:
+        pairs = [s1 for s1 in qs.split(separator)]
     r = []
     for name_value in pairs:
         if not name_value and not strict_parsing:
-- 
2.30.2