|
|
ae2451 |
|
|
|
ae2451 |
# HG changeset patch
|
|
|
ae2451 |
# User Richard Oudkerk <shibturn@gmail.com>
|
|
|
ae2451 |
# Date 1372700728 -3600
|
|
|
ae2451 |
# Node ID bc34fe4a0d58a047509798acb0b4b2a21ce1e375
|
|
|
ae2451 |
# Parent 26ef5d5d5c3ea76ab411f2984d507aadce0ce8d7
|
|
|
ae2451 |
Issue #17097: Make multiprocessing ignore EINTR.
|
|
|
ae2451 |
|
|
|
ae2451 |
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
|
|
|
ae2451 |
--- a/Lib/multiprocessing/connection.py
|
|
|
ae2451 |
+++ b/Lib/multiprocessing/connection.py
|
|
|
ae2451 |
@@ -270,7 +270,14 @@ class SocketListener(object):
|
|
|
ae2451 |
self._unlink = None
|
|
|
ae2451 |
|
|
|
ae2451 |
def accept(self):
|
|
|
ae2451 |
- s, self._last_accepted = self._socket.accept()
|
|
|
ae2451 |
+ while True:
|
|
|
ae2451 |
+ try:
|
|
|
ae2451 |
+ s, self._last_accepted = self._socket.accept()
|
|
|
ae2451 |
+ except socket.error as e:
|
|
|
ae2451 |
+ if e.args[0] != errno.EINTR:
|
|
|
ae2451 |
+ raise
|
|
|
ae2451 |
+ else:
|
|
|
ae2451 |
+ break
|
|
|
ae2451 |
s.setblocking(True)
|
|
|
ae2451 |
fd = duplicate(s.fileno())
|
|
|
ae2451 |
conn = _multiprocessing.Connection(fd)
|
|
|
ae2451 |
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
|
|
|
ae2451 |
--- a/Lib/test/test_multiprocessing.py
|
|
|
ae2451 |
+++ b/Lib/test/test_multiprocessing.py
|
|
|
ae2451 |
@@ -2461,12 +2461,80 @@ class TestForkAwareThreadLock(unittest.T
|
|
|
ae2451 |
self.assertLessEqual(new_size, old_size)
|
|
|
ae2451 |
|
|
|
ae2451 |
#
|
|
|
ae2451 |
+# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
|
|
|
ae2451 |
+#
|
|
|
ae2451 |
+
|
|
|
ae2451 |
+class TestIgnoreEINTR(unittest.TestCase):
|
|
|
ae2451 |
+
|
|
|
ae2451 |
+ @classmethod
|
|
|
ae2451 |
+ def _test_ignore(cls, conn):
|
|
|
ae2451 |
+ def handler(signum, frame):
|
|
|
ae2451 |
+ pass
|
|
|
ae2451 |
+ signal.signal(signal.SIGUSR1, handler)
|
|
|
ae2451 |
+ conn.send('ready')
|
|
|
ae2451 |
+ x = conn.recv()
|
|
|
ae2451 |
+ conn.send(x)
|
|
|
ae2451 |
+ conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
|
|
|
ae2451 |
+
|
|
|
ae2451 |
+ @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
|
|
|
ae2451 |
+ def test_ignore(self):
|
|
|
ae2451 |
+ conn, child_conn = multiprocessing.Pipe()
|
|
|
ae2451 |
+ try:
|
|
|
ae2451 |
+ p = multiprocessing.Process(target=self._test_ignore,
|
|
|
ae2451 |
+ args=(child_conn,))
|
|
|
ae2451 |
+ p.daemon = True
|
|
|
ae2451 |
+ p.start()
|
|
|
ae2451 |
+ child_conn.close()
|
|
|
ae2451 |
+ self.assertEqual(conn.recv(), 'ready')
|
|
|
ae2451 |
+ time.sleep(0.1)
|
|
|
ae2451 |
+ os.kill(p.pid, signal.SIGUSR1)
|
|
|
ae2451 |
+ time.sleep(0.1)
|
|
|
ae2451 |
+ conn.send(1234)
|
|
|
ae2451 |
+ self.assertEqual(conn.recv(), 1234)
|
|
|
ae2451 |
+ time.sleep(0.1)
|
|
|
ae2451 |
+ os.kill(p.pid, signal.SIGUSR1)
|
|
|
ae2451 |
+ self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
|
|
|
ae2451 |
+ time.sleep(0.1)
|
|
|
ae2451 |
+ p.join()
|
|
|
ae2451 |
+ finally:
|
|
|
ae2451 |
+ conn.close()
|
|
|
ae2451 |
+
|
|
|
ae2451 |
+ @classmethod
|
|
|
ae2451 |
+ def _test_ignore_listener(cls, conn):
|
|
|
ae2451 |
+ def handler(signum, frame):
|
|
|
ae2451 |
+ pass
|
|
|
ae2451 |
+ signal.signal(signal.SIGUSR1, handler)
|
|
|
ae2451 |
+ l = multiprocessing.connection.Listener()
|
|
|
ae2451 |
+ conn.send(l.address)
|
|
|
ae2451 |
+ a = l.accept()
|
|
|
ae2451 |
+ a.send('welcome')
|
|
|
ae2451 |
+
|
|
|
ae2451 |
+ @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
|
|
|
ae2451 |
+ def test_ignore_listener(self):
|
|
|
ae2451 |
+ conn, child_conn = multiprocessing.Pipe()
|
|
|
ae2451 |
+ try:
|
|
|
ae2451 |
+ p = multiprocessing.Process(target=self._test_ignore_listener,
|
|
|
ae2451 |
+ args=(child_conn,))
|
|
|
ae2451 |
+ p.daemon = True
|
|
|
ae2451 |
+ p.start()
|
|
|
ae2451 |
+ child_conn.close()
|
|
|
ae2451 |
+ address = conn.recv()
|
|
|
ae2451 |
+ time.sleep(0.1)
|
|
|
ae2451 |
+ os.kill(p.pid, signal.SIGUSR1)
|
|
|
ae2451 |
+ time.sleep(0.1)
|
|
|
ae2451 |
+ client = multiprocessing.connection.Client(address)
|
|
|
ae2451 |
+ self.assertEqual(client.recv(), 'welcome')
|
|
|
ae2451 |
+ p.join()
|
|
|
ae2451 |
+ finally:
|
|
|
ae2451 |
+ conn.close()
|
|
|
ae2451 |
+
|
|
|
ae2451 |
+#
|
|
|
ae2451 |
#
|
|
|
ae2451 |
#
|
|
|
ae2451 |
|
|
|
ae2451 |
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
|
|
|
ae2451 |
TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
|
|
|
ae2451 |
- TestFlags, TestForkAwareThreadLock]
|
|
|
ae2451 |
+ TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
|
|
|
ae2451 |
|
|
|
ae2451 |
#
|
|
|
ae2451 |
#
|
|
|
ae2451 |
diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c
|
|
|
ae2451 |
--- a/Modules/_multiprocessing/socket_connection.c
|
|
|
ae2451 |
+++ b/Modules/_multiprocessing/socket_connection.c
|
|
|
ae2451 |
@@ -23,6 +23,21 @@
|
|
|
ae2451 |
#endif
|
|
|
ae2451 |
|
|
|
ae2451 |
/*
|
|
|
ae2451 |
+ * Wrapper for PyErr_CheckSignals() which can be called without the GIL
|
|
|
ae2451 |
+ */
|
|
|
ae2451 |
+
|
|
|
ae2451 |
+static int
|
|
|
ae2451 |
+check_signals(void)
|
|
|
ae2451 |
+{
|
|
|
ae2451 |
+ PyGILState_STATE state;
|
|
|
ae2451 |
+ int res;
|
|
|
ae2451 |
+ state = PyGILState_Ensure();
|
|
|
ae2451 |
+ res = PyErr_CheckSignals();
|
|
|
ae2451 |
+ PyGILState_Release(state);
|
|
|
ae2451 |
+ return res;
|
|
|
ae2451 |
+}
|
|
|
ae2451 |
+
|
|
|
ae2451 |
+/*
|
|
|
ae2451 |
* Send string to file descriptor
|
|
|
ae2451 |
*/
|
|
|
ae2451 |
|
|
|
ae2451 |
@@ -34,8 +49,14 @@ static Py_ssize_t
|
|
|
ae2451 |
|
|
|
ae2451 |
while (length > 0) {
|
|
|
ae2451 |
res = WRITE(h, p, length);
|
|
|
ae2451 |
- if (res < 0)
|
|
|
ae2451 |
+ if (res < 0) {
|
|
|
ae2451 |
+ if (errno == EINTR) {
|
|
|
ae2451 |
+ if (check_signals() < 0)
|
|
|
ae2451 |
+ return MP_EXCEPTION_HAS_BEEN_SET;
|
|
|
ae2451 |
+ continue;
|
|
|
ae2451 |
+ }
|
|
|
ae2451 |
return MP_SOCKET_ERROR;
|
|
|
ae2451 |
+ }
|
|
|
ae2451 |
length -= res;
|
|
|
ae2451 |
p += res;
|
|
|
ae2451 |
}
|
|
|
ae2451 |
@@ -56,12 +77,16 @@ static Py_ssize_t
|
|
|
ae2451 |
|
|
|
ae2451 |
while (remaining > 0) {
|
|
|
ae2451 |
temp = READ(h, p, remaining);
|
|
|
ae2451 |
- if (temp <= 0) {
|
|
|
ae2451 |
- if (temp == 0)
|
|
|
ae2451 |
- return remaining == length ?
|
|
|
ae2451 |
- MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
|
|
|
ae2451 |
- else
|
|
|
ae2451 |
- return temp;
|
|
|
ae2451 |
+ if (temp < 0) {
|
|
|
ae2451 |
+ if (errno == EINTR) {
|
|
|
ae2451 |
+ if (check_signals() < 0)
|
|
|
ae2451 |
+ return MP_EXCEPTION_HAS_BEEN_SET;
|
|
|
ae2451 |
+ continue;
|
|
|
ae2451 |
+ }
|
|
|
ae2451 |
+ return temp;
|
|
|
ae2451 |
+ }
|
|
|
ae2451 |
+ else if (temp == 0) {
|
|
|
ae2451 |
+ return remaining == length ? MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
|
|
|
ae2451 |
}
|
|
|
ae2451 |
remaining -= temp;
|
|
|
ae2451 |
p += temp;
|
|
|
ae2451 |
@@ -171,9 +196,16 @@ conn_poll(ConnectionObject *conn, double
|
|
|
ae2451 |
p.revents = 0;
|
|
|
ae2451 |
|
|
|
ae2451 |
if (timeout < 0) {
|
|
|
ae2451 |
- res = poll(&p, 1, -1);
|
|
|
ae2451 |
+ do {
|
|
|
ae2451 |
+ res = poll(&p, 1, -1);
|
|
|
ae2451 |
+ } while (res < 0 && errno == EINTR);
|
|
|
ae2451 |
} else {
|
|
|
ae2451 |
res = poll(&p, 1, (int)(timeout * 1000 + 0.5));
|
|
|
ae2451 |
+ if (res < 0 && errno == EINTR) {
|
|
|
ae2451 |
+ /* We were interrupted by a signal. Just indicate a
|
|
|
ae2451 |
+ timeout even though we are early. */
|
|
|
ae2451 |
+ return FALSE;
|
|
|
ae2451 |
+ }
|
|
|
ae2451 |
}
|
|
|
ae2451 |
|
|
|
ae2451 |
if (res < 0) {
|
|
|
ae2451 |
@@ -209,12 +241,19 @@ conn_poll(ConnectionObject *conn, double
|
|
|
ae2451 |
FD_SET((SOCKET)conn->handle, &rfds);
|
|
|
ae2451 |
|
|
|
ae2451 |
if (timeout < 0.0) {
|
|
|
ae2451 |
- res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
|
|
|
ae2451 |
+ do {
|
|
|
ae2451 |
+ res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
|
|
|
ae2451 |
+ } while (res < 0 && errno == EINTR);
|
|
|
ae2451 |
} else {
|
|
|
ae2451 |
struct timeval tv;
|
|
|
ae2451 |
tv.tv_sec = (long)timeout;
|
|
|
ae2451 |
tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5);
|
|
|
ae2451 |
res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv;;
|
|
|
ae2451 |
+ if (res < 0 && errno == EINTR) {
|
|
|
ae2451 |
+ /* We were interrupted by a signal. Just indicate a
|
|
|
ae2451 |
+ timeout even though we are early. */
|
|
|
ae2451 |
+ return FALSE;
|
|
|
ae2451 |
+ }
|
|
|
ae2451 |
}
|
|
|
ae2451 |
|
|
|
ae2451 |
if (res < 0) {
|
|
|
ae2451 |
|