Blame SOURCES/0003-WebSockets-only-poll-IO-stream-when-needed.patch

9513f3
From 35f1bac5ff9ec694e64b65e51f0e7a3226aa3aaf Mon Sep 17 00:00:00 2001
9513f3
From: Carlos Garcia Campos <cgarcia@igalia.com>
9513f3
Date: Wed, 28 Aug 2019 10:51:18 +0200
9513f3
Subject: [PATCH] WebSockets: only poll IO stream when needed
9513f3
9513f3
Instead of having two pollable sources constantly running, always try to
9513f3
read/write without blocking and start polling if the operation returns
9513f3
G_IO_ERROR_WOULD_BLOCK. This patch also fixes test
9513f3
/websocket/direct/close-after-close that was passing but not actually
9513f3
testing what we wanted, because the client close was never sent. When
9513f3
the mutex is released, the frame has been queued, but not sent.
9513f3
9513f3
diff --git libsoup/soup-websocket-connection.c libsoup/soup-websocket-connection.c
9513f3
index 345040fe..6afbbe67 100644
9513f3
--- a/libsoup/soup-websocket-connection.c
9513f3
+++ b/libsoup/soup-websocket-connection.c
9513f3
@@ -147,6 +147,7 @@
9513f3
 };
9513f3
 
9513f3
 #define MAX_INCOMING_PAYLOAD_SIZE_DEFAULT   128 * 1024
9513f3
+#define READ_BUFFER_SIZE 1024
9513f3
 
9513f3
 G_DEFINE_TYPE_WITH_PRIVATE (SoupWebsocketConnection, soup_websocket_connection, G_TYPE_OBJECT)
9513f3
 
9513f3
@@ -155,6 +156,11 @@
9513f3
 
9513f3
 static void protocol_error_and_close (SoupWebsocketConnection *self);
9513f3
 
9513f3
+static gboolean on_web_socket_input (GObject *pollable_stream,
9513f3
+				     gpointer user_data);
9513f3
+static gboolean on_web_socket_output (GObject *pollable_stream,
9513f3
+				      gpointer user_data);
9513f3
+
9513f3
 /* Code below is based on g_utf8_validate() implementation,
9513f3
  * but handling NULL characters as valid, as expected by
9513f3
  * WebSockets and compliant with RFC 3629.
9513f3
@@ -283,7 +289,20 @@
9513f3
 }
9513f3
 
9513f3
 static void
9513f3
-stop_input (SoupWebsocketConnection *self)
9513f3
+soup_websocket_connection_start_input_source (SoupWebsocketConnection *self)
9513f3
+{
9513f3
+	SoupWebsocketConnectionPrivate *pv = self->pv;
9513f3
+
9513f3
+	if (pv->input_source)
9513f3
+		return;
9513f3
+
9513f3
+	pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
9513f3
+	g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
9513f3
+	g_source_attach (pv->input_source, pv->main_context);
9513f3
+}
9513f3
+
9513f3
+static void
9513f3
+soup_websocket_connection_stop_input_source (SoupWebsocketConnection *self)
9513f3
 {
9513f3
 	SoupWebsocketConnectionPrivate *pv = self->pv;
9513f3
 
9513f3
@@ -296,7 +315,20 @@
9513f3
 }
9513f3
 
9513f3
 static void
9513f3
-stop_output (SoupWebsocketConnection *self)
9513f3
+soup_websocket_connection_start_output_source (SoupWebsocketConnection *self)
9513f3
+{
9513f3
+	SoupWebsocketConnectionPrivate *pv = self->pv;
9513f3
+
9513f3
+	if (pv->output_source)
9513f3
+		return;
9513f3
+
9513f3
+	pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
9513f3
+	g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
9513f3
+	g_source_attach (pv->output_source, pv->main_context);
9513f3
+}
9513f3
+
9513f3
+static void
9513f3
+soup_websocket_connection_stop_output_source (SoupWebsocketConnection *self)
9513f3
 {
9513f3
 	SoupWebsocketConnectionPrivate *pv = self->pv;
9513f3
 
9513f3
@@ -341,8 +373,8 @@
9513f3
 	close_io_stop_timeout (self);
9513f3
 
9513f3
 	if (!pv->io_closing) {
9513f3
-		stop_input (self);
9513f3
-		stop_output (self);
9513f3
+		soup_websocket_connection_stop_input_source (self);
9513f3
+		soup_websocket_connection_stop_output_source (self);
9513f3
 		pv->io_closing = TRUE;
9513f3
 		g_debug ("closing io stream");
9513f3
 		g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT,
9513f3
@@ -359,7 +391,7 @@
9513f3
 	GSocket *socket;
9513f3
 	GError *error = NULL;
9513f3
 
9513f3
-	stop_output (self);
9513f3
+	soup_websocket_connection_stop_output_source (self);
9513f3
 
9513f3
 	if (G_IS_SOCKET_CONNECTION (pv->io_stream)) {
9513f3
 		socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (pv->io_stream));
9513f3
@@ -612,9 +644,6 @@
9513f3
 		 self->pv->connection_type == SOUP_WEBSOCKET_CONNECTION_SERVER ? "server" : "client",
9513f3
 	         payload_len, self->pv->max_incoming_payload_size);
9513f3
 	emit_error_and_close (self, error, TRUE);
9513f3
-
9513f3
-	/* The input is in an invalid state now */
9513f3
-	stop_input (self);
9513f3
 }
9513f3
 
9513f3
 static void
9513f3
@@ -981,32 +1010,31 @@
9513f3
 		;
9513f3
 }
9513f3
 
9513f3
-static gboolean
9513f3
-on_web_socket_input (GObject *pollable_stream,
9513f3
-		     gpointer user_data)
9513f3
+static void
9513f3
+soup_websocket_connection_read (SoupWebsocketConnection *self)
9513f3
 {
9513f3
-	SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
9513f3
 	SoupWebsocketConnectionPrivate *pv = self->pv;
9513f3
 	GError *error = NULL;
9513f3
 	gboolean end = FALSE;
9513f3
 	gssize count;
9513f3
 	gsize len;
9513f3
 
9513f3
+	soup_websocket_connection_stop_input_source (self);
9513f3
+
9513f3
 	do {
9513f3
 		len = pv->incoming->len;
9513f3
-		g_byte_array_set_size (pv->incoming, len + 1024);
9513f3
+		g_byte_array_set_size (pv->incoming, len + READ_BUFFER_SIZE);
9513f3
 
9513f3
 		count = g_pollable_input_stream_read_nonblocking (pv->input,
9513f3
 								  pv->incoming->data + len,
9513f3
-								  1024, NULL, &error);
9513f3
-
9513f3
+								  READ_BUFFER_SIZE, NULL, &error);
9513f3
 		if (count < 0) {
9513f3
 			if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
9513f3
 				g_error_free (error);
9513f3
 				count = 0;
9513f3
 			} else {
9513f3
 				emit_error_and_close (self, error, TRUE);
9513f3
-				return TRUE;
9513f3
+				return;
9513f3
 			}
9513f3
 		} else if (count == 0) {
9513f3
 			end = TRUE;
9513f3
@@ -1026,16 +1054,24 @@
9513f3
 		}
9513f3
 
9513f3
 		close_io_stream (self);
9513f3
+		return;
9513f3
 	}
9513f3
 
9513f3
-	return TRUE;
9513f3
+	soup_websocket_connection_start_input_source (self);
9513f3
 }
9513f3
 
9513f3
 static gboolean
9513f3
-on_web_socket_output (GObject *pollable_stream,
9513f3
-		      gpointer user_data)
9513f3
+on_web_socket_input (GObject *pollable_stream,
9513f3
+		     gpointer user_data)
9513f3
+{
9513f3
+	soup_websocket_connection_read (SOUP_WEBSOCKET_CONNECTION (user_data));
9513f3
+
9513f3
+	return G_SOURCE_REMOVE;
9513f3
+}
9513f3
+
9513f3
+static void
9513f3
+soup_websocket_connection_write (SoupWebsocketConnection *self)
9513f3
 {
9513f3
-	SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
9513f3
 	SoupWebsocketConnectionPrivate *pv = self->pv;
9513f3
 	const guint8 *data;
9513f3
 	GError *error = NULL;
9513f3
@@ -1043,19 +1079,18 @@
9513f3
 	gssize count;
9513f3
 	gsize len;
9513f3
 
9513f3
+	soup_websocket_connection_stop_output_source (self);
9513f3
+
9513f3
 	if (soup_websocket_connection_get_state (self) == SOUP_WEBSOCKET_STATE_CLOSED) {
9513f3
 		g_debug ("Ignoring message since the connection is closed");
9513f3
-		stop_output (self);
9513f3
-		return TRUE;
9513f3
+		return;
9513f3
 	}
9513f3
 
9513f3
 	frame = g_queue_peek_head (&pv->outgoing);
9513f3
 
9513f3
 	/* No more frames to send */
9513f3
-	if (frame == NULL) {
9513f3
-		stop_output (self);
9513f3
-		return TRUE;
9513f3
-	}
9513f3
+	if (frame == NULL)
9513f3
+		return;
9513f3
 
9513f3
 	data = g_bytes_get_data (frame->data, &len;;
9513f3
 	g_assert (len > 0);
9513f3
@@ -1075,7 +1110,7 @@
9513f3
 			frame->pending = TRUE;
9513f3
 		} else {
9513f3
 			emit_error_and_close (self, error, TRUE);
9513f3
-			return FALSE;
9513f3
+			return;
9513f3
 		}
9513f3
 	}
9513f3
 
9513f3
@@ -1093,23 +1128,21 @@
9513f3
 			}
9513f3
 		}
9513f3
 		frame_free (frame);
9513f3
+
9513f3
+		if (g_queue_is_empty (&pv->outgoing))
9513f3
+			return;
9513f3
 	}
9513f3
 
9513f3
-	return TRUE;
9513f3
+	soup_websocket_connection_start_output_source (self);
9513f3
 }
9513f3
 
9513f3
-static void
9513f3
-start_output (SoupWebsocketConnection *self)
9513f3
+static gboolean
9513f3
+on_web_socket_output (GObject *pollable_stream,
9513f3
+		      gpointer user_data)
9513f3
 {
9513f3
-	SoupWebsocketConnectionPrivate *pv = self->pv;
9513f3
-
9513f3
-	if (pv->output_source)
9513f3
-		return;
9513f3
+	soup_websocket_connection_write (SOUP_WEBSOCKET_CONNECTION (user_data));
9513f3
 
9513f3
-	g_debug ("starting output source");
9513f3
-	pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
9513f3
-	g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
9513f3
-	g_source_attach (pv->output_source, pv->main_context);
9513f3
+	return G_SOURCE_REMOVE;
9513f3
 }
9513f3
 
9513f3
 static void
9513f3
@@ -1150,7 +1183,7 @@
9513f3
 		g_queue_push_tail (&pv->outgoing, frame);
9513f3
 	}
9513f3
 
9513f3
-	start_output (self);
9513f3
+	soup_websocket_connection_write (self);
9513f3
 }
9513f3
 
9513f3
 static void
9513f3
@@ -1175,9 +1208,7 @@
9513f3
 	pv->output = G_POLLABLE_OUTPUT_STREAM (os);
9513f3
 	g_return_if_fail (g_pollable_output_stream_can_poll (pv->output));
9513f3
 
9513f3
-	pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
9513f3
-	g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
9513f3
-	g_source_attach (pv->input_source, pv->main_context);
9513f3
+	soup_websocket_connection_start_input_source (self);
9513f3
 }
9513f3
 
9513f3
 static void
9513f3
diff --git tests/websocket-test.c tests/websocket-test.c
9513f3
index 146fdf82..26d064df 100644
9513f3
--- a/tests/websocket-test.c
9513f3
+++ b/tests/websocket-test.c
9513f3
@@ -733,6 +733,7 @@
9513f3
 	const char frames[] =
9513f3
 		"\x88\x09\x03\xe8""reason1"
9513f3
 		"\x88\x09\x03\xe8""reason2";
9513f3
+	GSocket *socket;
9513f3
 	GError *error = NULL;
9513f3
 
9513f3
 	g_mutex_lock (&test->mutex);
9513f3
@@ -742,7 +743,8 @@
9513f3
 				   frames, sizeof (frames) -1, &written, NULL, &error);
9513f3
 	g_assert_no_error (error);
9513f3
 	g_assert_cmpuint (written, ==, sizeof (frames) - 1);
9513f3
-	g_io_stream_close (test->raw_server, NULL, &error);
9513f3
+	socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (test->raw_server));
9513f3
+	g_socket_shutdown (socket, FALSE, TRUE, &error);
9513f3
 	g_assert_no_error (error);
9513f3
 
9513f3
 	return NULL;
9513f3
@@ -766,6 +768,7 @@
9513f3
 	WAIT_UNTIL (soup_websocket_connection_get_state (test->client) == SOUP_WEBSOCKET_STATE_CLOSED);
9513f3
 	g_assert_cmpuint (soup_websocket_connection_get_close_code (test->client), ==, SOUP_WEBSOCKET_CLOSE_NORMAL);
9513f3
 	g_assert_cmpstr (soup_websocket_connection_get_close_data (test->client), ==, "reason1");
9513f3
+	g_io_stream_close (test->raw_server, NULL, NULL);
9513f3
 }
9513f3
 
9513f3
 static gpointer
9513f3
-- 
9513f3
2.26.2
9513f3