前景提要
在 lab0 中,我们实现了流控制字节流ByteStream
。
在 lab1 中,我们创建了一个模块StreamReassembler
,该模块接受一系列从同一字节流中提取的子字符串,并将它们重新填充回ByteStream
,同时将其内存消耗限制在给定的容量 capacity。
总体认知
现在,在 lab2 中,你将实现 TCP 中处理入字节流的部分:TCPReceiver
。在编写 StreamReassembler
和 ByteStream
时,你已经完成了其中大部分的「算法」工作。TCPReceiver
主要的逻辑是:将获取到的 Segment 传递到 Reassembler 中。
在 lab3 中,你将实现连接的另一端 TCPSender
:一个将出站字节流转换为不可靠数据报中发送段的工具。它负责读取 ByTestStream
(由某些发送方应用程序创建并写入),并将流转换为一系列传出 TCP Segment。当然,它的作用远远不止发送,它还具备有重传等高级逻辑。
至少在我们当前的认知来看,我们是要实现两个独立的模块:
我们先来拆解下这张图,这或许可以让您具备更多对它的一些详细认知。
- TCPReceiver 中包含有 Reassembler,这意味着确认包是否完整的逻辑确实是在 TCPReceiver 中的,符合预期。这个算法我们已经实现,并且我们的 Reassembler 也留了一个借口
push_string()
给到 TCPReceiver。这也是 lab2 的核心逻辑。 - TCPReceiver 和 TCPSender 中都包含了 ByteStream。
- 实际上 TCPReceiver 的 ByteStream 叫做 stream_out,是经过处理的网络数据输出给应用程序的出口,方便应用程序获取网络传输的数据。
- TCPSender 的 ByteStream 叫做 stream_in,是应用程序数据进入 TCP 发送端的入口,方便 TCPSender 读取数据并封装成 TCP 段发送。
- 我们可以这么方便理解:我们实现的 TCP 协议,它是「面向」主机程序而不是外部网络。因此主机程序向它发送数据的时候,就是「入向字节流」,也就是 stream_in。同理,它向主机程序发送信息(也就是外部网络发回来的数据),这个就是「出向字节流」,也就是 stream_out。
- stream_in 不需要我们单独设置成员对象,因为它在 Reassembler 中已有涉及。
- 实际上,TCPSender 也同样会感应到读取的数据。
- 这里我们开个天眼:TCPSender 是需要根据对面的 TCP 协议的网络终端的 window_size 即「窗口大小」(也就是对面还有多少「容量」来接收你的数据)和 ackno(对面接收到了你的哪些数据)。
- 所以 TCPSender 和 TCPReceiver 之上,还有一层封装来做双方的配合工作。不过这个部分是下一节的内容了。
- 刚刚我们有提到,TCPSender 会感应到对面 TCP 节点的 window_size 即「窗口大小」(也就是对面还有多少「容量」来接收你的数据)和 ackno(对面接收到了你的哪些数据)。
- 这些数据,恰恰就是由 TCPReceiver 来提供。因此,TCPReceiver 除了传递数据之外,还会处理并暴露 window_size 和 ackno。
lab2: TCPReceiver
1.1 序列号
1.1.1 概念
实际上我们绝大多数人进入 lab2 就会被这张表格糊脸:
Sequence Numbers | Absolute Sequence Numbers | Stream Indices |
---|---|---|
Start at the ISN | Start at 0 | Start at 0 |
Include SYN/FIN | Include SYN/FIN | Omit SYN/FIN |
32 bits, wrapping | 64 bits, non-wrapping | 64 bits, non-wrapping |
“seqno” | “absolute seqno” | “stream index” |
如果你确实看不懂的话,我来给你说明下~
- Sequence Numbers:序列号,简称为 SN。用于标识数据段(Segment)中第一个字节在数据流中的位置。是 TCP 协议层的核心机制,用于保证数据顺序和可靠性。
- 这里说一下 Initial Sequence Numbers,简称为 ISN。它表示在 TCP 传递过程中的第一个发送的 Segment 的 SN 值。
- Absolute Sequence Numbers:TCP 流的全局唯一标识,便于分析。
- Stream Indices:字节流内部的偏移量,独立于网络协议,用于数据存储和传输前的准备。
1. Sequence Numbers(序列号)
- TCP 协议中的核心机制,用于标识数据段(Segment)中第一个字节在数据流中的位置。
特点:
- 32 位无符号整数,范围为
0 ~ 2^32-1
,达到上限后循环(模运算)。 - 初始序列号(ISN):由发送方随机生成,用于防止历史数据干扰。
- 示例:
假设发送方发送的数据段携带序列号 1000,数据长度为 500 字节,
则下一个数据段的序列号应为 1000 + 500 = 1500。
2. Absolute Sequence Numbers(绝对序列号)
TCP 序列号的全局唯一标识,从初始序列号(ISN)开始累加,不考虑溢出 32 位溢出后的循环。
与普通序列号的区别:
- 普通序列号是模
2^32
的循环值,而绝对序列号是无限增长的逻辑值。(我们在实现的时候,使用uint64_t
作为 ASN 的数据结构) - 绝对序列号常用于分析或调试工具(如 Wireshark),避免循环带来的歧义。
- 普通序列号是模
应用场景
- 网络抓包分析:
- 例如,Wireshark会显示绝对序列号(
SEQ/ACK Analysis
),方便用户理解数据顺序。
- 例如,Wireshark会显示绝对序列号(
- 跨连接跟踪:
- 同一主机的不同TCP连接可能有相同的ISN,但绝对序列号是唯一的。
3. Stream Indices(流索引)
- 数据在字节流中的绝对位置,通常用于内存或文件中的偏移量。也就是说,它从 0 开始计数。
与TCP序列号的关系:
- 转换逻辑:
TCP序列号 = Stream Index + Initial Sequence Number (ISN)
- 示例:若ISN为
1000
,字节流中第500字节的Stream Index为500
,则对应的TCP序列号为1000 + 500 = 1500
。
4. 三者的对比
概念 | 作用 | 应用协议/场景 | 示例 |
---|---|---|---|
Sequence Numbers | TCP数据段的顺序标识,支持循环和重传。 | TCP协议层 | 数据段序列号为 1500 ,表示该段第一个字节在TCP流中的位置。 |
Absolute Sequence Numbers | TCP流的全局唯一标识,无限增长。 | 抓包工具、协议分析 | Wireshark显示绝对序列号为 1500 ,避免循环歧义。 |
Stream Indices | 字节流内部的绝对偏移量,与TCP无关。 | 内存缓冲区、文件系统、TCP发送方实现 | 用户写入字节流的第500字节对应Stream Index 500 。 |
一个典型的场景在于网络抓包,我们通过观察绝对序列号可清晰看到数据传输的全局顺序,例如:
数据包1: Absolute SEQ=1000, 数据长度=500 → 下一个绝对SEQ=1500
数据包2: Absolute SEQ=1500, 数据长度=300 → 下一个绝对SEQ=1800
5. 在项目中的作用
在 Segment 中,序列号是以 SN 形式存在的,具备:
- ISN 完全可能是任何值。它不严格遵循每次发送一定要从 Segment 从 0 为起始点开始发送。
- 是 32 位的数据。
但是在我们的 TCPReceiver 实现中,使用 32 为 SN 存储数据很显然满足不了我们的需求,因为它太小了。因此我们的 TCPReceiver 中,存储的是 ASN。我们可以得到这样的结论:
如果操作涉及读取 Segment 中的 SN 到 TCP,那么一定要经历一个 32 位转换到 64 位的过程。反之亦然。
32 位到 64 位的转换是有挑战的,因为它会有精度损失,也会有「SN 段在 64 位的 ASN 数据中具体是指哪一段」这样的问题。
我们为了解决这个问题,在开始 lab2 前,我们首先要完成 WrappingInt32
这个类。这个类很好的帮我们解决了 SN 到 ASN 的转换过程~
在下文中,ASN 我们会使用代码中的写法:absolute_seqno,与之对应的,SN 也将被写作 seqno。
6. 初步认识转换逻辑
由于 SN 是从 ISN 开始,因此在这种转换中,也会有 ISN 的参与。你一定不会错过这张图:
element | SYN | c | a | t | FIN |
---|---|---|---|---|---|
seqno | 2^32-2 | 2^32-1 | 0 | 1 | 2 |
absolute seqno | 0 | 1 | 2 | 3 | 4 |
stream index | 0 | 1 | 2 |
我们首先要来了解下 SYN 和 FIN 这俩标志位:
- SYN 标志位
SYN 是 Synchronize Sequence Numbers(同步序列号)的缩写。当 SYN 标志位置为 1 时,表示这是一个用于建立 TCP 连接的同步请求包。
你可能会看到「三次握手」「四次握手」中 SYN 很复杂的过程概念,但在 TCPSender 中它的含义很简单:TCPSender 发出的第一个 Segment,SYN 一定为 1/true。我们也可以推断得到,TCPReceiver 接收到了 SYN(实际上第一个接受到的数据并不是 SYN,但我们在 Reassembler 中一定会把 SYN 放在第一位),也就意味着第一个段被我们接收到了。
- FIN 标志位
FIN 是 Finish 的缩写。当 FIN 标志位置为 1 时,表示发送方已经没有数据要发送了,请求关闭 TCP 连接。
同理,TCPSender 发出的最后一个 Segment,FIN 一定为 1/true。TCPReceiver 接收到了 FIN,也就意味着最后一个段被我们接收到了。
所以 SYN 和 FIN 实际上就是第一个段和最后一个段。我们站在 stream index 的角度来看,TCPSender 发送的数据顺序分别是:
SYN 0 1 2 FIN
由于真实发送的数据被分为了 0 1 2,SYN 和 FIN 是我们主动添加上去的,stream index 如此划分确实也是合理。
那么我们如果将它们视作一个「整体」,以 SYN 为 0 进行标号呢?那就是:
SYN 0 1 2 FIN # stream index
0 1 2 3 4 # absolute seqno
我们就如此知道了 absolute seqno 和 stream index 的关系。在 TCP 的视角下,SYN 和 FIN 同样也为「Segment」的一部分,这个时候我们自然就对它们进行统一标号了。这就是 absolute 的由来。SYN 的 absolute_seqno 一定为 0,FIN 的 absolute_seqno 也一定是本次发射的最大值。
即使是 SN 以 ISN 开头,TCPReceiver 和 TCPSender 保存的 absolute_seqno 也是从 0 开始计数的。
不难发现:在不进行类型(32 位、64 位)转换的情况下,seqno = absolute_seqno + isn(ISN)。
7. 总结
刚刚讲了很多概念,我们接下来总结一下:
其中标注为蓝色的部分为 TCPReceiver 需要消费的。它包括了:
- Payload:我们需要将它的数据给放进 Reassembler 中。
- SYN、FIN:起始收到、末尾收到的标志位。
- seqno:转换为 absolute_seqno 后,放进 Reassembler 中。
标注为红色的部分为 TCPReceiver 暴露出去的接口,它主要是:
- 从 Reassembler 中处理后读出来的
ackno
,window_size()
。其中ackno
也是一个WrappingInt32
的数据。
1.1.2 推导转换逻辑
回绕现象:在 TCP 协议中,序列号是 32 位无符号整数,范围是 0 到 2^32 - 1。当序列号增长到 2^32 - 1 后,会重新从 0 开始,这就产生了回绕现象。
观察 WrappingInt32
这个类,我们可以发现它的核心方法是 unwrap 和 wrap,这恰好对应 32 位到 64 位数据的相互转换。为了和代码本身更为贴近,我们联系这个类,做出定义:
absolute_seqno:使用 uint_64
作为数据类型。
seqno:使用 WrappingInt32
作为数据类型。WrappingInt32 重载了部分计算符号,这使得它可以和一部分正常的 size_t 等类型的数据直接进行加减运算。
wrap 是做这样的定义的:
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn)
我们之前有说过:
seqno = absolute_seqno + isn(ISN)
将 64 位的绝对序列号 n 强制转换为 32 位无符号整数 n_32。在转换过程中,如果 n 超过了 2^32 - 1
,会自动进行取模运算,相当于 n % 2^32
。
因此 wrap 方法直接如此实现:
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
auto n_32 = static_cast<uint32_t>(n);
return isn + n_32;
}
TCP 的序列号是 32 位的,可能会因为回绕 (wrap around) 的问题,所以需要将其转换为一个更大的数值空间,也就是 absolute_seqno。
而 checkpoint 的作用是作为一个参考点,帮助确定最接近的 absolute_seqno 值。也就是说,当多个可能的 absolute_seqno 可能对应同一个 32 位的 seqno 时,选择离 checkpoint 最近的那个。
具体实现步骤如下:
- 使用 wrap 方法将检查点 checkpoint 转换为 32 位的包装检查点 wrapped_checkpoint。
- 计算包装序列号 n 与包装检查点 wrapped_checkpoint 之间的差值 diff。
- 分别计算无符号和有符号的绝对序列号 unsigned_unwrapped 和 signed_unwrapped。
- 根据 diff 和 checkpoint 的值进行判断:
- 当 diff < 0 且 checkpoint < INT64_MAX 时,如果 signed_unwrapped 为负数,说明在计算过程中发生了溢出,需要重新计算 unsigned_unwrapped。
- 否则,直接使用 unsigned_unwrapped 作为最终结果。
假设初始序列号 isn 为 100,包装序列号 n 为 150,检查点 checkpoint 为 2^32 + 120。首先计算包装检查点 wrapped_checkpoint 为 100 + 120 = 220,差值 diff 为 150 - 220 = -70。然后分别计算 unsigned_unwrapped 和 signed_unwrapped,根据判断条件进行调整,最终得到正确的绝对序列号。
1.1.3 与 TCPReceiver 的联系
那在 TCPReceiver 的 receiver_segment()
中,什么时候需要调用这个 unwrap()
函数呢?
是在处理接收到的 TCP 段的序列号时,需要将其转换为绝对序列号,以便确定数据的位置。例如,当收到一个数据包时,里面的 seqno 是 32 位的,我们需要知道它在整个字节流中的绝对位置,这样才能正确重组数据。
此时我们已经有了核心方法:
auto absolute_seqno = unwrap(seg.header().seqno, this->_isn.value(), /* 未知 checkpoint */);
在调用 unwrap 的时候,应该把第三个参数 checkpoint 设置为什么值?
我们先明确一个结论:checkpoint 值的设立,是为了确保 segment_received()
获取到的 seqno
转为的 absolute_seqno
,符合预期。进一步地说,它能被转为正确的 absolute_seqno
而不是 absolute_seqno - n*2^32
亦或者是 absolute_seqno + n*2^32
。因为 checkpoint 值的不同,导致的转化出现不符合预期的情况,就是转化出来的 absolute_seqno
多出或者少了若干个 2^32 的量级。
我们来直观表示一下。假设我们有三个 2^32 的区间 ,checkpoint 在 (2^32, 2*2^32) 这个区间范围内,我们可以这样表示当前的区间:
0, ..., 2^32, ..., [checkpoint], ..., 2*2^32, ..., 3*2^32
一个 seqno
为 1 的 segment 片段,它有可能被分配到区间内的 1
,也有可能被分配到 1+2^32
或者 1+2*2^32
这两个地方。我们站外设计者的视角来看当然希望它转换后的 absolute_seqno
也是 1
. 但很可惜,它最终得到的值是 1+2^32
或者 1+2*2^32
,因为它们离 checkpoint 更近。具体结果是什么则要取决于它是否离 checkpoint 更加接近了。
从直观上,离 checkpoint 近的 seqno
才更不容易出错。实际上,它拥有一个十分明确的「边界」,在边界内的 seqno
转换后,一定不会出错。在边界外的 seqno
转换后,一定会出错。我们进一步讨论这个「边界」的范围:
假定 checkpoint 为 2+2^32
,那么它的「边界」是这个样子:
0, ..., [下边界: 3+2^31], ..., 2^32, ..., [checkpoint: 2+2^32], ..., [上边界: 1+2^32+2^31], ..., 2*2^32, ..., 3*2^32
是一个:以 checkpoint 为中心,大小为 2^32-1 的区间。
所以 checkpoint 是什么值,就转化为了:「区间」的中心值是什么值,才能让 seqno
的值尽可能地落在这个区间内。
也可以等价于,是否能找到一个区间,到达 TCP 接收端的 segment 片段中的 seqno
尽可能落在这个区间。
当然这个问题实际上我们现在仍然难以证明(笔者写到这里也不知道后续的章节是否会证明这个问题,哈哈哈)所以我们抛出结论:
checkpoint 的正确取值应为当前接收方期望的下一个字节的绝对序列号,即已成功重组并写入字节流(ByteStream)的字节数量。具体来说:
checkpoint 是 stream_out().bytes_written()
,即已写入的字节数。
absolute_seqno
从 0 开始计数,每个字节对应一个绝对序列号。例如,第一个字节为 0,第二个为 1,依此类推。- 若已成功重组并写入 k 个字节(
stream_out().bytes_written()
== k),则下一个期望的字节的absolute_seqno
是 k。 - 使用 k 作为 checkpoint,unwrap 会选择最接近 k 的合法
absolute_seqno
,确保正确处理序列号回绕。到达 TCP 接收端的 segment 片段中的seqno
尽可能落在了以absolute_seqno
为中心,长度为 2^32-1 的区间。
是不是听着好像也还挺合理的… 和现实中对应,2^32 已经是 4.295GB 大小了。以我们对于 TCP 的控制来说,传输应该不会乱成这个样子吧。康宇的博客也有这样的描述:
但几乎不可能出现相邻到达的两个 segment 序号差值超过
INT32_MAX
的情况,除非延迟以年为单位,或者产生了比特差错(后面的 LAB 可能涉及)。
我们现在不再纠结,然后处理出来应该是这样:
auto absolute_seqno = unwrap(seg.header().seqno, this->_isn.value(), stream_out().bytes_written());
当然在我的项目中,我新增了一个方法获取 StreamReassembler
中的 _first_disassembled_index
:
size_t StreamReassembler::first_disassembled_index() const {
return this->_first_disassembled_index;
}
它和 stream_out().bytes_written()
实际上是同一个值。这一点就不再展开。
1.2 实现
1.2.1 理论
实现其实很简单,有了 WrappingInt32
后,只需要接收段的时候:
- 针对于 SYN FIN 做特殊处理。
- 在
push_string()
的核心逻辑之外,补充下 seqno 和 abs_seqno 的转换逻辑就好了。因为我们传入 Reassembler 需要将 seqno 转换为 abs_seqno,而暴露出 ackno 的逻辑需要 abs_seqno 转换为 32 位的数据。
我们仍然需要对它有一个全局的了解。从 TCPReceiver 开始,TCPSender 和后面的章节都会有一个状态的判断,这涉及到我们是否能通过单元测试。
但有了我们上面的分析,这个图是不是就感觉很简单了。对于这个图应该确实没有什么可以说的了。
针对于 error 这个,虽然说放到本节来处理也是可以的,但是它已经在 Reassembler 中被处理好了。
其中需要注意的是,在收到 SYN 和 FIN 后,计算 abs_seqno 的时候,需要额外 +1 和 +2. 这是因为 SYN 和 FIN 也被记作了 abs_seqno 之内。
1.2.2 代码
tcp_receiver.hh
class TCPReceiver {
//! Our data structure for re-assembling bytes.
StreamReassembler _reassembler;
//! The maximum number of bytes we'll store.
size_t _capacity;
std::optional<WrappingInt32> _ackno{};
std::optional<WrappingInt32> _isn{};
void _update_ackno();
}
tcp_receiver.cc
void TCPReceiver::segment_received(const TCPSegment &seg) {
if (!this->_isn.has_value()) {
if (seg.header().syn) {
this->_isn = seg.header().seqno;
this->_reassembler.push_substring(seg.payload().copy(), 0, seg.header().fin);
this->_update_ackno();
}
return;
}
auto absolute_seqno = unwrap(seg.header().seqno, this->_isn.value(), this->_reassembler.first_disassembled_index());
// absolute_seqno(绝对序列号)已经包含了一个 syn,因此需要 -1 才能得到正确的起始索引
// 如果这个地方已经收到 fin 了,并且绝对序列号大于 fin,那么就要 -2
// 但我们实际上在绝对序列号大于 fin 的时候在 push_string 内部就会被抛弃掉,因此不必这么做,直接优化成 -1
this->_reassembler.push_substring(seg.payload().copy(), absolute_seqno - 1, seg.header().fin);
this->_update_ackno();
}
optional<WrappingInt32> TCPReceiver::ackno() const { return this->_ackno; }
size_t TCPReceiver::window_size() const { return this->_capacity - this->_reassembler.stream_out().buffer_size(); }
void TCPReceiver::_update_ackno() {
if (this->stream_out().input_ended()) {
this->_ackno = wrap(this->_reassembler.first_disassembled_index() + 2, this->_isn.value());
return;
}
if (this->_isn.has_value()) {
this->_ackno = wrap(this->_reassembler.first_disassembled_index() + 1, this->_isn.value());
}
}
lab3: TCPSender
TCPSender 负责处理以下事务:
- 根据对面的 window_size 窗口大小发送数据。
- 通过确认哪些数据已经被对方接受,来确认是否重发数据。
- 虽然说无需刻意制造一个状态机,但需要控制 SYN、FIN 和此时是否已知全部数据已被接收到,借此让 test 来间接知道你当前是啥状态。
2.1 理论
2.1.1 发送数据
发送数据,就是字面意思…
当然,我们需要知道两个事情:
- 什么时候可以发送数据?
一种状况是,我们需要判断这些数据我们能完全发送,也就是数据量不能超过我们发送端的承受能力,当然,也不能超过发送端缓冲区的容量(即我们在这一刻时间能拿出多少数据出来发送)。
另一种情况是,不能超过对方接收端的接收能力。
因此,需要分两种情况讨论:
- 数据量不能超过我们发送端的承受能力
发送端承受能力,可以用 TCPConfig::MAX_PAYLOAD_SIZE
来表示,这是发送端的一个属性,也是自定义好的,不需要我们为它赋值。
发送端缓冲区的容量,也就是 this->_stream.buffer_size()
。
因此,我们可以从这个条件得出发送端的数据条件为:
auto sender_max_segment_payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE, this->_stream.buffer_size());
- 不能超过对方的接收能力
对方的接收能力,在下一个问题中我们可以知道它是通过唯一的信号 window_size 来通知发送方的。这表示接收端的窗口大小。
那么,我们可以进一步将 length 表示为:
auto segment_payload_size = min(sender_max_segment_payload_size, this->_window_size)
发送的时候,也需要有一个特殊条件,那就是对面窗口为 0 的时候,我们需要持续发送数据量为 1 的 Segment,以避免死锁。
对于这个问题,我们可以在 fill_window()
方法内,设计一个临时参数,欺骗发送端此时对面的 window_size 最小为 1. 这样就可以确保发送端(在对面窗口为 0 的时候)可以一直发送 Segment.payload 为 1 的数据了。
此时的代码,我们可以这么写:
// 为防止死锁,发送数据的时候即便对面窗口为 0,也至少发送 1 单位的数据
auto window_size = max<uint16_t>(1, this->_window_size);
// 中间省略部分逻辑
auto segment_payload_size = min(sender_max_segment_payload_size, window_size)
对面窗口为 0 的时候,我们需要持续发送数据量为 1 的 Segment。它还有一个作用,那就是处于这种情况的时候不会触发超时重传窗口翻倍的机制。这个我们在「超时重传」中会说明。
2.1.2 接收 Ack
- 最简单过程理解
怎么理解呢?假如说有三个 Segment,它们的 seqno 为 1,2,3.
假设我们把这些 Segment 全都发送了。但是我们要怎么知道接收方拿到了这些数据呢?接收方也必须要同时给发送方发送数据,做「信息同步」,或者用互联网黑化「对齐」。
「信息同步」的信息,会带一个数据叫做 ackno,它的取值巧不巧也正好是 1,2,3. 我们做一个假设,ackno 为 2. 那么发送方就知道 1 和 2 已经被接收了。
也就是说,ackno 代表的 seqno 及之前所有的 seqno,全都被接收方接受到了。
另外,我们把已经发送出去但还未成功确认的数据报称作 outstanding segments。
- 实际上,同步的数据还包括了 window_size
在发送数据的时候,我们通过接收方的「窗口大小」,知道了我们最多一次性往 Segment 中放入多少数据。那么窗口大小也必须要从「接收方」同步给「发送方」。
因此,我们可以解答什么是「接收 Ack」了:实际上就是接收方发送信息,同步发送方信息。这个信息包含两个标志,一个是 ackno,也就是接收方接收到了哪些数据。另一个则是 window_size,也就是接收方的窗口大小,发送方据此调整下一步发送的 Segment 的大小。
- 接收 Ack 之后,需要主动发起一次
fill_window()
这个主要是为了防止死锁。
2.1.3 状态变化
- 发送数据包含了一系列的状态变化
这张图可以说就是每一个学习网络的孩子都会碰到的图了。我们该如何拆解这张图呢?
首先我们排除 Error,因为细心的你早就可以发现你在 ByteStream 中已经大概知道 Error 是怎么出现的了~ 当然,也会有一些其它因素导致 Error,但直到这里我们仍然还是不讨论的。请放心。
观察其它的状态,我们首先要了解两个标志位(SYN 和 FIN 标志位):
- SYN 标志位
- 含义:即同步(Synchronize)标志位,用于在 TCP 连接建立时进行同步序列号的操作。
- 作用:发送方(其实就是 TCPSender)发送 SYN 标志位置为 1 的 TCP 报文段,请求建立连接,同时在报文中携带一个初始序列号(ISN)。
(服务器收到后,会返回一个 SYN=1 且 ACK=1 的报文段作为响应,其中也包含自己的初始序列号,这样双方就完成了序列号的同步,为后续可靠的数据传输奠定基础,这也是 TCP 三次握手过程中的关键步骤。)
–以上这一段是八股文,但是是不是已经有一种理论和实践相结合的体验了–
SYN 被 TCPSender 发送后,进入 SYN_SENT 状态。收到对应的 SYN=1 且 ACK=1 的报文段作为响应后,进入 SYN_ACKED 状态。
- FIN 标志位
- 含义:即结束(Finish)标志位,用于表示发送方已经没有数据要发送了,请求关闭连接。
- 作用:当发送方(其实就是 TCPSender)完成数据传输后,会发送 FIN 标志位置为 1 的 TCP 报文段给对方,告知对方自己的数据发送完毕,请求关闭连接。对方收到后会发送确认报文段(ACK),表示已经收到关闭请求。若接收方也没有数据要发送了,也会发送 FIN 报文段给对方,完成连接的关闭过程,这是 TCP 四次挥手过程中的重要操作。
FIN 基本也是同理,发送端 Sender 发送 FIN 后,进入入 FIN_SENT 状态;收到对应的 FIN=1 且 ACK=1 的报文段作为响应后,进入 FIN_ACKED 状态。
关于状态的变化,单元测试套件是如何来判断的呢?我们可以去 tcp_helpers/tcp_state.cc
来学习是如何判断,这对我们理解这几个状态到底是怎么回事很有好处。
这不是一个必选项,我们实际上无需在程序中写明我们是哪个状态,也不用根据我们当前在哪个状态去做状态机之类的逻辑。因此在我们写代码的时候,不用刻意再去抽离 State 模块。
2.1.4 超时重传
TCPSender 承担着两项重要任务:
其一,它会把原始数据流拆分成多个 TCP 报文(其实就是上一节内容的 Segment),并将这些报文发送出去;这个过程我们在前面说过了。重点了解数据「拆分了」就好。
其二,它会对每一个已发送出去但尚未被接收方接收的报文,记录其发送时间。倘若存在部分已发送报文,在一段特定的时间内都未得到接收方通过确认号(ackno)进行的确认,那么这些数据包就必须进行重传。
我们来拆解「其二」,首先就是 1「一段特定的时间」应该如何解释,2「重传哪些数据包」。
- 重传所需要的一段特定的时间
TCPSender 在构造时会被给予一个重传超时时间 RTO 的初始值,它的全称叫 Retransmission Timeout,以下如有可能我们尽可能简称它为 RTO。RTO 是在重新发送未完成 TCP 段之前需要等待的毫秒数,实际上就是「重传所需要的一段特定的时间」的别称。
- RTO 会一直变化,因此它一共是有两个值,它的初始值,和它的现在值。
- RTO 值将会随着时间的流逝(或者更应该说是网络环境的变化)而变化,但初始的RTO将始终不变。
- 它的初始值是系统设置好的,不需要我们关心。但它的现值需要我们去根据特定的情况去计算,有一套算法。这就是 RTO 重传算法。
- 重传的逻辑:RTO 重传算法
RTO 重传算法一共有很多种,在理论知识里面我们基本都学了。我们在 lab 中使用较为简单的那一种,它具体描述如下:
- (1)从发送数据包的那一刻开始计时。
- (2)经过一个 RTO 后,没有接收到 Ack。
- (3)此时触发重传,
RTO = RTO * 2;
数据包被重新发送。 - (3.1)如果一直没有接受到 ack,那么在上一步重复循环。一直到 uint32_t 下的最大值为止不再继续翻倍。
- (3.2)如果此时接受到了 ackno,无论如何都会将此时的 RTO 重置为初始的 RTO。
- (4.1)此时如果仍然有未确认的包等待重传,那么则基于被重置的 RTO 重新开始计时。
- (4.2)此时如果没有未确认的包(也就是对面收到了我们的全部发送的包),那么计时停止。
另外,在 2.1.1 我们有提到过,
对面窗口为 0 的时候,我们需要持续发送数据量为 1 的 Segment。它还有一个作用,那就是处于这种情况的时候不会触发超时重传窗口翻倍的机制。
因为这个时候,即便是重传,也只能重传 Segment.payload 为 1 的段。此时它正好处于上述步骤中 2、3、3.1 的区间内,它同样也会一直触发重传,但是不会触发 RTO = RTO * 2;
的机制。此时,RTO = RTO
。每次重传都会间隔相同的时间段,且这些时间段都为 RTO。
实际上的 RTO 重传算法更加复杂,一些课本上的重点特性(快重传、慢启动、选择性确认)我列举在这里供参考(但都是《自顶向下》中学过的):
它们更像是 RTO 算法的进一步优化。如果能做完 lab3 并且认真弄懂了其中的内容,再来学习这里会有一种更好的感觉。说不定真的有大神可以把这几个优化做进了 Sender 了呢?
(这里不看也可以,是选择性阅读,和 lab 没有任何关系)
超时重传算法补充
点我展开超时重传算法
- 初始阶段:在 TCP 连接刚建立时,发送方会将拥塞窗口(cwnd)初始化为一个较小的值,通常为 1 个最大段长度(MSS)。这意味着发送方在开始时只能发送一个 MSS 大小的数据段。
- 确认回复与窗口增长:当发送方发送一个数据段后,会等待接收方的确认(ACK)。一旦收到接收方对该数据段的 ACK,发送方就会将拥塞窗口的大小增加。在慢启动阶段,拥塞窗口是以指数方式增长的,即每收到一个 ACK,拥塞窗口就增加一个 MSS。例如,最初拥塞窗口为 1 个 MSS,收到第一个 ACK 后变为 2 个 MSS,收到第二个 ACK 后变为 4 个 MSS,以此类推。
- 阈值判断:发送方还会维护一个慢启动阈值(ssthresh)。当拥塞窗口的大小达到或超过慢启动阈值时,慢启动阶段结束,进入拥塞避免阶段,此时拥塞窗口的增长方式会发生变化,不再是指数增长,而是线性增长,以更缓慢、更稳定的方式增加发送速率,避免网络拥塞。
2.2 细节
2.2.1 固有字段含义
根据 TCPSender.hh 的成员变量,我这里做了一份它们的梳理。为什么要这么做呢?因为这个确实还是有点复杂,所以需要先过一遍它的初始代码~
在这个梳理中,我尽量将我的梳理信息和 2.1 理论知识中的部分挂钩起来,让你可以在拥有理论知识的基础上,
成员变量
- 以下值会自动初始化,我们只用使用就好:
WrappingInt32 _isn;
:初始序列号(Initial Sequence Number,ISN),用于 TCP 连接建立时的 SYN 段。WrappingInt32
是 32 位包装类型,处理序列号回绕,ISN 是 TCP 连接发送方首字节相对序列号。
std::queue<TCPSegment> _segments_out;
:待发送的 TCP 段队列。TCPSender
将需发送的 TCPSegment
对象放入此队列,由 TCPConnection
取出并发送。
何为发送数据?就是把 segment 放进它就好了。后面我们会详细讲到。
unsigned int _initial_retransmission_timeout;
:连接的初始重传超时时间(Retransmission Timeout,RTO)。段发出后,若在此时长内未收到 ACK 确认,则需重传。
ByteStream _stream;
:待发送的字节流,包含未发送的字节数据。TCPSender
从此字节流读取数据封装成 TCP 段发送。
- 以下值非已有,但有方法需要能直接获取到它们,因此我们直接设置,并需要我们主动更新:
uint64_t _next_seqno{0};
:下一个要发送的字节的绝对序列号。每次发送 TCP 段后,_next_seqno
会依段长度更新。
uint64_t _acked_seqno{0};
:已收到 ACK 确认的最大绝对序列号,用于判断哪些段已被对方成功接收。
uint16_t _window_size{1};
:接收方的窗口大小,表示接收方当前能接收的字节数,初始值为 1。发送方依此窗口大小决定发送数据量。
成员方法
- 接收 Ack:
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
功能:处理接收到的 ACK 确认,依 ACK 号和窗口大小更新 _acked_seqno
和 _window_size
等相关状态,处理未确认段队列。
是三大核心实现方法之一。用来确认哪些数据已经被接收,(此时发送方不用再维护这些数据,可以抛弃),如果此时仍然保有未接收到的数据,那么启动定时器。(注意不是触发重传!)
- 发送空 Segment:
void send_empty_segment();
功能:生成空负载的 TCP 段(常用于创建空 ACK 段),放入 _segments_out
队列。
这个直接实现然后发送就好。
- 填充 Receiver 窗口:
void fill_window();
功能:创建并发送 TCP 段,尽量填满接收方窗口。从 _stream
读取数据,封装成段发送,直到窗口填满或无更多数据可发。
这个就是实际上的发送方法。也是三大核心实现方法之一。下面我们会着重介绍。
- 时间流逝:
void tick(const size_t ms_since_last_tick);
功能:通知 TCPSender
时间流逝。依流逝时间更新定时器状态,超时则进行重传操作。
每隔几毫秒,tick 函数将会被调用,其参数声明了过去的时间,是三大核心实现方法之一。因为时间流逝方法中必然要判断当前是否需要触发重传。一旦我们认为需要重传,那么需要重新给定时器设立新的时间,并且从我们暂存的数据结构体中发送最早进入这个暂存数据结构体中的 Segment。
为什么要这么绕:直接调用 clock 或者 time 将会导致测试套件不可用。
- 访问方法:
size_t bytes_in_flight() const;
功能:返回已发送但未确认的字节数,即 _outstanding_seg
队列中所有段占用的序列号数量。
方法用 fill_window
发出后,对应的 seqno 没有在 receive_ack
中被接收到。
unsigned int consecutive_retransmissions() const;
功能:返回连续重传的次数。
- 序列号方法:
uint64_t next_seqno_absolute() const { return _next_seqno; }
WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
通过这两个方法,用合适的数据类型返回 _next_seqno
。
通过梳理,我们可以知道,主要的三大功能(发送数据、接收 Ack、超时重传)分别由三个主方法完成。我仍然推荐先思考好这三个方法以外的其它方法如何实现,再去一个个推理三个主方法应该怎么写。
2.2.2 发送 Segment
发送数据的核心代码实际上就是它:
// 发送数据,segments_out 中的数据被发送出去是调用者做的事情,Sender 无需关心
this->segments_out().push(segment_to_send);
fill_window()
方法中,我们可以梳理出它的主要逻辑:
只要「已经被发送出去」的数据量,小于对面 window_size 就必须一直发送。在这个过程中
this->bytes_in_flight()
和 window_size 都会动态变化:- 每发送一次,
this->bytes_in_flight()
都会增长,两者确实会一直差距缩小。 - 但由于 Sender 同时也在 receive_ack,因此 window_size 也会一直增长。
- 他们是一个动态的过程,因此需要使用 while 一直循环,直到
this->bytes_in_flight()
即已发送的数据足够多,预计可以填满 window_size 才会满足。 - window_size 变为 0 时,Sender 仍然会一直发送,这个我们后面再进行补充。
- 每发送一次,
// 只要还有「预订要发送」但是这些数据还没填满 Receiver 窗口的空间,就从 _stream 中读取数据,然后发出去
// 要读取到的数据需要充分考虑到 window_size,然后有选择性的读取数据的长度
while (this->bytes_in_flight() < window_size) {
TCPSegment segment_to_send;
// 给 segment_to_send 添加 seqno
segment_to_send.header().seqno = this->next_seqno();
// 数据是从 _stream 中读取的
auto payload = this->_stream.read(payload_size);
segment_to_send.payload() = string(payload);
// 发送数据,segments_out 中的数据被发送出去是调用者做的事情,Sender 无需关心
this->segments_out().push(segment_to_send);
// 更新序列号和发出但未 ACK 的字节数
this->_next_seqno += payload_true_size; // _next_seqno 是 absolute seqno
}
std::queue<std::pair<uint64_t, TCPSegment>> _outstanding_seg;
已发出但未收到 ACK 确认的 TCP 段队列。每个元素是 std::pair
,uint64_t
表示段的绝对序列号,TCPSegment
是具体段,用于管理未确认段。
bool _has_set_fin_flag{false};
:是否发送了 FIN 标志位的标志。FIN 标志表示发送方无更多数据要发送,请求关闭连接。发送带 FIN 标志的段后,此标志设为 true
。
bool _has_set_syn_flag{false};
:是否发送了 SYN 标志位的标志。SYN 标志用于发起 TCP 连接建立。发送带 SYN 标志的段后,此标志设为 true
。
2.2.3 接收 Ack
我们在 ack_received() 方法中处理接收 Ack 的逻辑。我们接收 ackno 和 window_size 这两个参数。
首先,window_size 非常好理解,我们直接更新类里面的 _window_size 就可以了。
ackno 主要是用于确认有哪些值已经被收到了。
2.2.4 超时重传
一旦我们认为需要重传,那么需要重新给定时器设立新的时间,并且从我们暂存的数据结构体中发送最早进入这个暂存数据结构体中的 Segment。
TCPSenderTimer _timer;
:重传定时器,跟踪每个段的发送时间,超时触发重传操作。
uint32_t _consecutive_retransmissions_count{0};
:连续重传的次数。段重传后仍未收到 ACK 确认,继续重传,此计数器记录连续重传次数,用于调整重传超时时间。
2.3 边界情况
参考新开的 Lab 2-4 单元测试篇,里面给出了关于这部分单元测试的讲解和一些建议。
https://blog.pengdonglai.com/2025/02/15/stanford-cs144-2-1/
2.4 具体实现
我的具体实现基本上借鉴了大佬的实现… 所以思路看上去很像那确实是我太菜了。
fill_window()
void TCPSender::fill_window() {
if (this->_has_set_fin_flag) {
return;
}
// 为防止死锁,发送数据的时候即便对面窗口为 0,也至少发送 1 单位的数据
auto window_size = max<uint16_t>(1, this->_window_size);
// 只要还有「预订要发送」但是这些数据还没填满 Receiver 窗口的空间,就从 _stream 中读取数据,然后发出去
// 要读取到的数据需要充分考虑到 window_size,然后有选择性的读取数据的长度
while (this->bytes_in_flight() < window_size) {
TCPSegment segment_to_send;
// 给 segment_to_send 添加 seqno
segment_to_send.header().seqno = this->next_seqno();
if (!this->_has_set_syn_flag) {
segment_to_send.header().syn = true;
this->_has_set_syn_flag = true;
}
// MAX_PAYLOAD_SIZE 只限制字符串长度并不包括 SYN 和 FIN,但是 window_size 包括 SYN 和 FIN
auto payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE,
min(window_size - this->bytes_in_flight() - segment_to_send.header().syn,
this->_stream.buffer_size()));
// 数据是从 _stream 中读取的
auto payload = this->_stream.read(payload_size);
segment_to_send.payload() = string(payload);
// 如果读到 EOF 了且 window_size 还有空位,那就把 fin 给放进去
// 注意这里有一个细节,那就是这里不需要检查 TCPConfig::MAX_PAYLOAD_SIZE,因为即便是它满了,那也可以放入 FIN
if (!this->_has_set_fin_flag && _stream.eof() &&
this->bytes_in_flight() + segment_to_send.length_in_sequence_space() < window_size) {
segment_to_send.header().fin = true;
this->_has_set_fin_flag = true;
}
// payload_true_size 包含了 fin 和 syn 后的大小
uint64_t payload_true_size = segment_to_send.length_in_sequence_space();
// 空数据报就不发送了
if (payload_true_size == 0) {
break;
}
// 发送数据,segments_out 中的数据被发送出去是调用者做的事情,Sender 无需关心
this->segments_out().push(segment_to_send);
// 如果定时器关闭,则启动定时器
if (!this->_timer.is_running()) {
this->_timer.start();
}
// 保存备份,重发时可能会用
this->_outstanding_seg.emplace(this->_next_seqno, std::move(segment_to_send));
// 更新序列号和发出但未 ACK 的字节数
this->_next_seqno += payload_true_size; // _next_seqno 是 absolute seqno
}
}
ack_received()
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
// 和接收方原理一样,checkpoint 为 next_seqno_absolute
auto abs_ackno = unwrap(ackno, _isn, next_seqno_absolute());
// 传入的 ACK 是不可靠的,直接丢弃
if (abs_ackno > this->next_seqno_absolute()) {
return;
}
// 在接受到 ack 的时候如果收到窗口为 0,窗口大小发送方仍将其视为 1,允许发送 1 字节的数据。
// 但是这个时候,未采用 TCP 标准的 指数退避策略(即 RTO 不翻倍)
this->_window_size = window_size;
// 用于标记是否有 segment 是否成功
bool some_seg_ack_successful = false;
// 处理已经收到的包(序列号空间要小于 ACK)
while (!this->_outstanding_seg.empty()) {
auto &[abs_seq, segment] = this->_outstanding_seg.front();
if (abs_seq + segment.length_in_sequence_space() - 1 < abs_ackno) {
some_seg_ack_successful = true;
this->_acked_seqno += segment.length_in_sequence_space();
this->_outstanding_seg.pop();
} else {
break;
}
}
// 有成功 ACK 的包,则重置定时器(触发 RTO),清零连续重传次数
if (some_seg_ack_successful) {
this->_consecutive_retransmissions_count = 0;
this->_timer.set_time_out(_initial_retransmission_timeout);
this->_timer.start();
}
// 没有等待 ACK 的包了,则关闭定时器
if (this->bytes_in_flight() == 0) {
this->_timer.stop();
}
// 尝试继续填满窗口发送
fill_window();
}
Timer 类则是完全照搬这里的实现:https://lrl52.top/1004/cs144-lablab3/,这里不再做赘述。