|
|
1ff636 |
From 47ac92420da9ecbffaf3aa0046d170be358639a2 Mon Sep 17 00:00:00 2001
|
|
|
1ff636 |
From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= <zbyszek@in.waw.pl>
|
|
|
1ff636 |
Date: Fri, 13 Mar 2015 00:02:28 -0400
|
|
|
1ff636 |
Subject: [PATCH] journal-remote: process events without delay
|
|
|
1ff636 |
|
|
|
1ff636 |
journal-remote buffers input, and then parses it handling one journal entry at a time.
|
|
|
1ff636 |
It was possible for useful data to be left in the buffer after some entries were
|
|
|
1ff636 |
processesed. But all data would be already read from the fd, so there would be
|
|
|
1ff636 |
no reason for the event loop to call the handler again. After some new data came in,
|
|
|
1ff636 |
the handler would be called again, and would then process the "old" data in the buffer.
|
|
|
1ff636 |
|
|
|
1ff636 |
Fix this by enabling a handler wherever we process input data and do not exhaust data
|
|
|
1ff636 |
from the input buffer (i.e. when EAGAIN was not encountered). The handler runs until
|
|
|
1ff636 |
we encounter EAGAIN.
|
|
|
1ff636 |
|
|
|
1ff636 |
Looping over the input data is done in this roundabout way to allow the event loop
|
|
|
1ff636 |
to dispatch other events in the meanwhile. If the loop was inside the handler, a
|
|
|
1ff636 |
source which produced data fast enough could completely monopolize the process.
|
|
|
1ff636 |
|
|
|
1ff636 |
https://bugs.freedesktop.org/show_bug.cgi?id=89516
|
|
|
1ff636 |
(cherry picked from commit 043945b93824e33e040954612aaa934cd1a43a1b)
|
|
|
1ff636 |
---
|
|
|
1ff636 |
src/journal-remote/journal-remote-parse.c | 1 +
|
|
|
1ff636 |
src/journal-remote/journal-remote-parse.h | 1 +
|
|
|
23b3cf |
src/journal-remote/journal-remote.c | 65 ++++++++++++++++++++---
|
|
|
1ff636 |
3 files changed, 59 insertions(+), 8 deletions(-)
|
|
|
1ff636 |
|
|
|
1ff636 |
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
|
|
|
181b3f |
index 6c096de03..7e6295435 100644
|
|
|
1ff636 |
--- a/src/journal-remote/journal-remote-parse.c
|
|
|
1ff636 |
+++ b/src/journal-remote/journal-remote-parse.c
|
|
|
1ff636 |
@@ -41,6 +41,7 @@ void source_free(RemoteSource *source) {
|
|
|
1ff636 |
writer_unref(source->writer);
|
|
|
1ff636 |
|
|
|
1ff636 |
sd_event_source_unref(source->event);
|
|
|
1ff636 |
+ sd_event_source_unref(source->buffer_event);
|
|
|
1ff636 |
|
|
|
1ff636 |
free(source);
|
|
|
1ff636 |
}
|
|
|
1ff636 |
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h
|
|
|
181b3f |
index 22db55091..06a50296a 100644
|
|
|
1ff636 |
--- a/src/journal-remote/journal-remote-parse.h
|
|
|
1ff636 |
+++ b/src/journal-remote/journal-remote-parse.h
|
|
|
1ff636 |
@@ -54,6 +54,7 @@ typedef struct RemoteSource {
|
|
|
1ff636 |
Writer *writer;
|
|
|
1ff636 |
|
|
|
1ff636 |
sd_event_source *event;
|
|
|
1ff636 |
+ sd_event_source *buffer_event;
|
|
|
1ff636 |
} RemoteSource;
|
|
|
1ff636 |
|
|
|
1ff636 |
RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
|
|
|
1ff636 |
diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c
|
|
|
181b3f |
index d1486e7cd..b7cc6d717 100644
|
|
|
1ff636 |
--- a/src/journal-remote/journal-remote.c
|
|
|
1ff636 |
+++ b/src/journal-remote/journal-remote.c
|
|
|
1ff636 |
@@ -289,6 +289,8 @@ static int dispatch_raw_source_event(sd_event_source *event,
|
|
|
1ff636 |
int fd,
|
|
|
1ff636 |
uint32_t revents,
|
|
|
1ff636 |
void *userdata);
|
|
|
1ff636 |
+static int dispatch_raw_source_until_block(sd_event_source *event,
|
|
|
1ff636 |
+ void *userdata);
|
|
|
1ff636 |
static int dispatch_blocking_source_event(sd_event_source *event,
|
|
|
1ff636 |
void *userdata);
|
|
|
1ff636 |
static int dispatch_raw_connection_event(sd_event_source *event,
|
|
|
1ff636 |
@@ -376,8 +378,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) {
|
|
|
1ff636 |
|
|
|
1ff636 |
r = sd_event_add_io(s->events, &source->event,
|
|
|
1ff636 |
fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
|
|
|
1ff636 |
- dispatch_raw_source_event, s);
|
|
|
1ff636 |
- if (r == -EPERM) {
|
|
|
1ff636 |
+ dispatch_raw_source_event, source);
|
|
|
1ff636 |
+ if (r == 0) {
|
|
|
1ff636 |
+ /* Add additional source for buffer processing. It will be
|
|
|
1ff636 |
+ * enabled later. */
|
|
|
1ff636 |
+ r = sd_event_add_defer(s->events, &source->buffer_event,
|
|
|
1ff636 |
+ dispatch_raw_source_until_block, source);
|
|
|
1ff636 |
+ if (r == 0)
|
|
|
1ff636 |
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
|
|
|
1ff636 |
+ } else if (r == -EPERM) {
|
|
|
1ff636 |
log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
|
|
|
1ff636 |
r = sd_event_add_defer(s->events, &source->event,
|
|
|
1ff636 |
dispatch_blocking_source_event, source);
|
|
|
1ff636 |
@@ -997,15 +1006,18 @@ static void server_destroy(RemoteServer *s) {
|
|
|
1ff636 |
**********************************************************************
|
|
|
1ff636 |
**********************************************************************/
|
|
|
1ff636 |
|
|
|
1ff636 |
-static int dispatch_raw_source_event(sd_event_source *event,
|
|
|
1ff636 |
- int fd,
|
|
|
1ff636 |
- uint32_t revents,
|
|
|
1ff636 |
- void *userdata) {
|
|
|
1ff636 |
+static int handle_raw_source(sd_event_source *event,
|
|
|
1ff636 |
+ int fd,
|
|
|
1ff636 |
+ uint32_t revents,
|
|
|
1ff636 |
+ RemoteServer *s) {
|
|
|
1ff636 |
|
|
|
1ff636 |
- RemoteServer *s = userdata;
|
|
|
1ff636 |
RemoteSource *source;
|
|
|
1ff636 |
int r;
|
|
|
1ff636 |
|
|
|
1ff636 |
+ /* Returns 1 if there might be more data pending,
|
|
|
1ff636 |
+ * 0 if data is currently exhausted, negative on error.
|
|
|
1ff636 |
+ */
|
|
|
1ff636 |
+
|
|
|
1ff636 |
assert(fd >= 0 && fd < (ssize_t) s->sources_size);
|
|
|
1ff636 |
source = s->sources[fd];
|
|
|
1ff636 |
assert(source->fd == fd);
|
|
|
1ff636 |
@@ -1036,11 +1048,48 @@ static int dispatch_raw_source_event(sd_event_source *event,
|
|
|
1ff636 |
return 1;
|
|
|
1ff636 |
}
|
|
|
1ff636 |
|
|
|
1ff636 |
+static int dispatch_raw_source_until_block(sd_event_source *event,
|
|
|
1ff636 |
+ void *userdata) {
|
|
|
1ff636 |
+ RemoteSource *source = userdata;
|
|
|
1ff636 |
+ int r;
|
|
|
1ff636 |
+
|
|
|
1ff636 |
+ /* Make sure event stays around even if source is destroyed */
|
|
|
1ff636 |
+ sd_event_source_ref(event);
|
|
|
1ff636 |
+
|
|
|
1ff636 |
+ r = handle_raw_source(event, source->fd, EPOLLIN, server);
|
|
|
1ff636 |
+ if (r != 1)
|
|
|
1ff636 |
+ /* No more data for now */
|
|
|
1ff636 |
+ sd_event_source_set_enabled(event, SD_EVENT_OFF);
|
|
|
1ff636 |
+
|
|
|
1ff636 |
+ sd_event_source_unref(event);
|
|
|
1ff636 |
+
|
|
|
1ff636 |
+ return r;
|
|
|
1ff636 |
+}
|
|
|
1ff636 |
+
|
|
|
1ff636 |
+static int dispatch_raw_source_event(sd_event_source *event,
|
|
|
1ff636 |
+ int fd,
|
|
|
1ff636 |
+ uint32_t revents,
|
|
|
1ff636 |
+ void *userdata) {
|
|
|
1ff636 |
+ RemoteSource *source = userdata;
|
|
|
1ff636 |
+ int r;
|
|
|
1ff636 |
+
|
|
|
1ff636 |
+ assert(source->event);
|
|
|
1ff636 |
+ assert(source->buffer_event);
|
|
|
1ff636 |
+
|
|
|
1ff636 |
+ r = handle_raw_source(event, fd, EPOLLIN, server);
|
|
|
1ff636 |
+ if (r == 1)
|
|
|
1ff636 |
+ /* Might have more data. We need to rerun the handler
|
|
|
1ff636 |
+ * until we are sure the buffer is exhausted. */
|
|
|
1ff636 |
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
|
|
|
1ff636 |
+
|
|
|
1ff636 |
+ return r;
|
|
|
1ff636 |
+}
|
|
|
1ff636 |
+
|
|
|
1ff636 |
static int dispatch_blocking_source_event(sd_event_source *event,
|
|
|
1ff636 |
void *userdata) {
|
|
|
1ff636 |
RemoteSource *source = userdata;
|
|
|
1ff636 |
|
|
|
1ff636 |
- return dispatch_raw_source_event(event, source->fd, EPOLLIN, server);
|
|
|
1ff636 |
+ return handle_raw_source(event, source->fd, EPOLLIN, server);
|
|
|
1ff636 |
}
|
|
|
1ff636 |
|
|
|
1ff636 |
static int accept_connection(const char* type, int fd,
|