KCP 源码剖析
阅读本文之前,如果没听说过 KCP ,或者一点都不了解 KCP,麻烦抽一点时间先看看 KCP 项目的说明文档:传送门。本文的目的是深入 KCP 的实现细节去理解 KCP 。
什么是 KCP
KCP 是一个快速可靠协议,能够以比 TCP 更低的延迟来传送数据,数据重传更快,等待时间更短。
TCP是为流量设计的(每秒内可以传输多少KB的数据),讲究的是充分利用带宽。而 KCP是为流速设计的(单个数据包从一端发送到一端需要多少时间),以10%-20%带宽浪费的代价换取了比 TCP快30%-40%的传输速度。TCP信道是一条流速很慢,但每秒流量很大的大运河,而KCP是水流湍急的小激流
以上是 KCP 文档上面写的,关键词是带宽和流速,KCP 会损耗带宽,带来的好处是更大更均衡的传输速率。更多的说明参考 KCP 自身的文档。
KCP 数据结构
KCP 源码在 ikcp.h
和 ikcp.c
里面,ikcp.h
核心的是数据结构的声明,首先是 SEGMENT
数据包,是 KCP 协议处理数据的最小单位:
SEGMENT 结构(点击展开代码)
//=====================================================================
// SEGMENT 一个 SETMENT 就是一个数据包
//=====================================================================
struct IKCPSEG
{
// 链表节点,发送和接受队列都是这里的链表的结构
struct IQUEUEHEAD node;
// 会话编号,同一个会话编号相同
IUINT32 conv;
// 数据包类型,譬如 DATA 或者 ACK
IUINT32 cmd;
// 由于 MTU 的限制,大数据包会拆分成多个小数据包,这个是小数据包的编号
IUINT32 frg
// 每个数据包,都会附带上发送方的接受窗口大小
IUINT32 wnd;
// 发送时间,如果是 ACK 包,会设置为源数据包的 ts
IUINT32 ts;
// 唯一标识数据包的编号
IUINT32 sn;
// 代表小于 una 的数据包都接收成功,跟 TCP 含义一致:oldest unacknowledged sequence number SND
IUINT32 una;
// 数据长度
IUINT32 len;
// 超时重传时间
IUINT32 resendts;
// 下次超时等待时间
IUINT32 rto;
// 快速重传,收到本数据包之后的数据包的数量,大于一定数量就触发快速重传
IUINT32 fastack;
// 发送次数
IUINT32 xmit;
// 数据
char data[1];
};
看完 SEGMENT
的注释,大致能看出 KCP 的核心也是一个 ARQ 协议,通过自动超时重传来保证数据的送达。接着再来看看 KCP 结构 KCPCB
的定义:
KCP 结构(点击展开代码)
//---------------------------------------------------------------------
// IKCPCB
//---------------------------------------------------------------------
struct IKCPCB
{
// conv: 会话编号
// mtu, mss: 最大传输单元,最大报文段大小
// state: 会话状态,0 有效,-1 断开
IUINT32 conv, mtu, mss, state;
// snd_una: 等待 ACK 的包编号
// snd_nxt: 下一个等待发送的数据包编号
// rcv_nxt: 下一个等待接收的数据包编号
IUINT32 snd_una, snd_nxt, rcv_nxt;
// ts_recent, ts_lastack: 未用到
// ssthresh: 拥塞控制慢启动阈值
IUINT32 ts_recent, ts_lastack, ssthresh;
// rx_rto: rto (retransmission timeout),超时重传时间
// rx_rttval, rx_srtt, rx_minrto: 计算 rto 的中间变量
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
// snd_wnd, rcv_wnd: 最大发送和接收窗口大小
// rmt_wnd: remote wnd ,对端剩余接受窗口大小
// cwnd: 可发送窗口大小
// probe: 是否要发送控制报文的标志
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
// current: 当前时间
// interval: 更新间隔
// ts_flush: 下次需要更新的时间
// xmit: 发送失败次数
IUINT32 current, interval, ts_flush, xmit;
// 对应链表的长度
IUINT32 nrcv_buf, nsnd_buf;
IUINT32 nrcv_que, nsnd_que;
// nodelay: 控制超时重传的 rto 增长速度
// updated: 是否调用过 ikcp_update
IUINT32 nodelay, updated;
// ts_probe, probe_wait: 对端接收窗口长时间为 0 时主动定期发起询问
IUINT32 ts_probe, probe_wait;
// deal_link: 对端长时间无应答
// incr: 参与计算发送窗口大小
IUINT32 dead_link, incr;
// queue: 跟用户层接触的数据包
// buf: 协议缓存的数据包
struct IQUEUEHEAD snd_queue;
struct IQUEUEHEAD rcv_queue;
struct IQUEUEHEAD snd_buf;
struct IQUEUEHEAD rcv_buf;
// 需要发送 ack 的数据包信息
IUINT32 *acklist;
// 需要 ack 的包数量
IUINT32 ackcount;
// acklist 内存大小
IUINT32 ackblock;
// 用户层传进来的数据
void *user;
// 存放一个 kcp 包的空间
char *buffer;
// 触发快速重传的 fastack 次数
int fastresend;
// 快速重传最大次数
int fastlimit;
// nocwnd: 不考虑慢启动的发送窗口大小
// stream: 流模式
int nocwnd, stream;
// debug log
int logmask;
// 发送数据接口
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};
逐一把 KCP 结构里面的字段注释上,可以初步感觉到,整套 KCP 的协议不太复杂,细细去分析代码,你我都能读懂并理解 KCP 协议
KCP 的 ARQ 实现
KCP 本质上是一个 ARQ (Auto Repeat-reQuest,自动重传) 协议,最基本的是要保证可靠的传输。那么我们可以先来关注 KCP 的基本 ARQ 部分,KCP 是怎么实现可靠传输的。
ARQ 顾名思义,当我们认为对端接收数据包失败时,自动重新发送对应的数据包,它是通过确认接收和超时重传两个机制,来实现可靠传输。具体的代码实现上, KCP 给每个数据包(就是上一节提到的 SEGMENT
) 分配唯一的 sn
标识符,一旦对端接收到数据包,会回复一个 ACK 包(同样是 SEGMENT
),ACK 包的 sn
跟接收到的数据包 sn
相同,通知接收到此数据包已经接收成功。SEGMENT
上还有一个 una
字段,表示下一个期待接收的数据包的编号,换句话说,即是所有在该编号之前的数据包都已经接收完,相当于一个全量的 ACK 包,发送端可以更快的更新发送缓冲和发送窗口。
我们可以通过跟踪 KCP 包的发送和接受代码,来理解最基本的 ARQ 实现:
发送
发送的过程是 ikcp_send
-> ikcp_update
-> ikcp_output
,上层调用 ikcp_send
把数据传给 KCP,KCP 在 ikcp_update
中处理数据的发送。
ikcp_send(点击展开代码)
//---------------------------------------------------------------------
// 发送数据接口,用户调用 ikcp_send 来让 kcp 发送数据
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
IKCPSEG *seg;
int count, i;
// mss 不能小于1
assert(kcp->mss > 0);
if (len < 0) return -1;
// append to previous segment in streaming mode (if possible)
if (kcp->stream != 0) {
// 处理流模式
// ......
}
// 计算分包,如果数据长度 len 大于 mss,需要分成多个包发送,对端接受到之后再拼起来
if (len <= (int)kcp->mss) count = 1;
else count = (len + kcp->mss - 1) / kcp->mss;
if (count >= (int)IKCP_WND_RCV) return -2;
if (count == 0) count = 1;
// 分包
for (i = 0; i < count; i++) {
// 计算包的数据长度,并分配对应的 seg 结构
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
assert(seg);
if (seg == NULL) {
return -2;
}
// 设置 seg 的 数据信息,frg 表示分包编号
if (buffer && len > 0) {
memcpy(seg->data, buffer, size);
}
seg->len = size;
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
// 加到 snd_queue 的末尾,nsnd_qua 加一
iqueue_init(&seg->node);
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
if (buffer) {
buffer += size;
}
len -= size;
}
return 0;
}
ikcp_send
是由 KCP 的上层来调用的发送数据接口,所有让 KCP 发送的数据,都应该通过这个接口。 ikcp_send
做的事情很简单,主要就是把数据,根据 kcp->mss
(一个包最大数据长度)来分成多个包,并设置分包编号,最后放到发送链表 snd_queue
的末尾。流模式就是把多次调用 ikcp_send
的数据都看成一个流,会先自动填充未满的 SEGMENT
再分配新的,详细实现本文不讨论,感兴趣的,相信看完本文,再对应看看代码就能理解。
ikcp_send
调用完成之后,数据放在的 KCP 的 snd_queue
中,那么后面 KCP 需要找个时机,把待发送的数据发送出去,这块代码都放在 ikcp_update
和 ikcp_flush
里面:
ikcp_update(点击展开代码)
//---------------------------------------------------------------------
// ikcp_update 是给上层定期调用的接口,用来更新 kcp 的状态,发送数据
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
//---------------------------------------------------------------------
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
IINT32 slap;
kcp->current = current;
// ikcp_flush 会检查这个,上层必须调用过 ikcp_update 才能调用 ikcp_flush,建议只使用 ikcp_update
if (kcp->updated == 0) {
kcp->updated = 1;
kcp->ts_flush = kcp->current;
}
slap = _itimediff(kcp->current, kcp->ts_flush);
if (slap >= 10000 || slap < -10000) {
kcp->ts_flush = kcp->current;
slap = 0;
}
if (slap >= 0) {
// 下次 flush 的时间
kcp->ts_flush += kcp->interval;
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
kcp->ts_flush = kcp->current + kcp->interval;
}
ikcp_flush(kcp);
}
}
ikcp_update
做的事情很简单,判断一下 ts_flush
的时间,符合条件则调用 ikcp_flush
,主要的处理逻辑都在 ikcp_flush
里面了,因为 ikcp_flush
内容复杂一点,我们目前只关注跟 ARQ 发送相关的部分:
发送数据(点击展开代码)
//---------------------------------------------------------------------
// ikcp_flush
//---------------------------------------------------------------------
void ikcp_flush(ikcpcb *kcp)
{
IUINT32 current = kcp->current;
// buffer 是要传给 ikcp_output 的数据,初始化为 3 倍数据包大小
char *buffer = kcp->buffer;
char *ptr = buffer;
int count, size, i;
IUINT32 resent, cwnd;
IUINT32 rtomin;
struct IQUEUEHEAD *p;
int change = 0;
int lost = 0;
IKCPSEG seg;
// 'ikcp_update' haven't been called.
if (kcp->updated == 0) return;
seg.conv = kcp->conv;
seg.cmd = IKCP_CMD_ACK;
seg.frg = 0;
// seg.wnd 是表示当前可接收窗口大小
seg.wnd = ikcp_wnd_unused(kcp);
seg.una = kcp->rcv_nxt;
seg.len = 0;
seg.sn = 0;
seg.ts = 0;
// 发送 ack
// 计算 发送窗口
//...
// 把数据包从 snd_queue 移动到 snd_buf
// 移动是需要满足 发送窗口 大小,发送窗口满了,就停止移动
// 放在 snd_buf 的里面的数据,就是可以直接调用 ikcp_output 给对端发送的数据
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;
newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
// seg 唯一序号,其实就是一个递增的 kcp->snd_nxt
newseg->sn = kcp->snd_nxt++;
// una 在这里设置,通知对端下一个等待接收的包序号
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}
// 计算快速重传标志,超时等待时间
// ...
// 发送 snd_buf
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
if (segment->xmit == 0) {
// 首次发送
// set->xmit 表示发送次数
// resendts 超时重传的等待时间
needsend = 1;
segment->xmit++;
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
// 超时重传
// ...
}
else if (segment->fastack >= resent) {
// 快速重传
// ...
}
if (needsend) {
int need;
segment->ts = current;
segment->wnd = seg.wnd;
segment->una = kcp->rcv_nxt;
size = (int)(ptr - buffer);
need = IKCP_OVERHEAD + segment->len;
// 每当 buffer 中的数据超过 mtu ,那就先发出去,尽量避免底层再分包
if (size + need > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
// 把 seg 控制数据复制到 buffer 上,kcp 自己来处理大小端问题
ptr = ikcp_encode_seg(ptr, segment);
// 再复制数据
if (segment->len > 0) {
memcpy(ptr, segment->data, segment->len);
ptr += segment->len;
}
if (segment->xmit >= kcp->dead_link) {
kcp->state = (IUINT32)-1;
}
}
}
// flash remain segments
size = (int)(ptr - buffer);
if (size > 0) {
ikcp_output(kcp, buffer, size);
}
// 计算 ssthresh,更新慢启动窗口
// ...
}
我们目前只关注 ikcp_flush
里面有关发送数据的逻辑:
- 首先 KCP 会根据对端的接收窗口大小,把
snd_queue
上的数据移动到snd_buf
上面,计算移动数量的公式是num = snd_nxt - (snd_una + cwnd)
,也就是:已发送成功的最大包序号snd_una
加上 滑动窗口大小cwnd
大于 下个待发送的包序号snd_nxt
,则可以继续再发送新的数据包。移动SEG
的同时,设置控制字段。
- 遍历
snd_buf
,如果需要发送数据包,则把数据复制到buffer
上,复制的同时用ikcp_encode_seg
处理控制字段数据的大小端问题。
- 最后调用
ikcp_output
把buffer
上的数据发送出去
至此, KCP 完成数据的发送。
接收
接收的过程是跟发送相反的:ikcp_input
-> ikcp_update
-> ikcp_recv
,用户接收到网络上的数据之后,需要调用 ikcp_input
传给 KCP 解析,调用 ikcp_update
的时候会给发送端回复 ACK 包,上层通过调用 ikcp_recv
来接收 KCP 解析之后的数据。
接收数据(点击展开代码)
//---------------------------------------------------------------------
// input data
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
IUINT32 prev_una = kcp->snd_una;
IUINT32 maxack = 0, latest_ts = 0;
int flag = 0;
// 合法性检查
if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;
// data 可能是多个 KCP 包,循环处理
while (1) {
IUINT32 ts, sn, len, una, conv;
IUINT16 wnd;
IUINT8 cmd, frg;
IKCPSEG *seg;
// 不够一个 KCP 包,退出
if (size < (int)IKCP_OVERHEAD) break;
// 先把控制字段解析出来
data = ikcp_decode32u(data, &conv);
if (conv != kcp->conv) return -1;
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
size -= IKCP_OVERHEAD;
if ((long)size < (long)len || (int)len < 0) return -2;
// 数据包类型检查
if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS)
return -3;
kcp->rmt_wnd = wnd;
// 这里的 una 是发送方的 kcp->rcv_nxt,根据这个数据,可以去掉已确认接收的数据包
ikcp_parse_una(kcp, una);
// 去掉已确认接收的包后,更新 snd_una 下一个要发送的序号
ikcp_shrink_buf(kcp);
if (cmd == IKCP_CMD_ACK) {
// ack 包
// ...
}
else if (cmd == IKCP_CMD_PUSH) {
// 数据包
// 如果接收到的数据包序号 sn,在接收窗口内,则正常处理,否则直接丢弃,等重传
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
// 接收到的每个数据包,都要回一个 ack 包,记录下来
ikcp_ack_push(kcp, sn, ts);
// 接收的数据调用 ikcp_parse_data 处理
if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
seg = ikcp_segment_new(kcp, len);
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;
if (len > 0) {
memcpy(seg->data, data, len);
}
ikcp_parse_data(kcp, seg);
}
}
}
else if (cmd == IKCP_CMD_WASK) {
// 查询窗口包
// ...
}
else if (cmd == IKCP_CMD_WINS) {
// 查询窗口的回复包
// ...
}
else {
return -3;
}
data += len;
size -= len;
}
// 处理快速重传逻辑
// ...
// 更新发送窗口
// ...
return 0;
}
ikcp_input
循环处理每一个 SEG
包,先检查数据包的合法性和类型,因为每个数据包都会带上 una
,存放的是发送端等待接收的包序号,需要小于 una
的包对端都已经接受成功,所以可以把 snd_buff
中需要小于 una
的都删掉,并更新 snd_nxt
,这一部分由 ikcp_parse_una
和 ikcp_shrink_buf
来处理。接收到的每个数据包,都需要回复 ACK 包,由 ikcp_ack_push
记录下来,最后调用 ikcp_parse_data
处理数据。
解析数据(点击展开代码)
void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
{
struct IQUEUEHEAD *p, *prev;
IUINT32 sn = newseg->sn;
int repeat = 0;
// 序号检查
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 ||
_itimediff(sn, kcp->rcv_nxt) < 0) {
ikcp_segment_delete(kcp, newseg);
return;
}
// 找出 newseg 应该放置的位置,因为接收到的 seg 可能是乱序的
for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
prev = p->prev;
if (seg->sn == sn) {
// 重复收到
repeat = 1;
break;
}
if (_itimediff(sn, seg->sn) > 0) {
break;
}
}
// 把 newseg 放到 rcv_buf 正确的位置上
if (repeat == 0) {
iqueue_init(&newseg->node);
iqueue_add(&newseg->node, p);
kcp->nrcv_buf++;
} else {
ikcp_segment_delete(kcp, newseg);
}
// 把数据从 rcv_buf 移动到 rcv_queue
while (! iqueue_is_empty(&kcp->rcv_buf)) {
IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
// 如果 seg 序号是等待接收的序号,移动到 rcv_queue
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}
}
ikcp_parse_data
主要的工作就是把 newseg
放置到 kcp->rcv_buf
合适的位置上,并把数据从 rcv_buf
移动到 rcv_queue
。rcv_buf
合适的位置的意思是,rcv_buf
是按照 sn
的递增顺序排列的,newseg
需要根据自己的 sn
大小查找合适的位置。rcv_buf
上的数据要移动到 rcv_queue
,条件是 rcv_buf
上的数据包序号,等于 KCP 在等待接收的包序号 kcp->rcv_nxt
,移动一个数据包之后,需要更新 kcp->rvc_nxt
,再处理下一个数据包。
ikcp_input
之后,上层调用 ikcp_update
时候会发送 ACK 包,调用 ikcp_recv
会给上层返回有效数据。ikcp_update
和 ikcp_recv
互相独立,没有调用顺序要求,视上层的调用时机而定。我们先来看 ikcp_update
里面有关 ACK 发送的部分:
回复 ACK(点击展开代码)
// 前面说过,ikcp_update 最终是调用 ikcp_flush
void ikcp_flush(ikcpcb *kcp, IUINT32 current)
{
// ...
// 回复 ACK 包
count = kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->ackcount = 0;
// ...
}
ACK 包的之前已经由 ikcp_ack_push
保存起来了,所以这里只需要 ikcp_ack_get
获取每个 ACK 包的信息,发送给对方。上层可以使用 ikcp_recv
从 KCP 获取数据:
ikcp_recv(点击展开代码)
//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
struct IQUEUEHEAD *p;
int ispeek = (len < 0)? 1 : 0;
int peeksize;
int recover = 0;
IKCPSEG *seg;
assert(kcp);
// 一些有效性检查
if (iqueue_is_empty(&kcp->rcv_queue))
return -1;
if (len < 0) len = -len;
// 计算能返回的数据长度
peeksize = ikcp_peeksize(kcp);
if (peeksize < 0)
return -2;
if (peeksize > len)
return -3;
// 判断下接收窗口
if (kcp->nrcv_que >= kcp->rcv_wnd)
recover = 1;
// 遍历 rcv_queue,把数据复制到 buffer 上
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
int fragment;
seg = iqueue_entry(p, IKCPSEG, node);
p = p->next;
if (buffer) {
memcpy(buffer, seg->data, seg->len);
buffer += seg->len;
}
len += seg->len;
// 判断分包
fragment = seg->frg;
// 移除数据包
if (ispeek == 0) {
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
kcp->nrcv_que--;
}
// 所有分包都复制完,退出循环
if (fragment == 0)
break;
}
assert(len == peeksize);
// rcv_queue 又空了一些,尝试继续从 rcv_buf 移动到 rcv_queue
while (! iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}
return len;
}
ikcp_recv
一次调用只会返回一个完整的数据包,上层可以循环调用直到没有数据返回为止,函数的逻辑比较简单,就是从 rcv_queue
中复制数据到上层传进来的 buffer
里面,至此接收方对于接收到的数据包已经处理完毕。
接收方处理数据包的时候,给发送方发送了 ACK 包,我们再来看看发送方接受 ACK 包的处理:
处理 ACK 包(点击展开代码)
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
// ...
IUINT32 maxack = 0, latest_ts = 0;
// ...
while (1) {
// ...
// ts 是对端的 kcp-> current
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
if (cmd == IKCP_CMD_ACK) {
// 更新 rot
if (_itimediff(kcp->current, ts) >= 0) {
ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
}
// 更新 snd_buf
ikcp_parse_ack(kcp, sn);
ikcp_shrink_buf(kcp);
// maxack = 这次 input 的所有 ACK 包中最大的 sn
if (flag == 0) {
flag = 1;
maxack = sn;
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
#ifndef IKCP_FASTACK_CONSERVE
maxack = sn;
latest_ts = ts;
#else
if (_itimediff(ts, latest_ts) > 0) {
maxack = sn;
latest_ts = ts;
}
#endif
}
}
}
// ...
}
// 如果有收到 ACK 包,记录用来做快速重传
if (flag != 0) {
ikcp_parse_fastack(kcp, maxack, latest_ts);
}
}
可以看到接收到 ACK 包后同样会需要 ikcp_parse_ack
和 ikcp_shrink_buf
来更新 snd_buf
,另外还需要调用 ikcp_update_ack
来计算更新 rto (retransmission timeout,超时重传时间)。ikcp_input
计算收到的 ACK 包中最大序号,来记录做快速重传用。就这样,发送方收到 ACK 包,把发送数据从 snd_buf
中移除,该数据包可靠地送达到了接收方,一次完整的 ARQ 确认接收过程结束。
超时重传
前面介绍的是 KCP 实现的 ARQ 中的 确认接收机制,ARQ 还需要一个超时重传来保证可靠性,下面我们来看看 KCP 是怎么做超时重传的。
让我们回到 ikcp_flush
函数:
超时重传(点击展开代码)
void ikcp_flush(ikcpcb *kcp)
{
// ...
// 发送 snd_buf
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
if (segment->xmit == 0) {
// 首次发送
needsend = 1;
segment->xmit++;
// 设置 segment->rto
// 通过 segment->rto 计算 segment->resendts 超时重传时间
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
// 超时重传
needsend = 1;
segment->xmit++;
kcp->xmit++;
// nodelay 控制下一次超时重传时间的计算
if (kcp->nodelay == 0) {
segment->rto += kcp->rx_rto;
} else {
segment->rto += kcp->rx_rto / 2;
}
segment->resendts = current + segment->rto;
lost = 1;
}
else if (segment->fastack >= resent) {
// 快速重传
// ...
}
if (needsend) {
// 发送数据
// ...
}
// ...
}
一旦当前时间 current
大于 segment->resendts
超时重传时间,说明在这段时间内,都没有收到接收方的 ACK 包,触发超时重传机制,needsend = 1
,重新发送数据。
有了确认接收和超时重传机制,KCP 就可以保证基础的可靠数据传输了。但是为了能够保持更稳定的数据流速,KCP 还做了更多的事情,下面我们一起看看 KCP 还做了那些优化。
KCP 提升流速的策略
快速重传
发送方发送了序号为 sn
和 sn + 1
两个数据包,如果只收到了 sn + 1
的 ACK 包,那可能是因为 sn
的 ACK 包在网路中还没到达,又或者 ACK 包丢了,又或者 sn
数据包丢了,如果此时还没到超时重传的时间,网络也还不太拥堵,只是因为某种原因而突发丢包,那么发送方主动提前发送 sn
数据包,可以帮助接收方更快地接收数据,提高流速。
KCP 里面也相应实现了快速重传机制,也在 ikcp_flush
里面:
快速重传(点击展开代码)
void ikcp_flush(ikcpcb *kcp)
{
// ...
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
// 发送 snd_buf
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
if (segment->xmit == 0) {
// ...
}
else if (_itimediff(current, segment->resendts) >= 0) {
// ...
}
else if (segment->fastack >= resent) {
// 快速重传
if ((int)segment->xmit <= kcp->fastlimit ||
kcp->fastlimit <= 0) {
needsend = 1;
segment->xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;
}
}
if (needsend) {
// 发送数据
// ...
}
// ...
}
要出发快速重传,有两个条件:
* segment->fastack >= resent
,resent 是可配置的参数 kcp->fastresend
,配置为 0 会关闭快速重传。segment->fastack
是在函数 ikcp_parse_fastack
里面设置的,这个函数是在 ikcp_input
里面调用,会根据 ikcp_input
算出的 maxack
来给所有 sn
小于 maxack
的 segment->fastack
加一,所以 segment->fastack
就是表示收到比 sn
大的包的次数。
* segment->xmit <= kcp->fastlimit || kcp->fastlimit <= 0
,setgment->xmit
是发送次数,kcp->fastlimit
是可配置的最大快速重传次数,发送次数需要小于最大快速重传次数
一旦满足快速重传的以上条件,KCP 就会执行快速重传,要注意快速重传并不会重置超时重传时间,原来的超时时间依然会生效。
缩短超时重传时间
超时重传是个很好的机制,但就是太花时间了,按照 TCP 的策略,每次超时重传时间翻倍,等待时间膨胀得很快,在等待时间内,很可能由于接收端的接收窗口已耗尽,无法接收新数据,而等待重传的包序号是在最前面,接收方要接收到重传的包才能把所有数据返回给上层,这种情况,整个网路的流速几乎为 0。KCP 增加了配置可以减缓等待的时间增长,而且也不会是翻倍,通过配置 kcp->nodelay
控制每次等待时间只会增长 1 倍的 RTO 或者 0.5 倍的 RTO,有效减缓等待时间的增长,帮助网路尽快恢复流速。
更新发送窗口
发送窗口表示的是同时传输的数据包数量,窗口越大,同时传输的数据越多,流速越大,但窗口过大,会导致网络拥塞,丢包率上升,数据重传增多,流速下降。所以发送窗口需要根据网络情况不断更新,慢慢趋近最优。 KCP 中关于发送窗口的代码:
发送窗口(点击展开代码)
ikcpcb* ikcp_create(IUINT32 conv, void *user)
{
// ...
// snd_wnd,rcv_wnd 发送和接受的缓冲区大小
kcp->snd_wnd = IKCP_WND_SND; // 32
kcp->rcv_wnd = IKCP_WND_RCV; // 128
// 对端接收窗口大小 // 128
kcp->rmt_wnd = IKCP_WND_RCV
// 发送窗口 cwnd 初始化 0
kcp->cwnd = 0;
// 发送窗口字节数大小,参与计算 cwnd
kcp->incr = 0
// 慢启动阈值,slow start threshold
kcp->ssthresh = IKCP_THRESH_INIT;
// nocwnd 是可配置参数,1 不考虑 cwnd
kcp->nocwnd = 0;
// ...
}
void ikcp_flush(ikcpcb *kcp)
{
// ...
// 发送数据时先计算 发送窗口大小,是发送缓冲区大小和对方接收窗口大小的小值
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
// 默认还需要考虑 kcp->cwnd,即是不断更新的发送窗口
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
// 根据 cwnd 大小,snd_queue 移动到 snd_buf
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
}
// 发送数据
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
// 触发超时重传 lost = 1
// 触发快速重传 change++
// 更新慢启动阈值和发送窗口
if (change) {
// 如果有触发快速重传,ssthresh 设置为网络上正在传输的数据包数量的一半
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
kcp->ssthresh = inflight / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
// 发送窗口为阈值再加上快速重传相关的 resent
kcp->cwnd = kcp->ssthresh + resent;
kcp->incr = kcp->cwnd * kcp->mss;
}
if (lost) {
// 如果有超时重传,触发慢启动, ssthresh 阈值为发送窗口的一半
kcp->ssthresh = cwnd / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
// 发送窗口回到 1,重新慢启动增长
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
if (kcp->cwnd < 1) {
// 因为初始化为 0,来到这里会再设置成 1
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
}
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
IUINT32 prev_una = kcp->snd_una;
// 处理接收的数据
while (1) {
// ...
data = ikcp_decode16u(data, &wnd)
// rmt_wnd 是对方的接收窗口大小
kcp->rmt_wnd = wnd
// ...
// 处理数据
}
// 最后更新发送窗口
// kcp->snd_una - prev_una > 0,表示本次 input 有接受到 ACK 并且发送缓冲 snd_buf 有变化
if (_itimediff(kcp->snd_una, prev_una) > 0) {
// 再判断对方的接收窗口
if (kcp->cwnd < kcp->rmt_wnd) {
IUINT32 mss = kcp->mss;
if (kcp->cwnd < kcp->ssthresh) {
// 小于慢启动阈值,双倍增长
kcp->cwnd++;
kcp->incr += mss;
} else {
// 大于慢启动阈值之后,通过公式更新 incr ,进而计算 cwnd
if (kcp->incr < mss) kcp->incr = mss;
kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
if ((kcp->cwnd + 1) * mss <= kcp->incr) {
kcp->cwnd++;
}
}
// 更新出来的值还要再比较下 rmt_wnd
if (kcp->cwnd > kcp->rmt_wnd) {
kcp->cwnd = kcp->rmt_wnd;
kcp->incr = kcp->rmt_wnd * mss;
}
}
}
}
计算发送窗口 kcp->cwnd
的大小涉及的代码片段会稍微多一些,因为在发送和接收数据的时候,都会需要更新。kcp->cwnd
初始化为 0,
之后会在第一次调用 ikcp_flush
的时候,判断小于 1 ,便修改成 1。之后发送方根据发送窗口大小,发送出相应数量的数据包,等待 ACK
回复包。ACK 包在 kcp->input
中进行处理,kcp->input
中如果判断有 ACK 包,并有清除发送缓冲中的发送数据包,说明有数据包已经完成送达,kcp->cwnd++
。实际上很可能是一次 kcp->input
只处理到一个 ACK 包,可以理解为,每收到一个 ACK 包都会有 kcp->cwnd++
,这句自增实现的是翻倍的效果,譬如当前 kcp->cwnd = 2
,发送两个数据包,收到两个 ACK 包,触发了两次自增,最后就是 kcp->cwnd = 4
翻倍。
cwnd
可以一直指数增长,直到超过慢启动阈值,或者发生拥堵超时重传,快速重传的情况。发生了超时重传之后,会触发慢启动,慢启动阈值 ssthresh = kcp->cwnd / 2
,发送窗口 kcp->cwnd = 1
,回到最初重新指数增长。如果发生了快速重传,KCP 先提前减少 ssthresh
,也即是减少了 cwnd
指数增长的空间,降低增长速度,提前减缓拥堵的情况。
KCP 还增加了一个配置 nocwnd
,当 nocwnd = 1
,发送数据是不再考虑发送窗口大小,直接让最大能发送的数量发送数据包,满足高速模式下的要求。
小结
本文简单地分析了 KCP 的源码,并讨论了 KCP 上 ARQ 的实现,和一些 KCP 提升流速的策略。还有很多细节没有提到,感兴趣的可以自己翻 KCP 的源码对照着看,相信也能有不少的收获。
原文地址:https://wiki.disenone.site
本篇文章受 CC BY-NC-SA 4.0 协议保护,转载请注明出处。
Visitors. Total Visits. Page Visits.