923a60
From 47ac92420da9ecbffaf3aa0046d170be358639a2 Mon Sep 17 00:00:00 2001
923a60
From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= <zbyszek@in.waw.pl>
923a60
Date: Fri, 13 Mar 2015 00:02:28 -0400
923a60
Subject: [PATCH] journal-remote: process events without delay
923a60
923a60
journal-remote buffers input, and then parses it handling one journal entry at a time.
923a60
It was possible for useful data to be left in the buffer after some entries were
923a60
processesed. But all data would be already read from the fd, so there would be
923a60
no reason for the event loop to call the handler again. After some new data came in,
923a60
the handler would be called again, and would then process the "old" data in the buffer.
923a60
923a60
Fix this by enabling a handler wherever we process input data and do not exhaust data
923a60
from the input buffer (i.e. when EAGAIN was not encountered). The handler runs until
923a60
we encounter EAGAIN.
923a60
923a60
Looping over the input data is done in this roundabout way to allow the event loop
923a60
to dispatch other events in the meanwhile. If the loop was inside the handler, a
923a60
source which produced data fast enough could completely monopolize the process.
923a60
923a60
https://bugs.freedesktop.org/show_bug.cgi?id=89516
923a60
(cherry picked from commit 043945b93824e33e040954612aaa934cd1a43a1b)
923a60
---
923a60
 src/journal-remote/journal-remote-parse.c |  1 +
923a60
 src/journal-remote/journal-remote-parse.h |  1 +
923a60
 src/journal-remote/journal-remote.c       | 65 ++++++++++++++++++++---
923a60
 3 files changed, 59 insertions(+), 8 deletions(-)
923a60
923a60
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
923a60
index 6c096de03a..7e62954351 100644
923a60
--- a/src/journal-remote/journal-remote-parse.c
923a60
+++ b/src/journal-remote/journal-remote-parse.c
923a60
@@ -41,6 +41,7 @@ void source_free(RemoteSource *source) {
923a60
         writer_unref(source->writer);
923a60
 
923a60
         sd_event_source_unref(source->event);
923a60
+        sd_event_source_unref(source->buffer_event);
923a60
 
923a60
         free(source);
923a60
 }
923a60
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h
923a60
index 22db550913..06a50296a1 100644
923a60
--- a/src/journal-remote/journal-remote-parse.h
923a60
+++ b/src/journal-remote/journal-remote-parse.h
923a60
@@ -54,6 +54,7 @@ typedef struct RemoteSource {
923a60
         Writer *writer;
923a60
 
923a60
         sd_event_source *event;
923a60
+        sd_event_source *buffer_event;
923a60
 } RemoteSource;
923a60
 
923a60
 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
923a60
diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c
923a60
index d1486e7cda..b7cc6d7172 100644
923a60
--- a/src/journal-remote/journal-remote.c
923a60
+++ b/src/journal-remote/journal-remote.c
923a60
@@ -289,6 +289,8 @@ static int dispatch_raw_source_event(sd_event_source *event,
923a60
                                      int fd,
923a60
                                      uint32_t revents,
923a60
                                      void *userdata);
923a60
+static int dispatch_raw_source_until_block(sd_event_source *event,
923a60
+                                           void *userdata);
923a60
 static int dispatch_blocking_source_event(sd_event_source *event,
923a60
                                           void *userdata);
923a60
 static int dispatch_raw_connection_event(sd_event_source *event,
923a60
@@ -376,8 +378,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) {
923a60
 
923a60
         r = sd_event_add_io(s->events, &source->event,
923a60
                             fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
923a60
-                            dispatch_raw_source_event, s);
923a60
-        if (r == -EPERM) {
923a60
+                            dispatch_raw_source_event, source);
923a60
+        if (r == 0) {
923a60
+                /* Add additional source for buffer processing. It will be
923a60
+                 * enabled later. */
923a60
+                r = sd_event_add_defer(s->events, &source->buffer_event,
923a60
+                                       dispatch_raw_source_until_block, source);
923a60
+                if (r == 0)
923a60
+                        sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
923a60
+        } else if (r == -EPERM) {
923a60
                 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
923a60
                 r = sd_event_add_defer(s->events, &source->event,
923a60
                                        dispatch_blocking_source_event, source);
923a60
@@ -997,15 +1006,18 @@ static void server_destroy(RemoteServer *s) {
923a60
  **********************************************************************
923a60
  **********************************************************************/
923a60
 
923a60
-static int dispatch_raw_source_event(sd_event_source *event,
923a60
-                                     int fd,
923a60
-                                     uint32_t revents,
923a60
-                                     void *userdata) {
923a60
+static int handle_raw_source(sd_event_source *event,
923a60
+                             int fd,
923a60
+                             uint32_t revents,
923a60
+                             RemoteServer *s) {
923a60
 
923a60
-        RemoteServer *s = userdata;
923a60
         RemoteSource *source;
923a60
         int r;
923a60
 
923a60
+        /* Returns 1 if there might be more data pending,
923a60
+         * 0 if data is currently exhausted, negative on error.
923a60
+         */
923a60
+
923a60
         assert(fd >= 0 && fd < (ssize_t) s->sources_size);
923a60
         source = s->sources[fd];
923a60
         assert(source->fd == fd);
923a60
@@ -1036,11 +1048,48 @@ static int dispatch_raw_source_event(sd_event_source *event,
923a60
                 return 1;
923a60
 }
923a60
 
923a60
+static int dispatch_raw_source_until_block(sd_event_source *event,
923a60
+                                           void *userdata) {
923a60
+        RemoteSource *source = userdata;
923a60
+        int r;
923a60
+
923a60
+        /* Make sure event stays around even if source is destroyed */
923a60
+        sd_event_source_ref(event);
923a60
+
923a60
+        r = handle_raw_source(event, source->fd, EPOLLIN, server);
923a60
+        if (r != 1)
923a60
+                /* No more data for now */
923a60
+                sd_event_source_set_enabled(event, SD_EVENT_OFF);
923a60
+
923a60
+        sd_event_source_unref(event);
923a60
+
923a60
+        return r;
923a60
+}
923a60
+
923a60
+static int dispatch_raw_source_event(sd_event_source *event,
923a60
+                                     int fd,
923a60
+                                     uint32_t revents,
923a60
+                                     void *userdata) {
923a60
+        RemoteSource *source = userdata;
923a60
+        int r;
923a60
+
923a60
+        assert(source->event);
923a60
+        assert(source->buffer_event);
923a60
+
923a60
+        r = handle_raw_source(event, fd, EPOLLIN, server);
923a60
+        if (r == 1)
923a60
+                /* Might have more data. We need to rerun the handler
923a60
+                 * until we are sure the buffer is exhausted. */
923a60
+                sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
923a60
+
923a60
+        return r;
923a60
+}
923a60
+
923a60
 static int dispatch_blocking_source_event(sd_event_source *event,
923a60
                                           void *userdata) {
923a60
         RemoteSource *source = userdata;
923a60
 
923a60
-        return dispatch_raw_source_event(event, source->fd, EPOLLIN, server);
923a60
+        return handle_raw_source(event, source->fd, EPOLLIN, server);
923a60
 }
923a60
 
923a60
 static int accept_connection(const char* type, int fd,