803fb7
From 47ac92420da9ecbffaf3aa0046d170be358639a2 Mon Sep 17 00:00:00 2001
803fb7
From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= <zbyszek@in.waw.pl>
803fb7
Date: Fri, 13 Mar 2015 00:02:28 -0400
803fb7
Subject: [PATCH] journal-remote: process events without delay
803fb7
803fb7
journal-remote buffers input, and then parses it handling one journal entry at a time.
803fb7
It was possible for useful data to be left in the buffer after some entries were
803fb7
processesed. But all data would be already read from the fd, so there would be
803fb7
no reason for the event loop to call the handler again. After some new data came in,
803fb7
the handler would be called again, and would then process the "old" data in the buffer.
803fb7
803fb7
Fix this by enabling a handler wherever we process input data and do not exhaust data
803fb7
from the input buffer (i.e. when EAGAIN was not encountered). The handler runs until
803fb7
we encounter EAGAIN.
803fb7
803fb7
Looping over the input data is done in this roundabout way to allow the event loop
803fb7
to dispatch other events in the meanwhile. If the loop was inside the handler, a
803fb7
source which produced data fast enough could completely monopolize the process.
803fb7
803fb7
https://bugs.freedesktop.org/show_bug.cgi?id=89516
803fb7
(cherry picked from commit 043945b93824e33e040954612aaa934cd1a43a1b)
803fb7
---
803fb7
 src/journal-remote/journal-remote-parse.c |  1 +
803fb7
 src/journal-remote/journal-remote-parse.h |  1 +
803fb7
 src/journal-remote/journal-remote.c       | 65 +++++++++++++++++++++++++++----
803fb7
 3 files changed, 59 insertions(+), 8 deletions(-)
803fb7
803fb7
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
803fb7
index 6c096de03..7e6295435 100644
803fb7
--- a/src/journal-remote/journal-remote-parse.c
803fb7
+++ b/src/journal-remote/journal-remote-parse.c
803fb7
@@ -41,6 +41,7 @@ void source_free(RemoteSource *source) {
803fb7
         writer_unref(source->writer);
803fb7
 
803fb7
         sd_event_source_unref(source->event);
803fb7
+        sd_event_source_unref(source->buffer_event);
803fb7
 
803fb7
         free(source);
803fb7
 }
803fb7
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h
803fb7
index 22db55091..06a50296a 100644
803fb7
--- a/src/journal-remote/journal-remote-parse.h
803fb7
+++ b/src/journal-remote/journal-remote-parse.h
803fb7
@@ -54,6 +54,7 @@ typedef struct RemoteSource {
803fb7
         Writer *writer;
803fb7
 
803fb7
         sd_event_source *event;
803fb7
+        sd_event_source *buffer_event;
803fb7
 } RemoteSource;
803fb7
 
803fb7
 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
803fb7
diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c
803fb7
index d1486e7cd..b7cc6d717 100644
803fb7
--- a/src/journal-remote/journal-remote.c
803fb7
+++ b/src/journal-remote/journal-remote.c
803fb7
@@ -289,6 +289,8 @@ static int dispatch_raw_source_event(sd_event_source *event,
803fb7
                                      int fd,
803fb7
                                      uint32_t revents,
803fb7
                                      void *userdata);
803fb7
+static int dispatch_raw_source_until_block(sd_event_source *event,
803fb7
+                                           void *userdata);
803fb7
 static int dispatch_blocking_source_event(sd_event_source *event,
803fb7
                                           void *userdata);
803fb7
 static int dispatch_raw_connection_event(sd_event_source *event,
803fb7
@@ -376,8 +378,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) {
803fb7
 
803fb7
         r = sd_event_add_io(s->events, &source->event,
803fb7
                             fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
803fb7
-                            dispatch_raw_source_event, s);
803fb7
-        if (r == -EPERM) {
803fb7
+                            dispatch_raw_source_event, source);
803fb7
+        if (r == 0) {
803fb7
+                /* Add additional source for buffer processing. It will be
803fb7
+                 * enabled later. */
803fb7
+                r = sd_event_add_defer(s->events, &source->buffer_event,
803fb7
+                                       dispatch_raw_source_until_block, source);
803fb7
+                if (r == 0)
803fb7
+                        sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
803fb7
+        } else if (r == -EPERM) {
803fb7
                 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
803fb7
                 r = sd_event_add_defer(s->events, &source->event,
803fb7
                                        dispatch_blocking_source_event, source);
803fb7
@@ -997,15 +1006,18 @@ static void server_destroy(RemoteServer *s) {
803fb7
  **********************************************************************
803fb7
  **********************************************************************/
803fb7
 
803fb7
-static int dispatch_raw_source_event(sd_event_source *event,
803fb7
-                                     int fd,
803fb7
-                                     uint32_t revents,
803fb7
-                                     void *userdata) {
803fb7
+static int handle_raw_source(sd_event_source *event,
803fb7
+                             int fd,
803fb7
+                             uint32_t revents,
803fb7
+                             RemoteServer *s) {
803fb7
 
803fb7
-        RemoteServer *s = userdata;
803fb7
         RemoteSource *source;
803fb7
         int r;
803fb7
 
803fb7
+        /* Returns 1 if there might be more data pending,
803fb7
+         * 0 if data is currently exhausted, negative on error.
803fb7
+         */
803fb7
+
803fb7
         assert(fd >= 0 && fd < (ssize_t) s->sources_size);
803fb7
         source = s->sources[fd];
803fb7
         assert(source->fd == fd);
803fb7
@@ -1036,11 +1048,48 @@ static int dispatch_raw_source_event(sd_event_source *event,
803fb7
                 return 1;
803fb7
 }
803fb7
 
803fb7
+static int dispatch_raw_source_until_block(sd_event_source *event,
803fb7
+                                           void *userdata) {
803fb7
+        RemoteSource *source = userdata;
803fb7
+        int r;
803fb7
+
803fb7
+        /* Make sure event stays around even if source is destroyed */
803fb7
+        sd_event_source_ref(event);
803fb7
+
803fb7
+        r = handle_raw_source(event, source->fd, EPOLLIN, server);
803fb7
+        if (r != 1)
803fb7
+                /* No more data for now */
803fb7
+                sd_event_source_set_enabled(event, SD_EVENT_OFF);
803fb7
+
803fb7
+        sd_event_source_unref(event);
803fb7
+
803fb7
+        return r;
803fb7
+}
803fb7
+
803fb7
+static int dispatch_raw_source_event(sd_event_source *event,
803fb7
+                                     int fd,
803fb7
+                                     uint32_t revents,
803fb7
+                                     void *userdata) {
803fb7
+        RemoteSource *source = userdata;
803fb7
+        int r;
803fb7
+
803fb7
+        assert(source->event);
803fb7
+        assert(source->buffer_event);
803fb7
+
803fb7
+        r = handle_raw_source(event, fd, EPOLLIN, server);
803fb7
+        if (r == 1)
803fb7
+                /* Might have more data. We need to rerun the handler
803fb7
+                 * until we are sure the buffer is exhausted. */
803fb7
+                sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
803fb7
+
803fb7
+        return r;
803fb7
+}
803fb7
+
803fb7
 static int dispatch_blocking_source_event(sd_event_source *event,
803fb7
                                           void *userdata) {
803fb7
         RemoteSource *source = userdata;
803fb7
 
803fb7
-        return dispatch_raw_source_event(event, source->fd, EPOLLIN, server);
803fb7
+        return handle_raw_source(event, source->fd, EPOLLIN, server);
803fb7
 }
803fb7
 
803fb7
 static int accept_connection(const char* type, int fd,