Blob Blame History Raw
From 35f1bac5ff9ec694e64b65e51f0e7a3226aa3aaf Mon Sep 17 00:00:00 2001
From: Carlos Garcia Campos <cgarcia@igalia.com>
Date: Wed, 28 Aug 2019 10:51:18 +0200
Subject: [PATCH] WebSockets: only poll IO stream when needed

Instead of having two pollable sources constantly running, always try to
read/write without blocking and start polling if the operation returns
G_IO_ERROR_WOULD_BLOCK. This patch also fixes test
/websocket/direct/close-after-close that was passing but not actually
testing what we wanted, because the client close was never sent. When
the mutex is released, the frame has been queued, but not sent.

diff --git libsoup/soup-websocket-connection.c libsoup/soup-websocket-connection.c
index 345040fe..6afbbe67 100644
--- a/libsoup/soup-websocket-connection.c
+++ b/libsoup/soup-websocket-connection.c
@@ -147,6 +147,7 @@
 };
 
 #define MAX_INCOMING_PAYLOAD_SIZE_DEFAULT   128 * 1024
+#define READ_BUFFER_SIZE 1024
 
 G_DEFINE_TYPE_WITH_PRIVATE (SoupWebsocketConnection, soup_websocket_connection, G_TYPE_OBJECT)
 
@@ -155,6 +156,11 @@
 
 static void protocol_error_and_close (SoupWebsocketConnection *self);
 
+static gboolean on_web_socket_input (GObject *pollable_stream,
+				     gpointer user_data);
+static gboolean on_web_socket_output (GObject *pollable_stream,
+				      gpointer user_data);
+
 /* Code below is based on g_utf8_validate() implementation,
  * but handling NULL characters as valid, as expected by
  * WebSockets and compliant with RFC 3629.
@@ -283,7 +289,20 @@
 }
 
 static void
-stop_input (SoupWebsocketConnection *self)
+soup_websocket_connection_start_input_source (SoupWebsocketConnection *self)
+{
+	SoupWebsocketConnectionPrivate *pv = self->pv;
+
+	if (pv->input_source)
+		return;
+
+	pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
+	g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
+	g_source_attach (pv->input_source, pv->main_context);
+}
+
+static void
+soup_websocket_connection_stop_input_source (SoupWebsocketConnection *self)
 {
 	SoupWebsocketConnectionPrivate *pv = self->pv;
 
@@ -296,7 +315,20 @@
 }
 
 static void
-stop_output (SoupWebsocketConnection *self)
+soup_websocket_connection_start_output_source (SoupWebsocketConnection *self)
+{
+	SoupWebsocketConnectionPrivate *pv = self->pv;
+
+	if (pv->output_source)
+		return;
+
+	pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
+	g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
+	g_source_attach (pv->output_source, pv->main_context);
+}
+
+static void
+soup_websocket_connection_stop_output_source (SoupWebsocketConnection *self)
 {
 	SoupWebsocketConnectionPrivate *pv = self->pv;
 
@@ -341,8 +373,8 @@
 	close_io_stop_timeout (self);
 
 	if (!pv->io_closing) {
-		stop_input (self);
-		stop_output (self);
+		soup_websocket_connection_stop_input_source (self);
+		soup_websocket_connection_stop_output_source (self);
 		pv->io_closing = TRUE;
 		g_debug ("closing io stream");
 		g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT,
@@ -359,7 +391,7 @@
 	GSocket *socket;
 	GError *error = NULL;
 
-	stop_output (self);
+	soup_websocket_connection_stop_output_source (self);
 
 	if (G_IS_SOCKET_CONNECTION (pv->io_stream)) {
 		socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (pv->io_stream));
@@ -612,9 +644,6 @@
 		 self->pv->connection_type == SOUP_WEBSOCKET_CONNECTION_SERVER ? "server" : "client",
 	         payload_len, self->pv->max_incoming_payload_size);
 	emit_error_and_close (self, error, TRUE);
-
-	/* The input is in an invalid state now */
-	stop_input (self);
 }
 
 static void
@@ -981,32 +1010,31 @@
 		;
 }
 
-static gboolean
-on_web_socket_input (GObject *pollable_stream,
-		     gpointer user_data)
+static void
+soup_websocket_connection_read (SoupWebsocketConnection *self)
 {
-	SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
 	SoupWebsocketConnectionPrivate *pv = self->pv;
 	GError *error = NULL;
 	gboolean end = FALSE;
 	gssize count;
 	gsize len;
 
+	soup_websocket_connection_stop_input_source (self);
+
 	do {
 		len = pv->incoming->len;
-		g_byte_array_set_size (pv->incoming, len + 1024);
+		g_byte_array_set_size (pv->incoming, len + READ_BUFFER_SIZE);
 
 		count = g_pollable_input_stream_read_nonblocking (pv->input,
 								  pv->incoming->data + len,
-								  1024, NULL, &error);
-
+								  READ_BUFFER_SIZE, NULL, &error);
 		if (count < 0) {
 			if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
 				g_error_free (error);
 				count = 0;
 			} else {
 				emit_error_and_close (self, error, TRUE);
-				return TRUE;
+				return;
 			}
 		} else if (count == 0) {
 			end = TRUE;
@@ -1026,16 +1054,24 @@
 		}
 
 		close_io_stream (self);
+		return;
 	}
 
-	return TRUE;
+	soup_websocket_connection_start_input_source (self);
 }
 
 static gboolean
-on_web_socket_output (GObject *pollable_stream,
-		      gpointer user_data)
+on_web_socket_input (GObject *pollable_stream,
+		     gpointer user_data)
+{
+	soup_websocket_connection_read (SOUP_WEBSOCKET_CONNECTION (user_data));
+
+	return G_SOURCE_REMOVE;
+}
+
+static void
+soup_websocket_connection_write (SoupWebsocketConnection *self)
 {
-	SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
 	SoupWebsocketConnectionPrivate *pv = self->pv;
 	const guint8 *data;
 	GError *error = NULL;
@@ -1043,19 +1079,18 @@
 	gssize count;
 	gsize len;
 
+	soup_websocket_connection_stop_output_source (self);
+
 	if (soup_websocket_connection_get_state (self) == SOUP_WEBSOCKET_STATE_CLOSED) {
 		g_debug ("Ignoring message since the connection is closed");
-		stop_output (self);
-		return TRUE;
+		return;
 	}
 
 	frame = g_queue_peek_head (&pv->outgoing);
 
 	/* No more frames to send */
-	if (frame == NULL) {
-		stop_output (self);
-		return TRUE;
-	}
+	if (frame == NULL)
+		return;
 
 	data = g_bytes_get_data (frame->data, &len);
 	g_assert (len > 0);
@@ -1075,7 +1110,7 @@
 			frame->pending = TRUE;
 		} else {
 			emit_error_and_close (self, error, TRUE);
-			return FALSE;
+			return;
 		}
 	}
 
@@ -1093,23 +1128,21 @@
 			}
 		}
 		frame_free (frame);
+
+		if (g_queue_is_empty (&pv->outgoing))
+			return;
 	}
 
-	return TRUE;
+	soup_websocket_connection_start_output_source (self);
 }
 
-static void
-start_output (SoupWebsocketConnection *self)
+static gboolean
+on_web_socket_output (GObject *pollable_stream,
+		      gpointer user_data)
 {
-	SoupWebsocketConnectionPrivate *pv = self->pv;
-
-	if (pv->output_source)
-		return;
+	soup_websocket_connection_write (SOUP_WEBSOCKET_CONNECTION (user_data));
 
-	g_debug ("starting output source");
-	pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
-	g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
-	g_source_attach (pv->output_source, pv->main_context);
+	return G_SOURCE_REMOVE;
 }
 
 static void
@@ -1150,7 +1183,7 @@
 		g_queue_push_tail (&pv->outgoing, frame);
 	}
 
-	start_output (self);
+	soup_websocket_connection_write (self);
 }
 
 static void
@@ -1175,9 +1208,7 @@
 	pv->output = G_POLLABLE_OUTPUT_STREAM (os);
 	g_return_if_fail (g_pollable_output_stream_can_poll (pv->output));
 
-	pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
-	g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
-	g_source_attach (pv->input_source, pv->main_context);
+	soup_websocket_connection_start_input_source (self);
 }
 
 static void
diff --git tests/websocket-test.c tests/websocket-test.c
index 146fdf82..26d064df 100644
--- a/tests/websocket-test.c
+++ b/tests/websocket-test.c
@@ -733,6 +733,7 @@
 	const char frames[] =
 		"\x88\x09\x03\xe8""reason1"
 		"\x88\x09\x03\xe8""reason2";
+	GSocket *socket;
 	GError *error = NULL;
 
 	g_mutex_lock (&test->mutex);
@@ -742,7 +743,8 @@
 				   frames, sizeof (frames) -1, &written, NULL, &error);
 	g_assert_no_error (error);
 	g_assert_cmpuint (written, ==, sizeof (frames) - 1);
-	g_io_stream_close (test->raw_server, NULL, &error);
+	socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (test->raw_server));
+	g_socket_shutdown (socket, FALSE, TRUE, &error);
 	g_assert_no_error (error);
 
 	return NULL;
@@ -766,6 +768,7 @@
 	WAIT_UNTIL (soup_websocket_connection_get_state (test->client) == SOUP_WEBSOCKET_STATE_CLOSED);
 	g_assert_cmpuint (soup_websocket_connection_get_close_code (test->client), ==, SOUP_WEBSOCKET_CLOSE_NORMAL);
 	g_assert_cmpstr (soup_websocket_connection_get_close_data (test->client), ==, "reason1");
+	g_io_stream_close (test->raw_server, NULL, NULL);
 }
 
 static gpointer
-- 
2.26.2