跳轉至

KCP 源碼剖析

在閱讀本文之前,如果對 KCP 完全不熟悉,請花點時間先查閱 KCP 項目的說明文件:傳送門這篇文章的目的是深入瞭解 KCP 的實現細節。

KCP 是什么?

KCP 是一個快速可靠的協議,能以比 TCP 更低的延遲來傳送資料,資料重傳更快,等待時間更短。

TCP是為了處理流量而設計的(每秒內可以傳輸多少KB的數據),著眼於充分利用帶寬。而KCP則是針對流速設計的(單個數據包從一端發送到另一端需要多少時間),以10%-20%的帶寬浪費換取了比TCP快30%-40%的傳輸速度。TCP信道就像是一條流速緩慢,但每秒流量龐大的大河,而KCP則像是急流湍急的小溪川。

以上是 KCP 文件中所述,關鍵字詞為帶寬流速,KCP 將消耗帶寬,帶來的好處是更大更均衡的傳輸速率。詳情請參考 KCP 自身的文件。

KCP 資料結構

KCP 源碼位於 ikcp.hikcp.c 兩個檔案中,ikcp.h 的核心部分是資料結構的宣告部分,首先是 SEGMENT 資料包,它是 KCP 協議處理資料的最小單位:

SEGMENT 結構(點擊展開程式碼)
//=====================================================================
一個 SEGMENT 就是一個 SETMENT
//=====================================================================
struct IKCPSEG
{
鏈表節點發送和接收隊列都是這裡的鏈表的結構
    struct IQUEUEHEAD node;

會話編號同一個會話編號相同
    IUINT32 conv;

敬请为我转换成传统中文

    // 資料包類型,例如 DATA 或 ACK
    IUINT32 cmd;

因MTU的限制大數據包會被拆分成多個小數據包這是小數據包的編號
    IUINT32 frg

每個數據包都會附帶上發送方的接收窗口大小
    IUINT32 wnd;

// 如果這是一個確認收據包,則發送時間將設置為源數據包的時間戳記
    IUINT32 ts;

唯一標識資料包的編號
    IUINT32 sn;

所有小於una的數據包都已成功接收這與TCP中的概念一致最老的未確認序列號SND
    IUINT32 una;

數據長度
    IUINT32 len;

超時重傳時間
    IUINT32 resendts;

下次超时等待時間
    IUINT32 rto;

快速重传收到本数据包之后的数据包的数量大于一定数量就触发快速重传
    IUINT32 fastack;

發送次數
    IUINT32 xmit;

// 資料
    char data[1];
};

閱讀完 SEGMENT 的註釋,大致上可以看出 KCP 的核心也是一種 ARQ 協定,透過自動超時重傳來確保資料的傳送。接著再來看看 KCP 結構 KCPCB 的定義:

KCP 結構(點擊展開程式碼)
//---------------------------------------------------------------------
// IKCPCB
//---------------------------------------------------------------------
struct IKCPCB
{
// 会話編號
mtu, mss最大傳輸單元最大報文段大小
// state: 會話狀態,0 有效,-1 斷開
    IUINT32 conv, mtu, mss, state;

// snd_una: 等待 ACK 的包編號
// snd_nxt: 下一個等待發送的資料包編號
// rcv_nxt: 下一個等待接收的資料包編號
    IUINT32 snd_una, snd_nxt, rcv_nxt;

// ts_recent, ts_lastack: Not used
// ssthresh: 拥塞控制慢啟動閾值
    IUINT32 ts_recent, ts_lastack, ssthresh;

// rx_rto: rto (retransmission timeout),重發超時時間
// rx_rttval, rx_srtt, rx_minrto: 計算 RTO 的中間變數
    IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;

// snd_wnd, rcv_wnd: 最大發送和接收視窗大小
// rmt_wnd: 遠端窗口,對方剩餘可接收窗口大小
// cwnd: 可傳送窗口大小
// 探測:是否需要發送控制訊息的標誌
    IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;

// current: 當前時間
// interval: 更新間隔
ts_flush: 下次需要更新的時間
// xmit: 發送失敗次數
    IUINT32 current, interval, ts_flush, xmit;

對應鏈表的長度
    IUINT32 nrcv_buf, nsnd_buf;
    IUINT32 nrcv_que, nsnd_que;

// nodelay: 控制超時重傳的 rto 增長速度
// updated: Have you called ikcp_update before?
    IUINT32 nodelay, updated;

// ts_probe, probe_wait: When the receiving window of the remote end remains 0 for a long time, actively initiate inquiries at regular intervals.
    IUINT32 ts_probe, probe_wait;

// deal_link: 對端長時間無應答
// incr: 參與計算傳送視窗大小
    IUINT32 dead_link, incr;

// queue: 與使用者層接觸的數據包
// buf: 用來暫存協議資料的數據包
    struct IQUEUEHEAD snd_queue;
    struct IQUEUEHEAD rcv_queue;
    struct IQUEUEHEAD snd_buf;
    struct IQUEUEHEAD rcv_buf;

需要發送 ack 的資料包資訊
    IUINT32 *acklist;

需要確認的封包數量
    IUINT32 ackcount;

// 記錄清單中的記憶體大小
    IUINT32 ackblock;

// 由使用者介面傳入的資料
    void *user;

// 儲存一個 kcp 封包的空間
    char *buffer;

觸發快速重傳的 fastack 次數
    int fastresend;

快速重傳最大次數
    int fastlimit;

// nocwnd: 不考慮慢啟動的傳送窗口大小
// stream: 流模式
    int nocwnd, stream;

    // debug log
    int logmask;

// 傳送資料介面
    int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);

    void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

逐一將 KCP 結構內的字段加上註釋,你會初步感覺到 KCP 的協議並不是太複雜。透徹分析代碼後,大家都能讀懂並理解 KCP 協議。😊

KCP 的 ARQ 實現

KCP 本質上是一種 ARQ(自動重複請求)協議,最基本的是要確保可靠的傳輸。那麼我們可以先來關注 KCP 的基本 ARQ 部分,KCP 是怎麼實現可靠傳輸的。

ARQ 顧名思義,當我們認為對端接收資料包失敗時,自動重新發送對應的資料包,它是透過確認接收和超時重傳兩個機制,來實現可靠傳輸。具體的程式實現上,KCP 給每個資料包(就是上一節提到的 SEGMENT)分配唯一的 sn 標識符,一旦對端接收到資料包,會回覆一個 ACK 包(同樣是 SEGMENT),ACK 包的 sn 跟接收到的資料包 sn 相同,通知接收到此資料包已經接收成功。SEGMENT 上還有一個 una 欄位,表示下一個期待接收的資料包的編號,換句話說,即是所有在該編號之前的資料包都已經接收完,相當於一個全量的 ACK 包,發送端可以更快的更新發送緩衝和發送窗口。

我們可以透過追蹤 KCP 封包的發送和接收代碼,來了解最基本的 ARQ 實現:

發送

將文字翻譯成繁體中文:

傳送過程為 ikcp_send -> ikcp_update -> ikcp_output,上層調用ikcp_send將資料傳送給KCP,KCP在ikcp_update中處理資料的傳送。

ikcp_send(點擊展開程式碼)
//---------------------------------------------------------------------
// 用戶可以調用ikcp_send來呼叫kcp發送數據接口
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
    IKCPSEG *seg;
    int count, i;

// mss 不能小於1
    assert(kcp->mss > 0);
    if (len < 0) return -1;

    // append to previous segment in streaming mode (if possible)
    if (kcp->stream != 0) {
處理流模式
        // ......
    }

計算分包如果數據長度 len 大於 mss需要分成多個包發送對端接收到之後再拼起來
    if (len <= (int)kcp->mss) count = 1;
    else count = (len + kcp->mss - 1) / kcp->mss;

    if (count >= (int)IKCP_WND_RCV) return -2;

    if (count == 0) count = 1;

// Subcontracting
    for (i = 0; i < count; i++) {
計算封包的資料長度並分配相應的 seg 結構
        int size = len > (int)kcp->mss ? (int)kcp->mss : len;
        seg = ikcp_segment_new(kcp, size);
        assert(seg);
        if (seg == NULL) {
            return -2;
        }

設置 seg 的數據信息frg 表示分包編號
        if (buffer && len > 0) {
            memcpy(seg->data, buffer, size);
        }
        seg->len = size;
        seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;

將其添加到 snd_queue 末尾並將 nsnd_qua 加一
        iqueue_init(&seg->node);
        iqueue_add_tail(&seg->node, &kcp->snd_queue);
        kcp->nsnd_que++;
        if (buffer) {
            buffer += size;
        }
        len -= size;
    }

    return 0;
}

ikcp_send 是由 KCP 的上層來調用的傳送數據介面,所有讓 KCP 傳送的數據,都應該通過這個介面。ikcp_send 做的事情很簡單,主要就是把數據,根據 kcp->mss(一個包最大數據長度)來分成多個包,並設置分包編號,最後放到傳送鏈表 snd_queue 的末尾。流模式就是把多次調用 ikcp_send 的數據都看成一個流,會先自動填充未滿的 SEGMENT 再分配新的,詳細實現本文不討論,感興趣的,相信看完本文,再對應看看程式碼就能理解。

ikcp_send調用完成後,數據將被放置在KCP的snd_queue中,接著KCP需要找到合適的時機將待發送的數據發送出去。這部分程式碼都位於ikcp_updateikcp_flush函數中:

ikcp_update(點擊展開程式碼)
//---------------------------------------------------------------------
ikcp_update是一個供上層定期調用的接口用於更新kcp的狀態發送數據
// update state (call it repeatedly, every 10ms-100ms), or you can ask 
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec. 
//---------------------------------------------------------------------
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
    IINT32 slap;

    kcp->current = current;

ikcp_flush 會檢查這個上層必須呼叫過 ikcp_update 才能呼叫 ikcp_flush建議只使用 ikcp_update
    if (kcp->updated == 0) {
        kcp->updated = 1;
        kcp->ts_flush = kcp->current;
    }

    slap = _itimediff(kcp->current, kcp->ts_flush);

    if (slap >= 10000 || slap < -10000) {
        kcp->ts_flush = kcp->current;
        slap = 0;
    }

    if (slap >= 0) {
// 下次排出的時間
        kcp->ts_flush += kcp->interval;
        if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
            kcp->ts_flush = kcp->current + kcp->interval;
        }
        ikcp_flush(kcp);
    }
}

ikcp_update 做的事情很簡單,判斷一下 ts_flush 的時間,符合條件則調用 ikcp_flush,主要的處理邏輯都在 ikcp_flush 裏面了,因為 ikcp_flush 內容複雜一點,我們目前只關注跟 ARQ 發送相關的部分:

傳送資料(點擊展開代碼)
//---------------------------------------------------------------------
// ikcp_flush
//---------------------------------------------------------------------
void ikcp_flush(ikcpcb *kcp)
{
    IUINT32 current = kcp->current;

// buffer 是要傳給 ikcp_output 的資料,初始化為 3 倍數據包大小
    char *buffer = kcp->buffer;
    char *ptr = buffer;
    int count, size, i;
    IUINT32 resent, cwnd;
    IUINT32 rtomin;
    struct IQUEUEHEAD *p;
    int change = 0;
    int lost = 0;
    IKCPSEG seg;

    // 'ikcp_update' haven't been called.
    if (kcp->updated == 0) return;

    seg.conv = kcp->conv;
    seg.cmd = IKCP_CMD_ACK;
    seg.frg = 0;

seg.wnd 表示當前可接收窗口大小
    seg.wnd = ikcp_wnd_unused(kcp);
    seg.una = kcp->rcv_nxt;
    seg.len = 0;
    seg.sn = 0;
    seg.ts = 0;

發送 ack
// 計算傳送窗口
    //...

將資料包從 snd_queue 移動到 snd_buf
移動時必須符合發送視窗的大小當發送視窗已滿時即停止移動
放在 snd_buf 的里面的資料就是可以直接呼叫 ikcp_output 給對端發送的資料
    while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
        IKCPSEG *newseg;
        if (iqueue_is_empty(&kcp->snd_queue)) break;

        newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

        iqueue_del(&newseg->node);
        iqueue_add_tail(&newseg->node, &kcp->snd_buf);
        kcp->nsnd_que--;
        kcp->nsnd_buf++;

        newseg->conv = kcp->conv;
        newseg->cmd = IKCP_CMD_PUSH;
        newseg->wnd = seg.wnd;
        newseg->ts = current;

// seg 唯一序號,其實就是一個遞增的 kcp->snd_nxt
        newseg->sn = kcp->snd_nxt++;

在這裡設置unna通知對端下一個等待接收的數據包序號
        newseg->una = kcp->rcv_nxt;
        newseg->resendts = current;
        newseg->rto = kcp->rx_rto;
        newseg->fastack = 0;
        newseg->xmit = 0;
    }

計算快速重傳標誌超時等待時間
    // ...

// 傳送 snd_buf
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        if (segment->xmit == 0) {
初次發送
// set->xmit 表示發送次數
// 重新傳送超時重傳等待時間
            needsend = 1;
            segment->xmit++;
            segment->rto = kcp->rx_rto;
            segment->resendts = current + segment->rto + rtomin;
        }
        else if (_itimediff(current, segment->resendts) >= 0) {
超時重傳
            // ...
        }
        else if (segment->fastack >= resent) {
快速重传
            // ...
        }

        if (needsend) {
            int need;
            segment->ts = current;
            segment->wnd = seg.wnd;
            segment->una = kcp->rcv_nxt;

            size = (int)(ptr - buffer);
            need = IKCP_OVERHEAD + segment->len;

每當緩衝區中的資料超過 MTU 就應該儘快發送出去以盡量避免底層再次分包
            if (size + need > (int)kcp->mtu) {
                ikcp_output(kcp, buffer, size);
                ptr = buffer;
            }

 seg 控制數據複製到 buffer kcp 會自行處理大小端問題
            ptr = ikcp_encode_seg(ptr, segment);

再複製數據
            if (segment->len > 0) {
                memcpy(ptr, segment->data, segment->len);
                ptr += segment->len;
            }


            if (segment->xmit >= kcp->dead_link) {
                kcp->state = (IUINT32)-1;
            }
        }
    }

    // flash remain segments
    size = (int)(ptr - buffer);
    if (size > 0) {
        ikcp_output(kcp, buffer, size);
    }

計算 ssthresh更新慢啟動窗口
    // ...
}

我們目前只專注於 ikcp_flush 裡面有關發送數據的邏輯:

首先,KCP 會根據對端的接收窗口大小,將 snd_queue 上的資料移動到 snd_buf 上,計算移動數量的公式為 num = snd_nxt - (snd_una + cwnd),換句話說:已經成功發送的最大封包序號 snd_una 加上滑動窗口大小 cwnd 大於下一個待發送的封包序號 snd_nxt,則可以繼續發送新的資料封包。在移動 SEG 的同時,設置控制欄位。

遍歷 snd_buf,如果需要傳送資料包,則將資料複製到 buffer 上,複製的同時使用 ikcp_encode_seg 處理控制欄位資料的大小端問題。

最後呼叫 ikcp_outputbuffer 上的資料發送出去

到這裡,KCP 完成數據的發送。

接收

接收的過程是與發送相反的:ikcp_input -> ikcp_update -> ikcp_recv,用戶接收到網絡上的數據之後,需要調用 ikcp_input 傳給 KCP 解析,調用 ikcp_update 的時候會給發送端回覆 ACK 包,上層通過調用 ikcp_recv 來接收 KCP 解析之後的數據。

接收資料(點擊展開程式碼)
//---------------------------------------------------------------------
// input data
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    IUINT32 prev_una = kcp->snd_una;
    IUINT32 maxack = 0, latest_ts = 0;
    int flag = 0;

合法性檢查
    if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;

// data could be multiple KCP packets, handle in a loop
    while (1) {
        IUINT32 ts, sn, len, una, conv;
        IUINT16 wnd;
        IUINT8 cmd, frg;
        IKCPSEG *seg;

/KCP 封包不足結束
        if (size < (int)IKCP_OVERHEAD) break;

首先將控制欄位解析出來
        data = ikcp_decode32u(data, &conv);
        if (conv != kcp->conv) return -1;

        data = ikcp_decode8u(data, &cmd);
        data = ikcp_decode8u(data, &frg);
        data = ikcp_decode16u(data, &wnd);
        data = ikcp_decode32u(data, &ts);
        data = ikcp_decode32u(data, &sn);
        data = ikcp_decode32u(data, &una);
        data = ikcp_decode32u(data, &len);

        size -= IKCP_OVERHEAD;

        if ((long)size < (long)len || (int)len < 0) return -2;

// 檢查資料包類型
        if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
            cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) 
            return -3;

        kcp->rmt_wnd = wnd;

在這裡,`una` 代表了發送者的 `kcp->rcv_nxt`,據此資料可以移除已確認接收的資料包
        ikcp_parse_una(kcp, una);
將已確認接收的封包刪除後更新 snd_una 下一個要發送的序號
        ikcp_shrink_buf(kcp);

        if (cmd == IKCP_CMD_ACK) {
// 确认包
            // ...
        }
        else if (cmd == IKCP_CMD_PUSH) {
數據包
如果接收到的數據包序號 sn 在接收窗口內則正常處理否則直接丟棄等待重傳
            if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {

接收到的每個數據包都要回一個 ack 記錄下來
                ikcp_ack_push(kcp, sn, ts);

// 使用 ikcp_parse_data 函數處理接收到的數據
                if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
                    seg = ikcp_segment_new(kcp, len);
                    seg->conv = conv;
                    seg->cmd = cmd;
                    seg->frg = frg;
                    seg->wnd = wnd;
                    seg->ts = ts;
                    seg->sn = sn;
                    seg->una = una;
                    seg->len = len;

                    if (len > 0) {
                        memcpy(seg->data, data, len);
                    }

                    ikcp_parse_data(kcp, seg);
                }
            }
        }
        else if (cmd == IKCP_CMD_WASK) {
查詢視窗包
            // ...
        }
        else if (cmd == IKCP_CMD_WINS) {
// 查詢視窗的回覆封包
            // ...
        }
        else {
            return -3;
        }

        data += len;
        size -= len;
    }

處理快速重傳邏輯
    // ...

更新發送視窗
    // ...

    return 0;
}

ikcp_input 迴圈處理每一個 SEG 包,首先檢查數據包的合法性和類型,因為每個數據包都會攜帶 una,存放的是發送端等待接收的包序號,需要小於 una 的包對端都已經接受成功,所以可以把 snd_buff 中需要小於 una 的都刪掉,並更新 snd_nxt,這一部分由 ikcp_parse_unaikcp_shrink_buf 來處理。接收到的每個數據包,都需要回覆 ACK 包,由 ikcp_ack_push 記錄下來,最後調用 ikcp_parse_data 處理數據。

解析數據(點擊展開代碼)
void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
{
    struct IQUEUEHEAD *p, *prev;
    IUINT32 sn = newseg->sn;
    int repeat = 0;

// Serial Number Verification
    if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 ||
        _itimediff(sn, kcp->rcv_nxt) < 0) {
        ikcp_segment_delete(kcp, newseg);
        return;
    }

請找到 newseg 應該放置的位置因為接收到的 seg 可能是亂序的
    for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
        IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
        prev = p->prev;
        if (seg->sn == sn) {
// 重複收到
            repeat = 1;
            break;
        }
        if (_itimediff(sn, seg->sn) > 0) {
            break;
        }
    }

 newseg 放置在 rcv_buf 的正確位置
    if (repeat == 0) {
        iqueue_init(&newseg->node);
        iqueue_add(&newseg->node, p);
        kcp->nrcv_buf++;
    }    else {
        ikcp_segment_delete(kcp, newseg);
    }

將數據從 rcv_buf 移動到 rcv_queue
    while (! iqueue_is_empty(&kcp->rcv_buf)) {
        IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
如果 seg 序號是等待接收的序號移動到 rcv_queue
        if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
            iqueue_del(&seg->node);
            kcp->nrcv_buf--;
            iqueue_add_tail(&seg->node, &kcp->rcv_queue);
            kcp->nrcv_que++;
            kcp->rcv_nxt++;
        }    else {
            break;
        }
    }
}

ikcp_parse_data 的主要工作是將 newseg 放置在 kcp->rcv_buf 的適當位置,並將數據從 rcv_buf 移動到 rcv_queue。這裡的適當位置指的是,rcv_buf 是按照 sn 遞增順序排列的,newseg 需要根據自己的 sn 大小來尋找適當位置。將 rcv_buf 上的數據移動到 rcv_queue 的條件是,rcv_buf 上的數據包序號等於 KCP 等待接收的包序號 kcp->rcv_nxt,移動一個數據包後,需要更新 kcp->rcv_nxt,再處理下一個數據包。

在调用 ikcp_input 后,当上层调用 ikcp_update 时,将发送 ACK 数据包;而调用 ikcp_recv 时,则会向上层传递有效数据。ikcp_updateikcp_recv 是相互独立的,没有特定的调用顺序要求,而取决于上层调用的时机。让我们首先来看一下ikcp_update 中与 ACK 数据包发送相关的部分:

回覆 ACK(點擊展開程式碼)
我們之前提過ikcp_update 最終會呼叫 ikcp_flush
void ikcp_flush(ikcpcb *kcp, IUINT32 current)
{
    // ...

回覆 ACK 封包
    count = kcp->ackcount;
    for (i = 0; i < count; i++) {
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    kcp->ackcount = 0;

    // ...
}

ACK 包的之前已經由 ikcp_ack_push 保存起來了,所以這裡只需要 ikcp_ack_get 獲取每個 ACK 包的資訊,發送給對方。上層可以使用 ikcp_recv 從 KCP 獲取數據:

ikcp_recv(點擊展開程式碼)
//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
    struct IQUEUEHEAD *p;
    int ispeek = (len < 0)? 1 : 0;
    int peeksize;
    int recover = 0;
    IKCPSEG *seg;
    assert(kcp);

一些有效性檢查
    if (iqueue_is_empty(&kcp->rcv_queue))
        return -1;
    if (len < 0) len = -len;

計算能返回的數據長度
    peeksize = ikcp_peeksize(kcp);

    if (peeksize < 0)
        return -2;
    if (peeksize > len)
        return -3;

確認接收視窗大小
    if (kcp->nrcv_que >= kcp->rcv_wnd)
        recover = 1;

遍歷 rcv_queue將資料複製到 buffer 
    for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
        int fragment;
        seg = iqueue_entry(p, IKCPSEG, node);
        p = p->next;

        if (buffer) {
            memcpy(buffer, seg->data, seg->len);
            buffer += seg->len;
        }

        len += seg->len;

// 判斷分包
        fragment = seg->frg;

移除資料包
        if (ispeek == 0) {
            iqueue_del(&seg->node);
            ikcp_segment_delete(kcp, seg);
            kcp->nrcv_que--;
        }

所有的子程序都已經複製完成結束迴圈
        if (fragment == 0)
            break;
    }

    assert(len == peeksize);

// rcv_queue 又空了一些,嘗試繼續從 rcv_buf 移動到 rcv_queue
    while (! iqueue_is_empty(&kcp->rcv_buf)) {
        seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
        if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
            iqueue_del(&seg->node);
            kcp->nrcv_buf--;
            iqueue_add_tail(&seg->node, &kcp->rcv_queue);
            kcp->nrcv_que++;
            kcp->rcv_nxt++;
        }    else {
            break;
        }
    }

    return len;
}

ikcp_recv 一次調用只會返回一個完整的數據包,上層可以循環調用直到沒有數據返回為止,函數的邏輯比較簡單,就是從 rcv_queue 中複製數據到上層傳進來的 buffer 裡面,至此接收方對於接收到的數據包已經處理完畢。

當接收方處理數據包時,如果發送方收到了ACK包,接下來我們來看看發送方處理ACK包的情況:

處理 ACK 封包(點擊展開程式碼)
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    // ...
    IUINT32 maxack = 0, latest_ts = 0;
    // ...
    while (1) {
        // ...
// ts is the kcp->current of the peer
        data = ikcp_decode32u(data, &ts);
        data = ikcp_decode32u(data, &sn);

        if (cmd == IKCP_CMD_ACK) {
更新 rot
            if (_itimediff(kcp->current, ts) >= 0) {
                ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
            }
更新 snd_buf
            ikcp_parse_ack(kcp, sn);
            ikcp_shrink_buf(kcp);

maxack = 所有ACK封包中的最大sn
            if (flag == 0) {
                flag = 1;
                maxack = sn;
                latest_ts = ts;
            }    else {
                if (_itimediff(sn, maxack) > 0) {
                #ifndef IKCP_FASTACK_CONSERVE
                    maxack = sn;
                    latest_ts = ts;
                #else
                    if (_itimediff(ts, latest_ts) > 0) {
                        maxack = sn;
                        latest_ts = ts;
                    }
                #endif
                }
            }
        }
        // ...
    }

如果收到 ACK 封包請記錄下來以便進行快速重傳
    if (flag != 0) {
        ikcp_parse_fastack(kcp, maxack, latest_ts);
    }
}

收到 ACK 包後,需要埋首於 ikcp_parse_ackikcp_shrink_buf 來更新 snd_buf,同時還要呼叫 ikcp_update_ack 來計算更新 rto(重新傳輸超時時間)。ikcp_input 則計算收到的 ACK 包中的最大序號,用以記錄快速重傳。這樣一來,當發送端接收到 ACK 包時,會從 snd_buf 中刪除發送的資料,確保資料包可靠地送達接收端,完成一倵完整ARQ確認接收流程。

超時重傳

前面介紹的是 KCP 實現的 ARQ 中的 確認接收機制,ARQ 還需要一個超時重傳來保證可靠性,下面我們來看看 KCP 是怎麼做超時重傳的。

讓我們回到 ikcp_flush 函式:

超時重傳(點擊展開代碼)
void ikcp_flush(ikcpcb *kcp)
{
    // ...
傳送 snd_buf
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        if (segment->xmit == 0) {
// 首次發送
            needsend = 1;
            segment->xmit++;
設置 segment->rto
透過 segment->rto 計算 segment->resendts 的逾時重傳時間
            segment->rto = kcp->rx_rto;
            segment->resendts = current + segment->rto + rtomin;
        }
        else if (_itimediff(current, segment->resendts) >= 0) {
// Timeout retransmission
            needsend = 1;
            segment->xmit++;
            kcp->xmit++;
// nodelay 控制下一次超时重传时间的计算
            if (kcp->nodelay == 0) {
                segment->rto += kcp->rx_rto;
            }    else {
                segment->rto += kcp->rx_rto / 2;
            }
            segment->resendts = current + segment->rto;
            lost = 1;
        }
        else if (segment->fastack >= resent) {
// 快速重傳
            // ...
        }
        if (needsend) {
// 傳送資料
            // ...
        }
    // ...
}

一旦當前時間 current 大於 segment->resendts 超時重傳時間,說明在這段時間內,都沒有收到接收方的 ACK 包,觸發超時重傳機制,needsend = 1,重新發送數據。

擁有確認接收和超時重傳機制後,KCP 就能夠確保基本的可靠數據傳輸。然而,為了保持更穩定的數據流速,KCP 進行了更多的優化處理。現在我們一起來看看 KCP 採取了哪些進一步的優化措施。

KCP 提升流速的策略

快速重傳

發送端發送了序號為 snsn + 1 兩個資料包,如果只收到了 sn + 1 的 ACK 包,那可能是因為 sn 的 ACK 包在網路中還沒到達,又或者 ACK 包丟了,又或者 sn 資料包丟了,如果此時還沒到超時重傳的時間,網路也還不太擁擠,只是因為某種原因而突發丟包,那麼發送端主動提前發送 sn 資料包,可以幫助接收方更快地接收資料,提高流速。

KCP 內部也已實現快速重傳機制,同時在 ikcp_flush 中:

快速重傳(點擊展開程式碼)
void ikcp_flush(ikcpcb *kcp)
{
    // ...
    resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;

// 傳送 snd_buf
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        if (segment->xmit == 0) {
            // ...
        }
        else if (_itimediff(current, segment->resendts) >= 0) {
            // ...
        }
        else if (segment->fastack >= resent) {
快速重傳
            if ((int)segment->xmit <= kcp->fastlimit ||
                kcp->fastlimit <= 0) {
                needsend = 1;
                segment->xmit++;
                segment->fastack = 0;
                segment->resendts = current + segment->rto;
                change++;
            }
        }
        if (needsend) {
// 傳送資料
            // ...
        }
    // ...
}

要啟用快速重傳,需滿足兩個條件: * segment->fastack >= resent,resent 是可配置的参数 kcp->fastresend,配置為 0 會關閉快速重傳。segment->fastack 是在函數 ikcp_parse_fastack 裡面設置的,這個函數是在 ikcp_input 裡面調用,會根據 ikcp_input 算出的 maxack 來給所有 sn 小於 maxacksegment->fastack 加一,所以 segment->fastack 就是表示收到比 sn 大的包的次數。 當 segment->xmit <= kcp->fastlimit || kcp->fastlimit <= 0 時,setgment->xmit 代表傳送次數,kcp->fastlimit 則是可設定的最大快速重傳次數,傳送次數必須少於最大快速重傳次數。

當滿足快速重傳的上述條件時,KCP 將執行快速重傳,請注意快速重傳並不會重置超時重傳時間,原來的超時時間依然會生效。

縮短逾時重傳時間

重送機制是一項非常良好的機制,但實在花費太多時間。按照 TCP 的策略,每次超時重傳的等待時間會倍增,那等待時間就會急速增加。在這等待時間內,由於接收端的接收窗口可能已經耗盡,無法接收新資料,同時需要等待重傳的封包序號卻在最前面。接收端必須等到收到重送的封包後,才能將所有數據返回給上層。這種情況下,整個網路的流速幾乎為零。KCP 增加了配置,可以減緩等待時間的增長,而且不會倍增。透過配置 kcp->nodelay 控制,每次等待時間僅會增長1倍的 RTO 或0.5倍的 RTO,有效地減緩等待時間的增長,協助網路盡快恢復流速。

更新傳送視窗

傳送窗口是指同時傳輸的數據包數量,窗口越大,同時傳輸的數據越多,流速越快,但窗口太大可能導致網絡擁塞,丟包率增加,數據重傳次數增多,流速下降。因此,發送窗口需要根據網絡情況持續更新,逐漸接近最佳值。在KCP中有關發送窗口的程式碼:

發送視窗(點擊展開代碼)
ikcpcb* ikcp_create(IUINT32 conv, void *user)
{
    // ...
`snd_wnd`  `rcv_wnd` 分別代表發送和接收緩衝區的大小
    kcp->snd_wnd = IKCP_WND_SND;    // 32
    kcp->rcv_wnd = IKCP_WND_RCV;    // 128
對端接收視窗大小         128
    kcp->rmt_wnd = IKCP_WND_RCV
// 設置發送窗口 cwnd 初始值為 0
    kcp->cwnd = 0;
傳送視窗的位元組大小用來計算擴 Congestion Windowcwnd)。
    kcp->incr = 0
慢啟動閾值slow start threshold
    kcp->ssthresh = IKCP_THRESH_INIT;
// nocwnd is a configurable parameter, 1 disregards cwnd.
    kcp->nocwnd = 0;
    // ...
}

void ikcp_flush(ikcpcb *kcp)
{
    // ...
在發送數據時首先計算發送窗口大小即發送緩衝區大小和對方接收窗口大小的較小值
    cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
默認情況下還需要考慮 kcp->cwnd即不斷更新的發送窗口
    if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

根據 cwnd 的大小 snd_queue 移動到 snd_buf
    while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
    }
傳送資料
    resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
觸發超時重傳 lost = 1
觸發快速重傳 變更++

更新慢启动閾值和送出窗口
    if (change) {
如果觸發快速重傳ssthresh 設置為網路上正在傳輸的資料包數量的一半
        IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
        kcp->ssthresh = inflight / 2;
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;

// 傳送窗口設為閾值再加上快速重傳相關的 resent
        kcp->cwnd = kcp->ssthresh + resent;
        kcp->incr = kcp->cwnd * kcp->mss;
    }

    if (lost) {
如果有超時重傳將啟動慢啟動ssthresh 閾值將設為傳送窗口的一半
        kcp->ssthresh = cwnd / 2;
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;
將傳送視窗重設為1重新執行慢啟動增長
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }

    if (kcp->cwnd < 1) {
因為初始化為 0來到這裡會再設定成 1
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }
}

int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    IUINT32 prev_una = kcp->snd_una;
處理接收的資料

    while (1) {
        // ...
        data = ikcp_decode16u(data, &wnd)
// rmt_wnd 是對方的接收窗口大小
        kcp->rmt_wnd = wnd
        // ...
處理數據
    }

更新咗發送視窗
 kcp->snd_una - prev_una > 0 表示這次輸入已收到 ACK 且發送緩衝區 snd_buf 有變化
    if (_itimediff(kcp->snd_una, prev_una) > 0) {
再評估對方的接收窗口
        if (kcp->cwnd < kcp->rmt_wnd) {
            IUINT32 mss = kcp->mss;

            if (kcp->cwnd < kcp->ssthresh) {
小於慢啟動閾值時成倍增加
                kcp->cwnd++;
                kcp->incr += mss;

            }    else {
在超過慢啟動閾值之後通過公式更新 incr進而計算 cwnd
                if (kcp->incr < mss) kcp->incr = mss;
                kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
                if ((kcp->cwnd + 1) * mss <= kcp->incr) {
                    kcp->cwnd++;
                }
            }
// The value updated needs to be compared with rmt_wnd again.
            if (kcp->cwnd > kcp->rmt_wnd) {
                kcp->cwnd = kcp->rmt_wnd;
                kcp->incr = kcp->rmt_wnd * mss;
            }
        }
    }
}

計算發送窗口 kcp->cwnd 的大小相關的程式碼片段會稍微複雜一些,因為在發送和接收數據時都需要更新。kcp->cwnd 初始化為 0, 在初次呼叫 ikcp_flush 时,若结果小於1,便將其改為1。之後發送端依據發送窗口大小,發送相對應數量的資料包,並等候確認訊號。 回覆ACK封包。ACK封包在 kcp->input 中被處理,若 kcp->input 中判別出有ACK封包,並清除發送緩衝區中的發送資料封包,表示某資料封包已成功抵達,kcp->cwnd++。事實上很可能一次kcp->input只處理一個ACK封包,可理解為每收到一個ACK封包就會執行kcp->cwnd++,這個遞增的實現效果是倍增的,例如目前kcp->cwnd = 2,發送兩個資料封包,收到兩個ACK封包,觸發兩次遞增,最終就是kcp->cwnd = 4倍增。

cwnd 可以持續指數增長,直到超過慢啟動閾值,或者發生擁塞超時重傳、快速重傳的情況。發生了超時重傳之後,會觸發慢啟動,慢啟動閾值 ssthresh = kcp->cwnd / 2,發送窗口 kcp->cwnd = 1,回到最初重新指數增長。如果發生了快速重傳,KCP 先提前減少 ssthresh,也就是減少了 cwnd 指數增長的空間,降低增長速度,提前減緩擁塞的情況。

KCP 還增加了一個配置 nocwnd,當 nocwnd = 1,發送數據時不再考慮發送窗口大小,直接讓最大能發送的數量發送數據包,滿足高速模式下的要求。

總結

本文簡單地分析了 KCP 的源碼,並討論了 KCP 上 ARQ 的實現,以及一些 KCP 提升流速的策略。還有很多細節沒有提到,感興趣的可以自己翻 KCP 的源碼對照著看,相信也能有不少的收穫。

Original: https://wiki.disenone.site/tc

This post is protected by CC BY-NC-SA 4.0 agreement, should be reproduced with attribution.

此訊息是使用 ChatGPT 翻譯的,請在反饋指出任何遺漏之處。