From d5b9233daf158d2402f6dbc81ea80b07d0fe9003 Mon Sep 17 00:00:00 2001 From: Dmitry Gladkov Date: Fri, 22 Dec 2017 17:20:20 +0300 Subject: [PATCH] prov/verbs: Add support of different CQ formats for the verbs/RDM Signed-off-by: Dmitry Gladkov --- prov/verbs/src/ep_rdm/verbs_cq_ep_rdm.c | 70 +++++++++++++++++++++++++++------ prov/verbs/src/ep_rdm/verbs_rdm.h | 1 - prov/verbs/src/fi_verbs.h | 23 ++++++----- 3 files changed, 73 insertions(+), 21 deletions(-) diff --git a/prov/verbs/src/ep_rdm/verbs_cq_ep_rdm.c b/prov/verbs/src/ep_rdm/verbs_cq_ep_rdm.c index 5b4065fb..3612851b 100644 --- a/prov/verbs/src/ep_rdm/verbs_cq_ep_rdm.c +++ b/prov/verbs/src/ep_rdm/verbs_cq_ep_rdm.c @@ -51,7 +51,6 @@ static ssize_t fi_ibv_rdm_tagged_cq_readfrom(struct fid_cq *cq, void *buf, { struct fi_ibv_rdm_cq *_cq = container_of(cq, struct fi_ibv_rdm_cq, cq_fid); - struct fi_cq_tagged_entry *entry = buf; struct fi_ibv_rdm_request *cq_entry; size_t ret = 0; @@ -66,13 +65,7 @@ static ssize_t fi_ibv_rdm_tagged_cq_readfrom(struct fid_cq *cq, void *buf, src_addr[ret] = _cq->ep->av->conn_to_addr(_cq->ep, cq_entry->minfo.conn); - entry[ret].op_context = cq_entry->context; - entry[ret].flags = (cq_entry->comp_flags & ~FI_COMPLETION); - entry[ret].len = cq_entry->len; - entry[ret].buf = (cq_entry->comp_flags & FI_TRANSMIT) ? - cq_entry->src_addr : cq_entry->dest_buf; - entry[ret].data = cq_entry->imm; - entry[ret].tag = cq_entry->minfo.tag; + _cq->read_entry(cq_entry, ret, buf); if (cq_entry->state.eager == FI_IBV_STATE_EAGER_READY_TO_FREE) { FI_IBV_RDM_DBG_REQUEST("to_pool: ", cq_entry, @@ -116,7 +109,6 @@ ssize_t fi_ibv_rdm_cq_sreadfrom(struct fid_cq *cq, void *buf, size_t count, ((timeout < 0) ? SIZE_MAX : (fi_gettime_ms() + timeout)); size_t counter = 0; ssize_t ret = 0; - struct fi_cq_tagged_entry *cqe_buf = buf; struct fi_ibv_rdm_cq *_cq = container_of(cq, struct fi_ibv_rdm_cq, cq_fid); switch (_cq->wait_cond) { @@ -130,7 +122,9 @@ ssize_t fi_ibv_rdm_cq_sreadfrom(struct fid_cq *cq, void *buf, size_t count, } do { - ret = fi_ibv_rdm_tagged_cq_readfrom(cq, &cqe_buf[counter], + ret = fi_ibv_rdm_tagged_cq_readfrom(cq, + ((char *)buf + + (counter * _cq->entry_size)), threshold - counter, src_addr); counter += (ret > 0) ? ret : 0; @@ -260,6 +254,50 @@ static struct fi_ops fi_ibv_rdm_cq_fi_ops = { .ops_open = fi_no_ops_open, }; +static void fi_ibv_rdm_cq_read_context_entry(struct fi_ibv_rdm_request *cq_entry, + int i, void *buf) +{ + struct fi_cq_entry *entry = buf; + + entry[i].op_context = cq_entry->context; + } + + static void fi_ibv_rdm_cq_read_msg_entry(struct fi_ibv_rdm_request *cq_entry, + int i, void *buf) + { + struct fi_cq_msg_entry *entry = buf; + + entry[i].op_context = cq_entry->context; + entry[i].flags = (cq_entry->comp_flags & ~FI_COMPLETION); + entry[i].len = cq_entry->len; + } + + static void fi_ibv_rdm_cq_read_data_entry(struct fi_ibv_rdm_request *cq_entry, + int i, void *buf) + { + struct fi_cq_data_entry *entry = buf; + + entry[i].op_context = cq_entry->context; + entry[i].flags = (cq_entry->comp_flags & ~FI_COMPLETION); + entry[i].len = cq_entry->len; + entry[i].buf = (cq_entry->comp_flags & FI_TRANSMIT) ? + cq_entry->src_addr : cq_entry->dest_buf; + entry[i].data = cq_entry->imm; + } + + static void fi_ibv_rdm_cq_read_tagged_entry(struct fi_ibv_rdm_request *cq_entry, + int i, void *buf) + { + struct fi_cq_tagged_entry *entry = buf; + + entry[i].op_context = cq_entry->context; + entry[i].flags = (cq_entry->comp_flags & ~FI_COMPLETION); + entry[i].len = cq_entry->len; + entry[i].buf = (cq_entry->comp_flags & FI_TRANSMIT) ? + cq_entry->src_addr : cq_entry->dest_buf; + entry[i].data = cq_entry->imm; + entry[i].tag = cq_entry->minfo.tag; + } int fi_ibv_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, struct fid_cq **cq, void *context) @@ -296,12 +334,22 @@ int fi_ibv_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, _cq->cq_fid.ops = &fi_ibv_rdm_cq_ops; switch (attr->format) { - case FI_CQ_FORMAT_UNSPEC: case FI_CQ_FORMAT_CONTEXT: + _cq->entry_size = sizeof(struct fi_cq_entry); + _cq->read_entry = fi_ibv_rdm_cq_read_context_entry; + break; case FI_CQ_FORMAT_MSG: + _cq->entry_size = sizeof(struct fi_cq_msg_entry); + _cq->read_entry = fi_ibv_rdm_cq_read_msg_entry; + break; case FI_CQ_FORMAT_DATA: + _cq->entry_size = sizeof(struct fi_cq_data_entry); + _cq->read_entry = fi_ibv_rdm_cq_read_data_entry; + break; + case FI_CQ_FORMAT_UNSPEC: case FI_CQ_FORMAT_TAGGED: _cq->entry_size = sizeof(struct fi_cq_tagged_entry); + _cq->read_entry = fi_ibv_rdm_cq_read_tagged_entry; break; default: ret = -FI_ENOSYS; diff --git a/prov/verbs/src/ep_rdm/verbs_rdm.h b/prov/verbs/src/ep_rdm/verbs_rdm.h index 6230c975..71aabea3 100644 --- a/prov/verbs/src/ep_rdm/verbs_rdm.h +++ b/prov/verbs/src/ep_rdm/verbs_rdm.h @@ -309,7 +309,6 @@ struct fi_ibv_rdm_ep { int use_odp; int scq_depth; int rcq_depth; - int cqread_bunch_size; int is_closing; int recv_preposted_threshold; diff --git a/prov/verbs/src/fi_verbs.h b/prov/verbs/src/fi_verbs.h index 43f11255..06becb78 100644 --- a/prov/verbs/src/fi_verbs.h +++ b/prov/verbs/src/fi_verbs.h @@ -257,16 +257,21 @@ struct fi_ibv_cq { struct util_buf_pool *wce_pool; }; +struct fi_ibv_rdm_request; +typedef void (*fi_ibv_rdm_cq_read_entry)(struct fi_ibv_rdm_request *cq_entry, + int index, void *buf); + struct fi_ibv_rdm_cq { - struct fid_cq cq_fid; - struct fi_ibv_domain *domain; - struct fi_ibv_rdm_ep *ep; - struct dlist_entry request_cq; - struct dlist_entry request_errcq; - uint64_t flags; - size_t entry_size; - int read_bunch_size; - enum fi_cq_wait_cond wait_cond; + struct fid_cq cq_fid; + struct fi_ibv_domain *domain; + struct fi_ibv_rdm_ep *ep; + struct dlist_entry request_cq; + struct dlist_entry request_errcq; + uint64_t flags; + size_t entry_size; + fi_ibv_rdm_cq_read_entry read_entry; + int read_bunch_size; + enum fi_cq_wait_cond wait_cond; }; int fi_ibv_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, -- 2.15.GIT