| |
| # HG changeset patch |
| # User Benjamin Peterson <benjamin@python.org> |
| # Date 1417827918 18000 |
| # Node ID 923aac88a3cc76a95d5a04d9d3ece245147a8064 |
| # Parent 339f877cca115c1901f5dd93d7bc066031d2a669 |
| smtplib: limit amount read from the network (closes #16042) |
| |
| diff --git a/Lib/smtplib.py b/Lib/smtplib.py |
| |
| |
| @@ -57,6 +57,7 @@ from sys import stderr |
| SMTP_PORT = 25 |
| SMTP_SSL_PORT = 465 |
| CRLF = "\r\n" |
| +_MAXLINE = 8192 # more than 8 times larger than RFC 821, 4.5.3 |
| |
| OLDSTYLE_AUTH = re.compile(r"auth=(.*)", re.I) |
| |
| @@ -179,10 +180,14 @@ else: |
| def __init__(self, sslobj): |
| self.sslobj = sslobj |
| |
| - def readline(self): |
| + def readline(self, size=-1): |
| + if size < 0: |
| + size = None |
| str = "" |
| chr = None |
| while chr != "\n": |
| + if size is not None and len(str) >= size: |
| + break |
| chr = self.sslobj.read(1) |
| if not chr: |
| break |
| @@ -353,7 +358,7 @@ class SMTP: |
| self.file = self.sock.makefile('rb') |
| while 1: |
| try: |
| - line = self.file.readline() |
| + line = self.file.readline(_MAXLINE + 1) |
| except socket.error as e: |
| self.close() |
| raise SMTPServerDisconnected("Connection unexpectedly closed: " |
| @@ -363,6 +368,8 @@ class SMTP: |
| raise SMTPServerDisconnected("Connection unexpectedly closed") |
| if self.debuglevel > 0: |
| print>>stderr, 'reply:', repr(line) |
| + if len(line) > _MAXLINE: |
| + raise SMTPResponseException(500, "Line too long.") |
| resp.append(line[4:].strip()) |
| code = line[:3] |
| # Check that the error code is syntactically correct. |
| diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py |
| |
| |
| @@ -292,6 +292,33 @@ class BadHELOServerTests(unittest.TestCa |
| HOST, self.port, 'localhost', 3) |
| |
| |
| +@unittest.skipUnless(threading, 'Threading required for this test.') |
| +class TooLongLineTests(unittest.TestCase): |
| + respdata = '250 OK' + ('.' * smtplib._MAXLINE * 2) + '\n' |
| + |
| + def setUp(self): |
| + self.old_stdout = sys.stdout |
| + self.output = StringIO.StringIO() |
| + sys.stdout = self.output |
| + |
| + self.evt = threading.Event() |
| + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| + self.sock.settimeout(15) |
| + self.port = test_support.bind_port(self.sock) |
| + servargs = (self.evt, self.respdata, self.sock) |
| + threading.Thread(target=server, args=servargs).start() |
| + self.evt.wait() |
| + self.evt.clear() |
| + |
| + def tearDown(self): |
| + self.evt.wait() |
| + sys.stdout = self.old_stdout |
| + |
| + def testLineTooLong(self): |
| + self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, |
| + HOST, self.port, 'localhost', 3) |
| + |
| + |
| sim_users = {'Mr.A@somewhere.com':'John A', |
| 'Ms.B@somewhere.com':'Sally B', |
| 'Mrs.C@somewhereesle.com':'Ruth C', |
| @@ -526,7 +553,8 @@ class SMTPSimTests(unittest.TestCase): |
| def test_main(verbose=None): |
| test_support.run_unittest(GeneralTests, DebuggingServerTests, |
| NonConnectingTests, |
| - BadHELOServerTests, SMTPSimTests) |
| + BadHELOServerTests, SMTPSimTests, |
| + TooLongLineTests) |
| |
| if __name__ == '__main__': |
| test_main() |