Blame SOURCES/0003-WebSockets-only-poll-IO-stream-when-needed.patch

0a2b47
From 35f1bac5ff9ec694e64b65e51f0e7a3226aa3aaf Mon Sep 17 00:00:00 2001
0a2b47
From: Carlos Garcia Campos <cgarcia@igalia.com>
0a2b47
Date: Wed, 28 Aug 2019 10:51:18 +0200
0a2b47
Subject: [PATCH] WebSockets: only poll IO stream when needed
0a2b47
0a2b47
Instead of having two pollable sources constantly running, always try to
0a2b47
read/write without blocking and start polling if the operation returns
0a2b47
G_IO_ERROR_WOULD_BLOCK. This patch also fixes test
0a2b47
/websocket/direct/close-after-close that was passing but not actually
0a2b47
testing what we wanted, because the client close was never sent. When
0a2b47
the mutex is released, the frame has been queued, but not sent.
0a2b47
0a2b47
diff --git libsoup/soup-websocket-connection.c libsoup/soup-websocket-connection.c
0a2b47
index 345040fe..6afbbe67 100644
0a2b47
--- a/libsoup/soup-websocket-connection.c
0a2b47
+++ b/libsoup/soup-websocket-connection.c
0a2b47
@@ -147,6 +147,7 @@
0a2b47
 };
0a2b47
 
0a2b47
 #define MAX_INCOMING_PAYLOAD_SIZE_DEFAULT   128 * 1024
0a2b47
+#define READ_BUFFER_SIZE 1024
0a2b47
 
0a2b47
 G_DEFINE_TYPE_WITH_PRIVATE (SoupWebsocketConnection, soup_websocket_connection, G_TYPE_OBJECT)
0a2b47
 
0a2b47
@@ -155,6 +156,11 @@
0a2b47
 
0a2b47
 static void protocol_error_and_close (SoupWebsocketConnection *self);
0a2b47
 
0a2b47
+static gboolean on_web_socket_input (GObject *pollable_stream,
0a2b47
+				     gpointer user_data);
0a2b47
+static gboolean on_web_socket_output (GObject *pollable_stream,
0a2b47
+				      gpointer user_data);
0a2b47
+
0a2b47
 /* Code below is based on g_utf8_validate() implementation,
0a2b47
  * but handling NULL characters as valid, as expected by
0a2b47
  * WebSockets and compliant with RFC 3629.
0a2b47
@@ -283,7 +289,20 @@
0a2b47
 }
0a2b47
 
0a2b47
 static void
0a2b47
-stop_input (SoupWebsocketConnection *self)
0a2b47
+soup_websocket_connection_start_input_source (SoupWebsocketConnection *self)
0a2b47
+{
0a2b47
+	SoupWebsocketConnectionPrivate *pv = self->pv;
0a2b47
+
0a2b47
+	if (pv->input_source)
0a2b47
+		return;
0a2b47
+
0a2b47
+	pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
0a2b47
+	g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
0a2b47
+	g_source_attach (pv->input_source, pv->main_context);
0a2b47
+}
0a2b47
+
0a2b47
+static void
0a2b47
+soup_websocket_connection_stop_input_source (SoupWebsocketConnection *self)
0a2b47
 {
0a2b47
 	SoupWebsocketConnectionPrivate *pv = self->pv;
0a2b47
 
0a2b47
@@ -296,7 +315,20 @@
0a2b47
 }
0a2b47
 
0a2b47
 static void
0a2b47
-stop_output (SoupWebsocketConnection *self)
0a2b47
+soup_websocket_connection_start_output_source (SoupWebsocketConnection *self)
0a2b47
+{
0a2b47
+	SoupWebsocketConnectionPrivate *pv = self->pv;
0a2b47
+
0a2b47
+	if (pv->output_source)
0a2b47
+		return;
0a2b47
+
0a2b47
+	pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
0a2b47
+	g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
0a2b47
+	g_source_attach (pv->output_source, pv->main_context);
0a2b47
+}
0a2b47
+
0a2b47
+static void
0a2b47
+soup_websocket_connection_stop_output_source (SoupWebsocketConnection *self)
0a2b47
 {
0a2b47
 	SoupWebsocketConnectionPrivate *pv = self->pv;
0a2b47
 
0a2b47
@@ -341,8 +373,8 @@
0a2b47
 	close_io_stop_timeout (self);
0a2b47
 
0a2b47
 	if (!pv->io_closing) {
0a2b47
-		stop_input (self);
0a2b47
-		stop_output (self);
0a2b47
+		soup_websocket_connection_stop_input_source (self);
0a2b47
+		soup_websocket_connection_stop_output_source (self);
0a2b47
 		pv->io_closing = TRUE;
0a2b47
 		g_debug ("closing io stream");
0a2b47
 		g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT,
0a2b47
@@ -359,7 +391,7 @@
0a2b47
 	GSocket *socket;
0a2b47
 	GError *error = NULL;
0a2b47
 
0a2b47
-	stop_output (self);
0a2b47
+	soup_websocket_connection_stop_output_source (self);
0a2b47
 
0a2b47
 	if (G_IS_SOCKET_CONNECTION (pv->io_stream)) {
0a2b47
 		socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (pv->io_stream));
0a2b47
@@ -612,9 +644,6 @@
0a2b47
 		 self->pv->connection_type == SOUP_WEBSOCKET_CONNECTION_SERVER ? "server" : "client",
0a2b47
 	         payload_len, self->pv->max_incoming_payload_size);
0a2b47
 	emit_error_and_close (self, error, TRUE);
0a2b47
-
0a2b47
-	/* The input is in an invalid state now */
0a2b47
-	stop_input (self);
0a2b47
 }
0a2b47
 
0a2b47
 static void
0a2b47
@@ -981,32 +1010,31 @@
0a2b47
 		;
0a2b47
 }
0a2b47
 
0a2b47
-static gboolean
0a2b47
-on_web_socket_input (GObject *pollable_stream,
0a2b47
-		     gpointer user_data)
0a2b47
+static void
0a2b47
+soup_websocket_connection_read (SoupWebsocketConnection *self)
0a2b47
 {
0a2b47
-	SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
0a2b47
 	SoupWebsocketConnectionPrivate *pv = self->pv;
0a2b47
 	GError *error = NULL;
0a2b47
 	gboolean end = FALSE;
0a2b47
 	gssize count;
0a2b47
 	gsize len;
0a2b47
 
0a2b47
+	soup_websocket_connection_stop_input_source (self);
0a2b47
+
0a2b47
 	do {
0a2b47
 		len = pv->incoming->len;
0a2b47
-		g_byte_array_set_size (pv->incoming, len + 1024);
0a2b47
+		g_byte_array_set_size (pv->incoming, len + READ_BUFFER_SIZE);
0a2b47
 
0a2b47
 		count = g_pollable_input_stream_read_nonblocking (pv->input,
0a2b47
 								  pv->incoming->data + len,
0a2b47
-								  1024, NULL, &error);
0a2b47
-
0a2b47
+								  READ_BUFFER_SIZE, NULL, &error);
0a2b47
 		if (count < 0) {
0a2b47
 			if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
0a2b47
 				g_error_free (error);
0a2b47
 				count = 0;
0a2b47
 			} else {
0a2b47
 				emit_error_and_close (self, error, TRUE);
0a2b47
-				return TRUE;
0a2b47
+				return;
0a2b47
 			}
0a2b47
 		} else if (count == 0) {
0a2b47
 			end = TRUE;
0a2b47
@@ -1026,16 +1054,24 @@
0a2b47
 		}
0a2b47
 
0a2b47
 		close_io_stream (self);
0a2b47
+		return;
0a2b47
 	}
0a2b47
 
0a2b47
-	return TRUE;
0a2b47
+	soup_websocket_connection_start_input_source (self);
0a2b47
 }
0a2b47
 
0a2b47
 static gboolean
0a2b47
-on_web_socket_output (GObject *pollable_stream,
0a2b47
-		      gpointer user_data)
0a2b47
+on_web_socket_input (GObject *pollable_stream,
0a2b47
+		     gpointer user_data)
0a2b47
+{
0a2b47
+	soup_websocket_connection_read (SOUP_WEBSOCKET_CONNECTION (user_data));
0a2b47
+
0a2b47
+	return G_SOURCE_REMOVE;
0a2b47
+}
0a2b47
+
0a2b47
+static void
0a2b47
+soup_websocket_connection_write (SoupWebsocketConnection *self)
0a2b47
 {
0a2b47
-	SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
0a2b47
 	SoupWebsocketConnectionPrivate *pv = self->pv;
0a2b47
 	const guint8 *data;
0a2b47
 	GError *error = NULL;
0a2b47
@@ -1043,19 +1079,18 @@
0a2b47
 	gssize count;
0a2b47
 	gsize len;
0a2b47
 
0a2b47
+	soup_websocket_connection_stop_output_source (self);
0a2b47
+
0a2b47
 	if (soup_websocket_connection_get_state (self) == SOUP_WEBSOCKET_STATE_CLOSED) {
0a2b47
 		g_debug ("Ignoring message since the connection is closed");
0a2b47
-		stop_output (self);
0a2b47
-		return TRUE;
0a2b47
+		return;
0a2b47
 	}
0a2b47
 
0a2b47
 	frame = g_queue_peek_head (&pv->outgoing);
0a2b47
 
0a2b47
 	/* No more frames to send */
0a2b47
-	if (frame == NULL) {
0a2b47
-		stop_output (self);
0a2b47
-		return TRUE;
0a2b47
-	}
0a2b47
+	if (frame == NULL)
0a2b47
+		return;
0a2b47
 
0a2b47
 	data = g_bytes_get_data (frame->data, &len;;
0a2b47
 	g_assert (len > 0);
0a2b47
@@ -1075,7 +1110,7 @@
0a2b47
 			frame->pending = TRUE;
0a2b47
 		} else {
0a2b47
 			emit_error_and_close (self, error, TRUE);
0a2b47
-			return FALSE;
0a2b47
+			return;
0a2b47
 		}
0a2b47
 	}
0a2b47
 
0a2b47
@@ -1093,23 +1128,21 @@
0a2b47
 			}
0a2b47
 		}
0a2b47
 		frame_free (frame);
0a2b47
+
0a2b47
+		if (g_queue_is_empty (&pv->outgoing))
0a2b47
+			return;
0a2b47
 	}
0a2b47
 
0a2b47
-	return TRUE;
0a2b47
+	soup_websocket_connection_start_output_source (self);
0a2b47
 }
0a2b47
 
0a2b47
-static void
0a2b47
-start_output (SoupWebsocketConnection *self)
0a2b47
+static gboolean
0a2b47
+on_web_socket_output (GObject *pollable_stream,
0a2b47
+		      gpointer user_data)
0a2b47
 {
0a2b47
-	SoupWebsocketConnectionPrivate *pv = self->pv;
0a2b47
-
0a2b47
-	if (pv->output_source)
0a2b47
-		return;
0a2b47
+	soup_websocket_connection_write (SOUP_WEBSOCKET_CONNECTION (user_data));
0a2b47
 
0a2b47
-	g_debug ("starting output source");
0a2b47
-	pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
0a2b47
-	g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
0a2b47
-	g_source_attach (pv->output_source, pv->main_context);
0a2b47
+	return G_SOURCE_REMOVE;
0a2b47
 }
0a2b47
 
0a2b47
 static void
0a2b47
@@ -1150,7 +1183,7 @@
0a2b47
 		g_queue_push_tail (&pv->outgoing, frame);
0a2b47
 	}
0a2b47
 
0a2b47
-	start_output (self);
0a2b47
+	soup_websocket_connection_write (self);
0a2b47
 }
0a2b47
 
0a2b47
 static void
0a2b47
@@ -1175,9 +1208,7 @@
0a2b47
 	pv->output = G_POLLABLE_OUTPUT_STREAM (os);
0a2b47
 	g_return_if_fail (g_pollable_output_stream_can_poll (pv->output));
0a2b47
 
0a2b47
-	pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
0a2b47
-	g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
0a2b47
-	g_source_attach (pv->input_source, pv->main_context);
0a2b47
+	soup_websocket_connection_start_input_source (self);
0a2b47
 }
0a2b47
 
0a2b47
 static void
0a2b47
diff --git tests/websocket-test.c tests/websocket-test.c
0a2b47
index 146fdf82..26d064df 100644
0a2b47
--- a/tests/websocket-test.c
0a2b47
+++ b/tests/websocket-test.c
0a2b47
@@ -733,6 +733,7 @@
0a2b47
 	const char frames[] =
0a2b47
 		"\x88\x09\x03\xe8""reason1"
0a2b47
 		"\x88\x09\x03\xe8""reason2";
0a2b47
+	GSocket *socket;
0a2b47
 	GError *error = NULL;
0a2b47
 
0a2b47
 	g_mutex_lock (&test->mutex);
0a2b47
@@ -742,7 +743,8 @@
0a2b47
 				   frames, sizeof (frames) -1, &written, NULL, &error);
0a2b47
 	g_assert_no_error (error);
0a2b47
 	g_assert_cmpuint (written, ==, sizeof (frames) - 1);
0a2b47
-	g_io_stream_close (test->raw_server, NULL, &error);
0a2b47
+	socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (test->raw_server));
0a2b47
+	g_socket_shutdown (socket, FALSE, TRUE, &error);
0a2b47
 	g_assert_no_error (error);
0a2b47
 
0a2b47
 	return NULL;
0a2b47
@@ -766,6 +768,7 @@
0a2b47
 	WAIT_UNTIL (soup_websocket_connection_get_state (test->client) == SOUP_WEBSOCKET_STATE_CLOSED);
0a2b47
 	g_assert_cmpuint (soup_websocket_connection_get_close_code (test->client), ==, SOUP_WEBSOCKET_CLOSE_NORMAL);
0a2b47
 	g_assert_cmpstr (soup_websocket_connection_get_close_data (test->client), ==, "reason1");
0a2b47
+	g_io_stream_close (test->raw_server, NULL, NULL);
0a2b47
 }
0a2b47
 
0a2b47
 static gpointer
0a2b47
-- 
0a2b47
2.26.2
0a2b47