commit 175f2b06805bb79d6994c5c909b741c8dcba3528
Author: Sultan Alsawaf <sultan@openresty.com>
Date: Fri Jan 22 15:19:33 2021 -0800
runtime: utilize relay subbufs as much as possible
The relay subbufs used for printing are used very inefficiently, causing
print messages to be frequently dropped. The cause for this inefficiency
is that every time a print flush occurs, the current subbuf is switched
out even if it isn't filled. We can't wait for a subbuf to fill up
before switching it out either, or messages will be delayed.
To remedy this, we instead check to see if there's any data in any
subbuf and use that as an indicator to staprun to tell if there's data
available to read. Then when staprun attempts to read the data out, we
can switch out the current subbuf if it has data in it. This lets us
squeeze out every bit of storage from the subbufs.
Any print drops experienced after this patch should be fixed by
increasing the subbuf count (_stp_nsubbufs).
diff --git a/runtime/transport/procfs.c b/runtime/transport/procfs.c
index a365db6f2..97a6e123a 100644
--- a/runtime/transport/procfs.c
+++ b/runtime/transport/procfs.c
@@ -235,14 +235,16 @@ static int _stp_procfs_transport_fs_init(const char *module_name)
{
#ifdef STAPCONF_PROC_OPS
relay_procfs_operations.proc_open = relay_file_operations.open;
- relay_procfs_operations.proc_poll = relay_file_operations.poll;
+ relay_procfs_operations.proc_poll = __stp_relay_file_poll;
relay_procfs_operations.proc_mmap = relay_file_operations.mmap;
- relay_procfs_operations.proc_read = relay_file_operations.read;
+ relay_procfs_operations.proc_read = __stp_relay_file_read;
relay_procfs_operations.proc_lseek = relay_file_operations.llseek;
relay_procfs_operations.proc_release = relay_file_operations.release;
#else
relay_procfs_operations = relay_file_operations;
relay_procfs_operations.owner = THIS_MODULE;
+ relay_procfs_operations.poll = __stp_relay_file_poll;
+ relay_procfs_operations.read = __stp_relay_file_read;
#endif
if (_stp_mkdir_proc_module()) // get the _stp_procfs_module_dir* created
diff --git a/runtime/transport/relay_v2.c b/runtime/transport/relay_v2.c
index 27729f4c8..3b505c992 100644
--- a/runtime/transport/relay_v2.c
+++ b/runtime/transport/relay_v2.c
@@ -77,14 +77,6 @@ static size_t __stp_relay_switch_subbuf(struct rchan_buf *buf, size_t length)
buf->dentry->d_inode->i_size += buf->chan->subbuf_size -
buf->padding[old_subbuf];
smp_mb();
- if (waitqueue_active(&buf->read_wait))
- /*
- * Calling wake_up_interruptible() and __mod_timer()
- * from here will deadlock if we happen to be logging
- * from the scheduler and timer (trying to re-grab
- * rq->lock/timer->base->lock), so just set a flag.
- */
- atomic_set(&_stp_relay_data.wakeup, 1);
}
old = buf->data;
@@ -101,10 +93,56 @@ static size_t __stp_relay_switch_subbuf(struct rchan_buf *buf, size_t length)
return length;
}
+static bool __stp_relay_buf_empty(struct rchan_buf *buf)
+{
+ return !buf->offset && buf->subbufs_produced == buf->subbufs_consumed;
+}
+
+static unsigned int __stp_relay_file_poll(struct file *filp, poll_table *wait)
+{
+ struct rchan_buf *buf = filp->private_data;
+
+ if (buf->finalized)
+ return POLLERR;
+
+ if (filp->f_mode & FMODE_READ) {
+ poll_wait(filp, &buf->read_wait, wait);
+ if (!__stp_relay_buf_empty(buf))
+ return POLLIN | POLLRDNORM;
+ }
+
+ return 0;
+}
+
+static ssize_t __stp_relay_file_read(struct file *filp, char __user *buffer,
+ size_t count, loff_t *ppos)
+{
+ struct rchan_buf *buf = filp->private_data;
+ unsigned long flags;
+
+ /* This trylock will only fail after the print driver is destroyed */
+ if (_stp_print_trylock_irqsave(&flags)) {
+ /* Switch out the current buffer if it has any data */
+ if (buf->offset)
+ __stp_relay_switch_subbuf(buf, 0);
+ _stp_print_unlock_irqrestore(&flags);
+ }
+
+ /*
+ * Optimization: bail out early if there's nothing to read. This is
+ * faster than going into relay's read() function without having any
+ * data, as it will acquire a mutex lock for the inode before bailing.
+ */
+ if (buf->subbufs_produced == buf->subbufs_consumed)
+ return -ENODATA;
+
+ return relay_file_operations.read(filp, buffer, count, ppos);
+}
+
static void __stp_relay_wakeup_readers(struct rchan_buf *buf)
{
if (buf && waitqueue_active(&buf->read_wait) &&
- buf->subbufs_produced != buf->subbufs_consumed)
+ !__stp_relay_buf_empty(buf))
wake_up_interruptible(&buf->read_wait);
}
@@ -112,10 +150,9 @@ static void __stp_relay_wakeup_timer(stp_timer_callback_parameter_t unused)
{
int i;
- if (atomic_read(&_stp_relay_data.wakeup)) {
+ if (atomic_cmpxchg(&_stp_relay_data.wakeup, 1, 0)) {
struct rchan_buf *buf;
- atomic_set(&_stp_relay_data.wakeup, 0);
for_each_possible_cpu(i) {
buf = _stp_get_rchan_subbuf(_stp_relay_data.rchan->buf,
i);
@@ -266,6 +303,8 @@ static int _stp_transport_data_fs_init(void)
}
relay_file_operations_w_owner = relay_file_operations;
relay_file_operations_w_owner.owner = THIS_MODULE;
+ relay_file_operations_w_owner.poll = __stp_relay_file_poll;
+ relay_file_operations_w_owner.read = __stp_relay_file_read;
#if (RELAYFS_CHANNEL_VERSION >= 7)
_stp_relay_data.rchan = relay_open("trace", _stp_get_module_dir(),
_stp_subbuf_size, _stp_nsubbufs,
@@ -323,10 +362,12 @@ _stp_data_write_reserve(size_t size_request, void **entry)
buf = _stp_get_rchan_subbuf(_stp_relay_data.rchan->buf,
smp_processor_id());
- if (unlikely(buf->offset + size_request > buf->chan->subbuf_size)) {
+ if (buf->offset >= buf->chan->subbuf_size) {
size_request = __stp_relay_switch_subbuf(buf, size_request);
if (!size_request)
return 0;
+ } else if (buf->offset + size_request > buf->chan->subbuf_size) {
+ size_request = buf->chan->subbuf_size - buf->offset;
}
*entry = (char*)buf->data + buf->offset;
buf->offset += size_request;
@@ -342,10 +383,6 @@ static unsigned char *_stp_data_entry_data(void *entry)
static int _stp_data_write_commit(void *entry)
{
- struct rchan_buf *buf;
-
- buf = _stp_get_rchan_subbuf(_stp_relay_data.rchan->buf,
- smp_processor_id());
- __stp_relay_switch_subbuf(buf, 0);
+ atomic_set(&_stp_relay_data.wakeup, 1);
return 0;
}