写一个JXIO的发包代码追踪,一边整理一边贴。贴出的代码都不是完整代码,只按逻辑选择重要部分贴出。

/JXIO/examples/org/accelio/jxio/helloworld/HelloClient.java

1
2
3
4
5
6
ClientSession client;
EventQueueHandler eqh;
Msg msg = this.mp.getMsg();
msg.getOut().put("Hello Server".getBytes("UTF-8"));
client.sendRequest(msg); //发送请求
eqh.run();

/JXIO/src/java/org/accelio/jxio/ClientSession.java

1
2
3
4
5
6
7
8
public void sendRequest(Msg msg){
int ret = Bridge.clientSendReq(this.getId(), msg.getId(), msg.getOut().position(), in_size, msg.getIsMirror()); //发送请求
if(ret > 0){
表示出现了一些什么问题
}
msg.setClientSession(this);
eqh.addMsgInUse(msg);
}

/JXIO/src/java/org/accelio/jxio/impl/Bridge.java

这是一个JNI(Java Naive Interface)的桥,本文只关心发包部分。

1
2
3
4
5
private static native int clientSendReqNative(long ptrSession, long ptrMsg, int out_size, int in_size, boolean is_mirror);
public static int clientSendReq(final long ptrSession, final long ptrMsg, final int out_size, final int in_size, final boolean is_mirror) {
return clientSendReqNative(ptrSession, ptrMsg, out_size, in_size, is_mirror);
}

/JXIO/src/c/src/Bridge.cc

1
2
3
4
5
6
extern "C" JNIEXPORT jint JNICALL Java_org_accelio_jxio_impl_Bridge_clientSendReqNative(JNIEnv *env, jclass cls, jlong ptr_session, jlong ptr_msg, jint out_size, jint in_size, jboolean is_mirror)
{
Msg * msg = (Msg*) ptr_msg;
Client * client = (Client*)ptr_session;
return client->send_msg(msg, out_size, in_size, is_mirror); //发送请求
}

/JXIO/src/c/src/Client.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int Client::send_msg(Msg *msg, const int out_size, const int in_size, const bool is_mirror)
{
struct xio_msg *xio_msg;
if (is_mirror)
xio_msg = msg->get_mirror_xio_msg();
else
xio_msg = msg->get_xio_msg();
msg->set_xio_msg_out_size(xio_msg, out_size);
msg->set_xio_msg_in_size(xio_msg, in_size);
int ret_val = xio_send_request(this->con, xio_msg); //发送请求
if (ret_val) {
CLIENT_LOG_TRACE("Error in sending xio_msg: '%s' (%d) (ret_val=%d)", xio_strerror(xio_errno()), xio_errno(), ret_val);
return xio_errno();
}
return 0;
}

/accelio/include/xio_user.h(xio_kernel.h)
上一段代码中的xio_msg在这个头文件中定义。这是一段accelio中的代码。
暂时先不解释这个数据结构,先mark着。

/accelio/src/common/xio_connection.c

以下代码按照逻辑顺序排列,在文件中的顺序应该反过来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int xio_send_request(struct xio_connection *connection,
struct xio_msg *msg)
{
xio_msg_list_insert_tail(&reqs_msgq, pmsg, pdata);
xio_connection_xmit(connection)
}
static int xio_connection_xmit(struct xio_connection *connection)
{
retval = xio_connection_xmit_inl(connection,
msgq1, in_flight_msgq1,
flush_msgq1,
&retry_cnt);
}
static inline int xio_connection_xmit_inl(
struct xio_connection *connection,
struct xio_msg_list *msgq,
struct xio_msg_list *in_flight_msgq,
void (*flush_msgq)(struct xio_connection *, enum xio_status),
int *retry_cnt)
{
retval = xio_connection_send(connection, msg);
xio_msg_list_remove(msgq, msg, pdata);
}
int xio_connection_send(struct xio_connection *connection,
struct xio_msg *msg)
{
retval = xio_nexus_send(connection->nexus, task);
}

/accelio/src/common/xio_nexus.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int xio_nexus_send(struct xio_nexus *nexus, struct xio_task *task)
{
int retval = xio_nexus_xmit(nexus);
}
static int xio_nexus_xmit(struct xio_nexus *nexus)
{
while(1){
if (list_empty(&nexus->tx_queue)) break;
retval = nexus->transport->send(nexus->transport_hndl, task); //在哪里初始化的呢?
if(retval != 0)
{
//处理一些error?
break;
}
}
return retval;
}

接下来追踪一下nexus->transport->send究竟是在什么地方初始化的。
由于这个函数指针的源头来自connection->nexus->transport->send,我们来看一看这个connection是如何初始化的。

————————–关于connection的初始化问题——————————–

在/JXIO/src/c/src/Client.cc的构造函数中,有这样一句this->con = xio_connect(&cparams);我们来找一下这个xio_connect函数。

/accelio/src/common/xio_session_client.c

1
2
3
4
5
6
7
8
9
struct xio_connection *xio_connect(struct xio_connection_params *cparams)
{
nexus = xio_nexus_open(ctx, portal, &session->observer,
session->session_id,
attr_mask, pattr);
retval = xio_nexus_connect(nexus, portal,
&session->observer,
cparams->out_addr);
}

/accelio/src/common/xio_nexus.c

1
2
3
4
5
6
7
8
9
10
11
12
13
struct xio_nexus *xio_nexus_open(struct xio_context *ctx,
const char *portal_uri,
struct xio_observer *observer, uint32_t oid,
uint32_t attr_mask,
struct xio_nexus_init_attr *init_attr)
{
transport = xio_get_transport(proto); //这里面的transport->init又是哪里来的?
nexus->transport_hndl = transport->open(transport, ctx,
&nexus->trans_observer,
nexus->trans_attr_mask,
ptrans_init_attr);
}

————————–mark一下好像这个问题并没有解决—————————-

虽然我们并不知道connection->nexus->transport->send函数在哪里被赋值,但是我们猜测这是根据下层的硬件设备是走TCP还是RDMA有着直接的关系。这里我们需要分情况进行讨论。本文只讨论RDMA部分。根据这篇文章的说明,这个send调用的发包函数是xio_rdma_send_req,接下来我们从这个函数开始继续追踪。

/accelio/src/usr/transport/rdma/xio_rdma_datapath.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static int xio_rdma_send_req(struct xio_rdma_transport *rdma_hndl,
struct xio_task *task)
{
//之前省略了为RDMA response准备buffer,还有把消息放到队列里等内容
if (must_send) {
retval = xio_rdma_xmit(rdma_hndl);
if (retval) {
if (xio_errno() != EAGAIN) {
ERROR_LOG("xio_rdma_xmit failed\n");
return -1;
}
retval = 0;
}
}
}
static int xio_rdma_xmit(struct xio_rdma_transport *rdma_hndl)
{
retval = xio_post_send(rdma_hndl, first_wr, req_nr);
}
static int xio_post_send(struct xio_rdma_transport *rdma_hndl,
struct xio_work_req *xio_send,
int num_send_reqs)
{
retval = ibv_post_send(rdma_hndl->qp, &xio_send->send_wr, &bad_wr);
}

就是这货!ibv_post_send。发送数据包最终调用的是这个函数。

现在问题来了,根据对ibv_post_send函数的解释,这个函数支持很多rdma的操作,如下表:

ibv_post_send_op

但这不是问题,在/accelio/src/usr/transport/rdma/xio_rdma_management.c中,有一句rdma_hndl->beacon.opcode = IBV_WR_SEND;这是在QP(Queue Pair)创建的时候就订好的。以后可以试试看可不可以改成其它选项。

至此,发包的逻辑就已经告一段落了,我们收包再见。