写一个JXIO的发包代码追踪,一边整理一边贴。贴出的代码都不是完整代码,只按逻辑选择重要部分贴出。
/JXIO/examples/org/accelio/jxio/helloworld/HelloClient.java
1 2 3 4 5 6
| ClientSession client; EventQueueHandler eqh; Msg msg = this.mp.getMsg(); msg.getOut().put("Hello Server".getBytes("UTF-8")); client.sendRequest(msg); eqh.run();
|
/JXIO/src/java/org/accelio/jxio/ClientSession.java
1 2 3 4 5 6 7 8
| public void sendRequest(Msg msg){ int ret = Bridge.clientSendReq(this.getId(), msg.getId(), msg.getOut().position(), in_size, msg.getIsMirror()); if(ret > 0){ 表示出现了一些什么问题 } msg.setClientSession(this); eqh.addMsgInUse(msg); }
|
/JXIO/src/java/org/accelio/jxio/impl/Bridge.java
这是一个JNI(Java Naive Interface)的桥,本文只关心发包部分。
1 2 3 4 5
| private static native int clientSendReqNative(long ptrSession, long ptrMsg, int out_size, int in_size, boolean is_mirror); public static int clientSendReq(final long ptrSession, final long ptrMsg, final int out_size, final int in_size, final boolean is_mirror) { return clientSendReqNative(ptrSession, ptrMsg, out_size, in_size, is_mirror); }
|
/JXIO/src/c/src/Bridge.cc
1 2 3 4 5 6
| extern "C" JNIEXPORT jint JNICALL Java_org_accelio_jxio_impl_Bridge_clientSendReqNative(JNIEnv *env, jclass cls, jlong ptr_session, jlong ptr_msg, jint out_size, jint in_size, jboolean is_mirror) { Msg * msg = (Msg*) ptr_msg; Client * client = (Client*)ptr_session; return client->send_msg(msg, out_size, in_size, is_mirror); }
|
/JXIO/src/c/src/Client.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| int Client::send_msg(Msg *msg, const int out_size, const int in_size, const bool is_mirror) { struct xio_msg *xio_msg; if (is_mirror) xio_msg = msg->get_mirror_xio_msg(); else xio_msg = msg->get_xio_msg(); msg->set_xio_msg_out_size(xio_msg, out_size); msg->set_xio_msg_in_size(xio_msg, in_size); int ret_val = xio_send_request(this->con, xio_msg); if (ret_val) { CLIENT_LOG_TRACE("Error in sending xio_msg: '%s' (%d) (ret_val=%d)", xio_strerror(xio_errno()), xio_errno(), ret_val); return xio_errno(); } return 0; }
|
/accelio/include/xio_user.h(xio_kernel.h)
上一段代码中的xio_msg在这个头文件中定义。这是一段accelio中的代码。
暂时先不解释这个数据结构,先mark着。
/accelio/src/common/xio_connection.c
以下代码按照逻辑顺序排列,在文件中的顺序应该反过来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| int xio_send_request(struct xio_connection *connection, struct xio_msg *msg) { xio_msg_list_insert_tail(&reqs_msgq, pmsg, pdata); xio_connection_xmit(connection) } static int xio_connection_xmit(struct xio_connection *connection) { retval = xio_connection_xmit_inl(connection, msgq1, in_flight_msgq1, flush_msgq1, &retry_cnt); } static inline int xio_connection_xmit_inl( struct xio_connection *connection, struct xio_msg_list *msgq, struct xio_msg_list *in_flight_msgq, void (*flush_msgq)(struct xio_connection *, enum xio_status), int *retry_cnt) { retval = xio_connection_send(connection, msg); xio_msg_list_remove(msgq, msg, pdata); } int xio_connection_send(struct xio_connection *connection, struct xio_msg *msg) { retval = xio_nexus_send(connection->nexus, task); }
|
/accelio/src/common/xio_nexus.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| int xio_nexus_send(struct xio_nexus *nexus, struct xio_task *task) { int retval = xio_nexus_xmit(nexus); } static int xio_nexus_xmit(struct xio_nexus *nexus) { while(1){ if (list_empty(&nexus->tx_queue)) break; retval = nexus->transport->send(nexus->transport_hndl, task); if(retval != 0) { break; } } return retval; }
|
接下来追踪一下nexus->transport->send究竟是在什么地方初始化的。
由于这个函数指针的源头来自connection->nexus->transport->send,我们来看一看这个connection是如何初始化的。
————————–关于connection的初始化问题——————————–
在/JXIO/src/c/src/Client.cc的构造函数中,有这样一句this->con = xio_connect(&cparams);我们来找一下这个xio_connect函数。
/accelio/src/common/xio_session_client.c
1 2 3 4 5 6 7 8 9
| struct xio_connection *xio_connect(struct xio_connection_params *cparams) { nexus = xio_nexus_open(ctx, portal, &session->observer, session->session_id, attr_mask, pattr); retval = xio_nexus_connect(nexus, portal, &session->observer, cparams->out_addr); }
|
/accelio/src/common/xio_nexus.c
1 2 3 4 5 6 7 8 9 10 11 12 13
| struct xio_nexus *xio_nexus_open(struct xio_context *ctx, const char *portal_uri, struct xio_observer *observer, uint32_t oid, uint32_t attr_mask, struct xio_nexus_init_attr *init_attr)
{ transport = xio_get_transport(proto); nexus->transport_hndl = transport->open(transport, ctx, &nexus->trans_observer, nexus->trans_attr_mask, ptrans_init_attr); }
|
————————–mark一下好像这个问题并没有解决—————————-
虽然我们并不知道connection->nexus->transport->send函数在哪里被赋值,但是我们猜测这是根据下层的硬件设备是走TCP还是RDMA有着直接的关系。这里我们需要分情况进行讨论。本文只讨论RDMA部分。根据这篇文章的说明,这个send调用的发包函数是xio_rdma_send_req,接下来我们从这个函数开始继续追踪。
/accelio/src/usr/transport/rdma/xio_rdma_datapath.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| static int xio_rdma_send_req(struct xio_rdma_transport *rdma_hndl, struct xio_task *task) { if (must_send) { retval = xio_rdma_xmit(rdma_hndl); if (retval) { if (xio_errno() != EAGAIN) { ERROR_LOG("xio_rdma_xmit failed\n"); return -1; } retval = 0; } } } static int xio_rdma_xmit(struct xio_rdma_transport *rdma_hndl) { retval = xio_post_send(rdma_hndl, first_wr, req_nr); } static int xio_post_send(struct xio_rdma_transport *rdma_hndl, struct xio_work_req *xio_send, int num_send_reqs) { retval = ibv_post_send(rdma_hndl->qp, &xio_send->send_wr, &bad_wr); }
|
就是这货!ibv_post_send。发送数据包最终调用的是这个函数。
现在问题来了,根据对ibv_post_send函数的解释,这个函数支持很多rdma的操作,如下表:
但这不是问题,在/accelio/src/usr/transport/rdma/xio_rdma_management.c中,有一句rdma_hndl->beacon.opcode = IBV_WR_SEND;这是在QP(Queue Pair)创建的时候就订好的。以后可以试试看可不可以改成其它选项。
至此,发包的逻辑就已经告一段落了,我们收包再见。