简介
CS144 是斯坦福大学的经典计算机网络课程(全称 Introduction to Computer Networks)以动手实现网络协议著称,实验项目包括:
网络调试工具:使用
tcpdump
和Wireshark
分析网络流量。可靠数据传输协议:实现滑动窗口协议(类似 TCP 的简化版)。
路由协议模拟:编写路由算法模拟器(如距离向量算法)。
完整 TCP 协议栈:在用户态实现 TCP 的可靠传输、拥塞控制等功能,与真实操作系统协议栈交互。
CS144 的 Lab 截止 2025 年初有两个大版本,一个是 Sponge 版本,一个是 Minnow,Sponge 是旧版本,也就是所谓「8 个 Project 带你实现整个 TCP/IP 协议栈」的版本。而 Minnow 是 2023 年春季新上线的版本。
课程主页: https://cs144.github.io/
GitHub: https://github.com/CS144/minnow
24 Winter 的课程主页 (archived by Wayback Machine): https://web.archive.org/web/20241209004804/https://cs144.github.io/
0.1 准备条件
对于 C++ 的要求:非常适合 C++ 初学者来学,不需要你会写很「艰深/炫酷」的魔法代码,你只需要稍微在你已经知道的语法基础上再学一点点,就可以完成所有的 labs。当然 0 小白肯定是完全不行,至少得会 C++ 语法,然后让你写几个 LeetCode 啥的应该也得要是会的。
你应当具备一定的调试代码的能力。在这个 lab 中,能一次性写对这几个方法的人少之又少,因此调试就成了必不可少的一环。本文档也会介绍一下 Clion 环境下如何对这个项目进行断点调试。如果觉得断点调试有难度,也可以通过打印 log 来调试。
不止一个朋友反映这个 lab 的单元测试给的不够。一旦 lab1-2 有边界条件没有考虑好,会严重影响到 lab3-4 的开发体验——不少朋友发现 3-4 无法编译通过其实出在 lab1-2 的内容没有写好导致。因此,适当补充 lab1-2 模块的单元测试很重要,并且你也要知道单元测试的基本概念,以及 C++ 下怎么玩转它们。
对于计算机网络的要求:强烈建议学完《自顶向下》这本书后再来做这个 lab。虽然说对于大神自然是不需要,但是有了理论基础后,再来做可以大幅缩减「抽象感」——至少你会明白你在做什么。
对于 Linux 基本功的要求:Git、至少你得操作过 Linux 命令行。这方面如果经验为 0 则会被搭建环境那一步就劝退了。虽然这种状况下确实也还是能读代码,但是会少了自己参与实现的乐趣。
lab0:热身
1.1 搭建环境
Windows 下建议使用 WSL,你可以参考我的 WSL 环境进行配置。https://blog.pengdonglai.com/2025/02/03/env-book-1/
在 linux 环境下,运行如下命令,就可以愉快地进行实验了。
apt-get install g++ git clang-tidy clang-format libpcap-dev \
iptables mininet tcpdump telnet wireshark socat netcat-openbsd \
doxygen graphviz
如果使用 IDE,那么确保 IDE 在 Linux 的环境下运行,可以参考我的环境配置中关于 Clion 的部分~
随后,我们可以将代码拷贝下来,我用的是 https://github.com/vikshanker/sponge:
# 你可以使用其他源,不一定要和我这个一样
git clone https://github.com/vikshanker/sponge
Stanford 推荐我们使用 Git 来进行分支管理。但实际上不用 Git,我们将很难把实验进行下去。而且找遍了全互联网也没有看见几个不用 Git 完成的例子。因此我们在安装环境的时候,也请务必在 Linux/WSL 环境下把 Git 给搭建好(我的环境搭建指南中也有),并且可以正常连接到 Github。
原因很简单:那就是因为我们只是负责在已经搭建好的、工程实现很好的框架上做代码填空题,把关键的几个组件 (e.g., TCPSender/TCPReceiver
, NetworkInterface
, etc.) 中的某几个函数实现/完善了而已。
但实际上,cs144 的 lab 是一个十分典型的现代 C++ 工程,不管是代码规范,工程实现,测试框架,还是现代 C++ 的各种语法,写的都非常优雅且规范。对于还不太熟悉 C++ 的同学们来说,尤其是现代 C++ 的各种语言特性用法,这门课是非常好的入门课。
可以说,相比于学校只教语法,这个 lab 更是教会了我们什么是「现代工程」,它的「落地感」是非常强的。从这个意义来说,使用 Git 进行版本管理,本身也是「现代代码工程」的一部分。
1.2 整体认知
这里就涉及到了一个棘手的问题,万一哪天作者删库跑路了(你斯坦福爸爸已经做过一遍了,笑死,sponge
早就被删的一干二净),我们从一个不知名的地方拷贝了代码。亦或者,你回退了其他人的代码例如我的,这个时候你该如何辨别你可以开始做实验了呢?
1.2.1 Startcode/初始代码
编程作业从 checkpoint 0 (network warmup) 到最后的 checkpoint 7: putting it all together) 一共分成了八个 checkpoint。
所以在我们拷贝 Github 的时候,需要首先观察它的 commit 是不是如下图所示:
这个 commit 中的代码,就是 lab0 所需要的基座代码。我们需要做的就是在它的基础之上进行填补和修复 bug,这代码确实是有点问题的,之后我们会通过测试程序和跑动单元测试,来验证我们写的内容是否正确。
重点来啦,只有这个 commit 的仓库是不应该被选择的。这里我们开一个天眼:后续的 lab1 一直到最后,都会有代码被补充进来。我们都必须要从远端来合并代码到我们的开发分支。这么说可能会有些抽象,我们来看一个例子。
除了我们当前的提交,我们记作 commit0。我们的开发记作 my-commit0。我们此时的分支的提交顺序为:
commit0
之后我们完成了 lab0 的开发,此时的分支提交顺序为:
commit0 --> my-commit0
助教准备了这些分支:
1. lab1:commit0 --> commit1
2. lab2:commit0 --> commit1 --> commit2
那么在开始 lab1 前,我们都必须将助教的 lab1 分支合并到我们的分支。合并之后,我们将会得到这样的一个分支提交顺序:
commit0 --> commit1 --> my-commit0 --> merged branch lab1
这个时候我们才能开始 lab1 的开发工作。
所以我们得出了结论:我们 fork 的代码仓库,也必须同时含有助教的几个分支才可以进行开发。不然 lab0 写完我们就要抓瞎了。在 Github 上截图大概就是这样的:
当我们确认了仓库本身含有这几个 startercode 分支,我们才能意识到,这恰恰就是我们想要的仓库。
我们可以通过以下 Git 命令行来实现代码的 merge 操作:
git fetch origin lab2-startercode
git checkout lab2-startercode
git checkout [*Your branch here*]
git merge lab2-startercode
这样子我们也可以在本地查看 lab2-startercode 分支。如果你不需要,也可以直接一步到位:
git merge origin/lab2-startercode
如果有报错说分支不存在,那么请检查你使用的仓库是否符合我们的要求(远端是否有对应 startcode 分支)
1.2.2 Compile/编译
这个代码仓库按照助教的预期,应该是 clone 到本地后就可以直接编译。编译的步骤大致如下:
首先定位到项目内 root 文件夹。
(如果使用 Clion 可跳过)创造一个 build 文件夹里面放 cmake 产物。并进入它。
mkdir build
cd build
如果您使用 Clion,那么大概率这个时候 Clion 已经自动识别出来了这是一个 cmake 工程,并且创建好了 cmake-build-debug 或者其他您自定义的 build 文件夹,进入它就好。之后也不用执行 cmake 命令了。
cd cmake-build-debug
- (如果使用 Clion 可跳过)构建 cmake 构建系统:(此时已经在 build 文件夹下)
cmake ..
- 执行编译:
make
当然,如果您的电脑具备多个核心,那么完全可以多进程跑,会更快(例如我使用 8 线程,那么我的命令就是):
make -j8
但是我们会因为各种各样的问题导致无法编译。要想完全避免这个办法需要使用官方的 Linux 镜像和正确的 g++ 版本。因为我参考过的博客在这一步出了编译问题的绝大多数都是缺了某个库。(g++ 版本越高,需要额外声明的库就越多,所以保持 g++ 一致是最重要的~)
如果你是 Linux/cpp 高手,那自然是三下五除二就解决了。如果你是学生啥都不懂,那就用官方教程一步步来。
https://stanford.edu/class/cs144/vm_howto/vm-howto-image.html
我们现在已经有了基座代码仓库,也有了开发环境。到了这一步其实也已经很不容易了!我们终于可以正式开始做实验了!
1.3 感性认识
首先,请使用浏览器访问 http://cs144.keithw.org/hello,看看会发生什么?我们看到了一个网站。
当然,作为一个前端,服务端返回一个 html 文本,浏览器解析它这个过程我们再熟悉不过。只是,我们这个 lab 的重点其实是在于通信的过程,也就是使用 http 协议,从服务端传输 html 文本的过程。
接下来让我们使用工具 telnet,手动来访问这个网站吧。
在终端输入如下命令:
telnet cs144.keithw.org http
这个命令告诉 telnet,你要和 cs144.keithw.org
建立一个可靠的字节传输流,使用的协议是 http(超文本传输协议)。
当然,你只是这么做,起不到任何作用… 这也是这个 lab 中的一个坑点,我们必须要快速输入以下三行命令:
telnet cs144.keithw.org http
# 输入上面一行启动字节传输流后,快速输入以下三行内容:
GET /hello HTTP/1.1
Host: cs144.keithw.org
Connection: close
我试的时候我死活试不出来,笑死==(应该是收到魔法的影响了 0.0 不过我就没深究了)。我看了其他人的解法,大概你会得到这样的返回结果:
HTTP/1.1 200 OK
Date: Thu, 16 Jun 2022 05:17:15 GMT
Server: Apache
Last-Modified: Thu, 13 Dec 2018 15:45:29 GMT
ETag: "e-57ce93446cb64"
Accept-Ranges: bytes
Content-Length:14
Connection: close
Content-Type: text/plain
Hello, CS144!
Connection closed by foreign host.
它实际上就表示了我们和服务端进行 http 协议通信的过程。我们这个 lab 要实现的,实际上也是这个过程,我们最终呈现的效果,也是返回一个 Hello, CS144!
那么我们如何用代码来描述这个过程呢?我们接下来 1.3 就将实现这个 Webget
程序!
(什么?你问我双工通信和发送邮件?不重要的东西直接忽略不就好了吗。抓重点!你完全可以通过看视频来学习这两个步骤,照着做就 OK 了~)
(发送邮件需要斯坦福邮箱,一般人做不了)
1.4 Webget 程序
这个程序实际上就帮我们用代码在 Linux 环境下,完成了刚刚 telnet 所做的事情:功能为向指定 IP 地址发送 HTTP GET 请求,然后输出所有响应。
要求调用 Linux kernel 自带的 TCP/IP 协议栈实现即可,Sponge 已经将 C-Style 的 Socket 封装成了几个 C++ 类:FileDescriptor, Socket, TCPSocket, and Address classes。
官方文档已经 RIP 了… 根据这几个类,我们不难发现其中包含了 TCPSocket,于是到了我们喜闻乐见的、使用无封装的原始代码发送 http 请求过程!
我们打开项目,找到 webget.cc 文件。之后我们填充 get_URL 函数,以下是一种可能的解法:
void get_URL(const string &host, const string &path) {
// Your code here.
// You will need to connect to the "http" service on
// the computer whose name is in the "host" string,
// then request the URL path given in the "path" string.
// Then you'll need to print out everything the server sends back,
// (not just one call to read() -- everything) until you reach
// the "eof" (end of file).
TCPSocket sock;
sock.connect(Address(host, "http"));
sock.write("GET " + path + " HTTP/1.1\r\n");
sock.write("Host: " + host + "\r\n");
sock.write("Connection: close\r\n");
sock.write("\r\n");
sock.shutdown(SHUT_WR);
while (!sock.eof()) {
cout << sock.read();
}
sock.close();
cerr << "Function called: get_URL(" << host << ", " << path << ").\n";
// cerr << "Warning: get_URL() has not been implemented yet.\n";
}
这段代码的主要功能是使用 TCP 套接字(TCPSocket)向指定的 HTTP 服务器发送一个简单的 HTTP GET 请求,然后读取并输出服务器返回的响应数据,最后关闭套接字连接。
sock.write("\r\n");
发送一个空行,表示 HTTP 请求头的结束。HTTP 协议规定,请求头和请求体之间需要用一个空行分隔。
sock.shutdown(SHUT_WR);
关闭套接字的写端。SHUT_WR 表示只关闭写操作,即客户端不再向服务器发送数据,但仍然可以接收服务器的响应数据。
sock.eof()
使用一个循环来读取服务器的响应数据,直到套接字的输入流结束(即服务器关闭连接)。sock.eof()
用于检查套接字的输入流是否已经结束。
我们进入 build 文件夹,重新 make 后,运行:
./apps/webget cs144.keithw.org /hello
它的行为应该和上述我们使用 telnet 的行为保持一致。
最后它应该能够通过测试:
make check_webget
在测试的时候,有可能会碰到 t_webget.sh
文件不存在的报错,但是你点进去 test 文件夹,发现这个文件确实是在的。
这个是因为 t_webget.sh
他的分行规范用的是 Windows 下的 CRLF,而不是 Linux 或者 MacOS 下的 LF。如果你使用的是 Clion,那么在 Clion 的右下角就可以将它更改:
这个问题在 Lab4 的时候仍然会出现。它出现的根本原因是我们是在 Windows 系统下 clone 的代码,之后被意外保存为了 CRLF。
1.5 ByteStream
上一节我们使用了 TCPSocket
,调用了 read
和 write
方法,这一节就来实现一个这样的类管道:一个运行在内存的有序可靠字节流(An in-memory reliable byte stream)
1.5.1 是实现什么?
它是一个数据结构,也是一个对象类。它应该具备一个 write
方法,以便于我们可以往这个对象内写入数据。我们也可以使用 read
方法从它身上读取数据。
它应该具备一个容量大小,因为我们不能一味的只存入数据而不读取数据。容量已经满的时候,多出来的内容应该被直接丢弃。
写入端也必须告知这个数据结构它写完了。我们通过一个对外暴露的函数从写入端那边获取到这一信息。
详细说明,它具有以下要求:
- 字节流可以从写入端写入,并以相同的顺序,从读取端读取
- 字节流是有限的,写者可以终止写入。而读者可以在读取到字节流末尾时,产生 EOF 标志,不再读取
- 所实现的字节流必须支持流量控制,以控制内存的使用。当所使用的缓冲区爆满时,超过缓冲区容量的内容直接丢弃。字节流会返回成功写入的字节数
- 写入的字节流可能会很长,必须考虑到字节流大于缓冲区大小的情况。即便缓冲区只有1字节大小,所实现的程序也必须支持正常的写入读取操作
- 在单线程环境下执行,因此不用考虑各类条件竞争问题
1.5.2 如何设计它?
具体的实现,在 libsponge/byte_stream.hh
libsponge/byte_stream.cc
阅读头文件我们已经可以知道 byte_stream 的系统类设计了:
创建的时候,传入一个 int 代表管道的「容量」。
对于写管道,实现相关的函数
write
、remain_capacity
、end_input
、set_error
:- 写入数据:使用
write
方法将一段测试数据写入字节流,并打印实际写入的字节数。 - 检查剩余容量:使用
remaining_capacity
方法检查字节流还能容纳多少字节,并打印结果。 - 标记输入结束:使用
end_input
方法标记输入结束,再次检查输入是否结束并打印结果。 - 记流出现错误:使用
set_error
方法标记流出现错误,使用error
方法检查流是否有错误并打印结果。
- 写入数据:使用
对于读管道,实现
pop_output
、read
、input_ended
、eof
、error
、buf_size
、buf_empty
、bytes_written
、bytes_read
。- 查看数据:使用
peek_output
方法查看缓冲区中的前 x 个字节,不将它们从缓冲区中移除,并打印结果。 - 读取并移除数据:使用
read
方法读取并移除缓冲区中的前 y 个字节,并打印结果。 - 手动移除数据:使用
pop_output
方法手动移除缓冲区中的 x 个字节,并打印提示信息。 - 检查输入是否结束:使用
input_ended
方法检查输入是否已经结束,并打印结果。 - 检查缓冲区是否为空:使用
buffer_empty
方法检查缓冲区是否为空,并打印结果。 - 检查是否到达输出结尾:使用
eof
方法检查是否到达输出结尾,并打印结果。 - 获取已写入的字节数:使用
bytes_written
方法获取已经写入的总字节数,并打印结果。 - 获取已读取的字节数:使用
bytes_read
方法获取已经读取的总字节数,并打印结果。 - 获取当前缓冲区大小:使用
buffer_size
方法获取当前缓冲区的大小,并打印结果。
- 查看数据:使用
通过系统设计分析,我们可知:
- 毫无疑问我们需要一个 _capacity 来保存容量。
- 我们需要一个 _count_written 保存
1.5.3 如何实现?
我们使用双端队列实现,这是因为双端队列可以实现「前进后出」:
byte_stream.hh
std::deque<char> _buffer_deque{}; //!< The data of this stream
size_t _count_written; //!< The written count of the input
size_t _count_read; //!< The read count of the input
size_t _capacity; //!< The capacity of this stream
bool _error{}; //!< Flag indicating that the stream suffered an error.
bool _input_finished_flag{}; //!< Flag indicating that the input has been overed.
byte_stream.cc
ByteStream::ByteStream(const size_t capacity) : _count_written(0), _count_read(0),
_capacity(capacity) {}
size_t ByteStream::write(const string &data) {
auto _safe_len = min(data.size(), this->remaining_capacity());
for (size_t i = 0; i < _safe_len; ++i) {
this->_buffer_deque.push_back(data.at(i));
this->_count_written++;
}
return _safe_len;
}
//! \param[in] len bytes will be copied from the output side of the buffer
string ByteStream::peek_output(const size_t len) const {
auto _safe_len = min(len, this->_buffer_deque.size());
string _str;
for (size_t i = 0; i < _safe_len; ++i) {
_str += this->_buffer_deque.at(i);
}
return _str;
}
//! \param[in] len bytes will be removed from the output side of the buffer
void ByteStream::pop_output(const size_t len) {
auto _safe_len = min(len, this->_buffer_deque.size());
for (size_t i = 0; i < _safe_len; ++i) {
this->_buffer_deque.pop_front();
this->_count_read++;
}
}
//! Read (i.e., copy and then pop) the next "len" bytes of the stream
//! \param[in] len bytes will be popped and returned
//! \returns a string
string ByteStream::read(const size_t len) {
string _str = peek_output(len);
this->pop_output(len);
return _str;
}
void ByteStream::end_input() {
this->_input_finished_flag = true;
}
bool ByteStream::input_ended() const { return this->_input_finished_flag; }
size_t ByteStream::buffer_size() const { return this->_count_written - this->_count_read; }
bool ByteStream::buffer_empty() const { return buffer_size() == 0; }
bool ByteStream::eof() const { return this->_input_finished_flag && this->buffer_empty(); }
size_t ByteStream::bytes_written() const { return this->_count_written; }
size_t ByteStream::bytes_read() const { return this->_count_read; }
size_t ByteStream::remaining_capacity() const { return this->_capacity - this->buffer_size(); }
Lab0 我们实现了 ByteStream 这样一个双向字节流。在接下来的实验里,你会实现两个ByteStream:
- 一个出向(outbound)ByteStream,本地应用可以通过 socket 向里面写数据,你的TCP可以把数据传输到远端;
- 一个入向(inbound)ByteStream,你的 TCP 会接收从远端发来的数据,然后被本地应用所读取。
1.6 调试
1.6.1 修改调试配置
首先我们来解决使用 gdc 调试的情况下,部分小伙伴可能会遇到 optimize_out 或者 inline 等问题。这是因为项目的原因,即便是调试模式下,仍然会有部分代码会被编译器「优化」掉。我们打开 etc/cflags.cmake
文件:
修改第 18 行中 -Og
为 -O0
:
# add some flags for the Release, Debug, and DebugSan modes
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -ggdb3 -O0")
即便设置了 -Og
(针对调试进行优化),某些编译器可能会有默认的优化设置,这会覆盖你在 CMake 里设置的 -Og。你可以检查编译器的默认设置,或者明确指定优化级别。
我们这里使用的 -O0
表示不进行优化,能保证所有变量都不会被优化掉。
您还可以通过 volatile
关键字来防止变量被优化:
#include <iostream>
int main() {
volatile int optimized_out_variable = 42;
std::cout << optimized_out_variable << std::endl;
return 0;
}
编译器可能会对一些函数进行内联处理,这可能会导致变量被优化掉。您还可以通过 __attribute__((noinline))
(GCC 和 Clang)或者 __declspec(noinline)
(MSVC)来禁止函数内联:
#include <iostream>
// 禁止函数内联
__attribute__((noinline)) void test_function() {
int variable = 42;
std::cout << variable << std::endl;
}
int main() {
test_function();
return 0;
}
1.6.2 调试心智
我们的调试会使用位于 tests
文件夹下的文件,里面正是一个个单元测试。在项目原理篇中,我会详细说明项目是如何进行单元测试的。
在进行了 lab1 代码的 merge 操作后,我们可以看到 stream 相关的单元测试已经在对应文件夹下了:
// tests
byte_stream_capacity.cc
byte_stream_construction.cc
byte_stream_many_writes.cc
byte_stream_one_writes.cc
// 其它不再一一列举
在 build 文件夹内执行 make 操作后,它们会在 build/tests
(或者 Clion 默认配置的 cmake-build-debug/tests
)下生成对应的二进制文件:
// cmake-build-debug/tests
byte_stream_capacity
byte_stream_construction
byte_stream_many_writes
byte_stream_one_writes
// 其它不再一一列举
我们 lab1 中实现的是 byte_stream,项目中它所有的单元测试名字前缀都是它。它们当中的每一个都可以被执行。
我们在执行 make check_lab1
命令的时候,实际上执行的是 makefile target,在这个过程中实际上就是会跑这几个二进制程序。所以我们执行 make check_lab1
命令走单元测试的时候,你至少要学会看单元测试的输出,以便于让它告诉你你是「错在了哪个单元测试」和「哪个单元测试步骤」,之后你要去找到对应的二进制文件来执行。
1.6.3 Clion 调试
由于我使用的 IDE 为 Clion,所以会这个过程会比 VSCode 描述的相对更细致一些。
我会假定你是在 WSL2 中进行的实验,并且按照我的教程配置好了实验环境(其它环境只要能 make 成功大概率也能跑动 Clion 调试,只是不敢做这个保证)。
在 Clion 右上角调试,选择好对应的调试程序:
假设我选择了 byte_capacity,然后我在 byte_stream.cc 中觉得有问题的地方打上断点。大概如图所示:
之后,点击旁边的那个虫子,就进入了调试的过程。如果程序成功暂停,就说明我们成功了。
1.6.4 VSCode 调试
暂时没时间写了,因为我已经不用 VSCode 写 cpp 了。
lab1:Reassembler
在 Lab1 中,需要实现一个流重组器:将传入的没一份打乱的字节流重新组合形成原来的有序的字节流。即实现 StreamReassembler 类。
为什么要这样做?TCP 对处理乱序和重复包的健壮性来自于它能将字节流的任意摘录拼接回原始流的能力。在离散的可测试模块中实现这一功能将使处理传入段变得更加容易。
merge 完 lab1 的代码后,你会看到我们本节需要实现的文件:stream_reassembler.cc
和 stream_reassembler.hh
2.1 是实现什么?
基于 ByteStream,实现一个流重组器。
(原 Up 主视频链接,也是主讲 CS144 的。讲的很通俗易懂,但是我并没有采纳他对于 lab1 的解法)
我们需要使用到 ByteStream,主要是为了写入它,然后让它帮我们进行缓存。除此以外的功能,都是 Reassembler 自己实现的。
从我们需要使用的文件 stream_reassembler.cc
可以知道,我们和 ByteStream 共享同一个 capacity。从这里我们已经可以感觉出来项目的第一个要求了:
如果我们在 Reassembler 新增数据结构,那么这个数据结构一旦需要线性增长,那么不能超过 capacity 容量。至于说需要需要超过线性增长的数据结构,那更是需要被抛弃掉。
根据头文件可知核心方法为 push_substring()
,它的参数非常好懂。
data 是写入的数据,还是和 ByteStream 一样,写入的是一个 string 字符串。index 的意思是,当前的 data 在发送方处于的位置。比如说一个数据,它的容量为 1024,那么 index 完全可能是 0-1024 中的任意一个数,表示 data 在这个数据中所起到的位置。
根据我们上一个 lab 的知识,最后一个 eof 参数,表示就是到达
push_substring()
的这个 data 是否为处于全数据的末尾。
2.2 Reassembler 核心原理
除此之外,我们得来首先认识下这个流重组器具体是做了什么事情的:
如果您了解 TCP 协议的「可靠交付」机制,您可以更好的明白这里的含义。如果实在不明白,那么也还请您再去阅读相关资料,弄懂后才能来做 Reassembler。
假设我们的初始条件是 capacity 为 5. 这要求我们初始化的空间复杂度需要为 5C。(C 在这里为参数)。
我们将 Reassembler 中的空间称为「管道」,将 BuffStream 的空间称为「缓冲区」。上文中 push_substring()
的 index 称为「string 起始索引」。当然,这么称呼只是为了好听,实际上它们并不叫这个名字。那么我们可以得到第一个认知:
「管道」的容量(共 5C) +「缓冲区」的容量(0) = 5C
2.2.1 第一种过程:整体
(0)我们往空的管道中,string 起始索引为 0 的情况下注入 2 个字符:a b,并且设置 eof 为 1。此时管道的容量为 2C
(管道的容量,只要能放得下它应该要放的数据就够了,比如共 5C {ab 2C},这个数据就是一个合法的数据。一旦表示为共 1C {ab 2C},很显然 ab 放不下,所以是非法数据)
此时可以看到 EOF 的索引为 2,它是由起始索引为 0 + ab 的长度 2 = 2 得来的。当然我们现在不需要知道它有什么作用。直观表示为:
「管道」的容量(共 5C {ab 2C}) +「缓冲区」的容量(0) = 5C
a(管道) b(管道)--EOF-- 0(管道) 0(管道) 0(管道)= 5C
(1)之后,我们的「缓冲区」开始从「管道」中读取 ab,此时我们的直观表示为:
「管道」的容量(共 3C) +「缓冲区」的容量(共 2C {ab 2C}) = 5C
「管道」的容量(共 5C {ab 2C}) +「缓冲区」的容量(0) = 5C
a(缓冲区) b(缓冲区)--EOF-- 0(管道) 0(管道) 0(管道)= 5C
(2)由于此时 EOF 以前的内容全部进入了缓冲区,那么可以认为 reassembler 进入到了「结束态」。
这样就表示了一个读入「管道」,然后从「管道」中读出来放到「缓冲区」,最后结束的全过程。
我们这个时候不禁有一个疑问,我们从「管道」中读出来放到「缓冲区」的过程,怎么确定哪些数据要从「管道」中读出来呢?
我们不妨来看看第二个例子。
2.2.2 第二种过程:核心
(0)我们往空的管道中,string 起始索引为 1 的情况下注入 2 个字符:a b,并且设置 eof 为 0。此时管道的容量为 2C,直观表示为:
「管道」的容量(共 5C {ab 2C}) +「缓冲区」的容量(0) = 5C
0(管道) a(管道) b(管道) 0(管道) 0(管道)= 5C
(1)由于我们还不知道第一个位置上的数据是什么,因此我们还不能读取数据到「缓冲区」中。
(2)往管道中,string 起始索引为 0 的情况下注入 1 个字符:c,并且设置 eof 为 0。直观表示为:
「管道」的容量(共 5C {cab 3C}) +「缓冲区」的容量(0) = 5C
c(管道) a(管道) b(管道) 0(管道) 0(管道)= 5C
(3)我们还不知道 eof 的位置。但我们已经能读取数据到「缓冲区」中,读取数据后,直观表示是这个样子:
「管道」的容量(共 2C) +「缓冲区」的容量(共 3C {cab 3C}) = 5C
c(缓冲区) a(缓冲区) b(缓冲区) 0(管道) 0(管道)= 5C
(4)往管道中,string 起始索引为 3 的情况下注入 1 个字符:f,并且设置 eof 为 1。
此时我们设置 EOF 为 4(3(起始索引)+ 1(f 的长度))。经过直观表示为:
「管道」的容量(共 2C {f 1C}) +「缓冲区」的容量(共 3C {cab 3C}) = 5C
c(缓冲区) a(缓冲区) b(缓冲区) f(管道)--EOF-- 0(管道)= 5C
(5)我们读取 f 到「缓冲区」:
「管道」的容量(共 1C) +「缓冲区」的容量(4C {cabf 4C}) = 5C
c(缓冲区) a(缓冲区) b(缓冲区) f(缓冲区)--EOF-- 0(管道)= 5C
(6)由于此时 EOF 以前的内容全部进入了缓冲区,那么可以认为 reassembler 进入到了「结束态」。
通过以上过程,我们总结,一般来说我们有两个条件促成这一点:
我们读取到了 eof,且这个 eof 表示的「string 起始索引」+ data.length() 被我们记录下来称为「EOF 的索引」。
缓冲区从 0 到「EOF 的索引」中,索引对应的所有字符,均「存在」能被有效读取(在直观表示中,它们必须全都不为 0)。然后从索引 0 开始,一路读取所有能读的字符,读到 eof 的 index 为止。
至此我们回答了从「管道」中读出来放到「缓冲区」的过程,怎么确定哪些数据要从「管道」中读出来的问题。
2.2.3 第三种过程:异常
接过程二的(3),从(4)开始我们设置新的过程。(3)后,直观表示大概是这个样子:
「管道」的容量(共 2C) +「缓冲区」的容量(共 3C {cab 3C}) = 5C
c(缓冲区) a(缓冲区) b(缓冲区) 0(管道) 0(管道)= 5C
(4.1)往管道中,string 起始索引为 3 的情况下注入 3 个字符:fgh,并且设置 eof 为 1。
这个时候我们发现了一个问题,那就是我们注入的容量会超过总的 capacity 设定。想要完全存储,需要 3C(现有 cab)+ 3C(新数据 fgh)。这个时候,超出 capacity 的部分会被直接抛弃。,但是 eof 仍然会被记作 6(这是由「string 起始索引」+ data.length() 即 3 + 3 = 6 决定好的)。
执行完注入管道的逻辑后,我们可以看到这样的直观表示:
「管道」的容量(共 2C) +「缓冲区」的容量(共 3C {cab 3C}) = 5C
c(缓冲区) a(缓冲区) b(缓冲区) f(管道) g(管道)--EOF--(6)= 5C
我们可以看到,由于 capacity 已经放不下了,h 被直接丢弃。
这种情况下,需要等缓冲区的内容被释放腾出空间以便于重新记录下来 h,亦或者 EOF 的位置被更新到 5,不然不会进入到「结束态」即(超时前)永远不会完成这次的连接过程。
(4.2)回到 3 结束的位置。设想另一种场景:往管道中,string 起始索引为 2 的情况下注入 3 个字符:bfg,并且设置 eof 为 1。
这种情况下,我们可以看到,已经在缓冲区的 b,又被注入到了管道中。这个时候 reassembler 会判断 b 当前 index 位置的字符已经被 reassembled(其实就是等效于它进入了缓冲区),因此 b 实际上不会被处理,会从 f 开始进行逻辑上的处理,并按照之前的逻辑设置好 EOF 为 5:
「管道」的容量(共 2C) +「缓冲区」的容量(共 3C {cab 3C}) = 5C
c(缓冲区) a(缓冲区) b(缓冲区) f(管道) g(管道)--EOF--(5)= 5C
这种之后会转到上一个步骤的(5)中进行处理且进入「结束态」。
(4.3)回到 3 结束的位置。设想另一种场景:往管道中,string 起始索引为 2 的情况下注入 3 个字符:cfg,并且设置 eof 为 1。
这种情况下,我们可以看到,已经在缓冲区的 b,和重新注入的 c 存在冲突。这个时候我们应当认为是发布侧存在问题,所以在这个 lab 中,要么不作处理,要么抛出 runtime 异常。会从 f 开始进行逻辑上的处理,这一步和(4.2)一致,并按照之前的逻辑设置好 EOF 为 5:
「管道」的容量(共 2C) +「缓冲区」的容量(共 3C {cab 3C}) = 5C
c(缓冲区) a(缓冲区) b(缓冲区) f(管道) g(管道)--EOF--(5)= 5C
通过这个步骤可以知道,产生冲突的时候,我们遵循「先来后到」的原则,因为我们认为 TCP 就是可靠交付,所以我们就是会假设最开始传输的内容是正确的。
2.2.4 第四种过程:空碎片
(4.4)回到 3 结束的位置。设想另一种场景:往管道中,string 起始索引为 3 的情况下注入 0 个字符,并且设置 eof 为 1。
这种情况下,我们直接设置 EOF 即可:
「管道」的容量(共 2C) +「缓冲区」的容量(共 3C {cab 3C}) = 5C
c(缓冲区) a(缓冲区) b(缓冲区)--EOF-- 0(管道) 0(管道)= 5C
这种之后会转到上一个步骤的(5)中进行处理且进入「结束态」。
(4.5)回到 3 结束的位置。设想另一种场景:往管道中,string 起始索引为 3 的情况下注入 0 个字符,并且设置 eof 为 0。
这种情况下,我们什么也不做,直接返回。
(只要同时满足注入字符为空,eof 也为 0,起始索引是什么都无所谓,都是直接返回)
(4.6)回到 3 结束的位置。设想另一种场景:往管道中,string 起始索引为 2 的情况下注入 0 个字符,并且设置 eof 为 1。
「管道」的容量(共 2C) +「缓冲区」的容量(共 3C {cab 3C}) = 5C
c(缓冲区) a(缓冲区)--EOF-- b(缓冲区) 0(管道) 0(管道)= 5C
这种之后会转到上一个步骤的(5)中进行处理且进入「结束态」。
2.3 数据结构 & 算法实现
我们来定义一次 push_string()
为「一次操作」。
这个时候你会发现,每一次操作可能会包括以下过程:
- 向「管道」中注入数据和 eof 信息,当然也有可能是一个空的数据。
- 将「管道」中,前一部分「连续的」数据,读取到「缓冲区」中。
- 更新 eof 信息,并且检查当前是否达到了 eof 的状态。
好的,通过文字描述,和 2.2 部分对于过程的描述,是不是想到了双指针?即,读取到的缓冲区的 index1,和管道中的 eof index2。index1 是为了区分哪些数据进入了缓冲区,index2 是为了区分当前是否到达了 eof 状态。而 index1 == index2 一旦形成,则向缓冲区注入 end_input()
,是不是一个完整的过程就出来了?
观察实现文件 stream_reassembler.cc
,我们注意到还有这两个方法等着我们实现,通过对 empty 的描述我们可以知道就是管道还没有进缓冲区的容量是否为空,因此我们完成 empty()
方法,并且最简单的不就是设置一个私有成员变量表示 unassembled_bytes
吗?(先不管怎么算,直接设置这个变量然后返回不香吗)
所以我们实现了这些,并且此时已有 index1 记作 size_t _reassembled_index
,unassembled_bytes
记作 _disassembled_count
。此时我们加上 index2,记作 _eof_index
。
size_t StreamReassembler::unassembled_bytes() const { return this->_disassembled_count; }
bool StreamReassembler::empty() const { return this->unassembled_bytes() == 0; }
最后我们还有一个问题,我们目前是只有「缓冲区」的(这部分项目已经写好)也就是 _output
:
ByteStream _output; //!< The reassembled in-order byte stream
但是我们还需要一个数据结构来保存「管道」,而且「管道」也必须要能记录当前特定 index 下的字符是否被 assembled。我们需要一个空间复杂度为 n 的数据结构,当然能满足的数据结构有很多,这也就是 lab1 实现方案不止一处的最大原因。
我们使用最稳妥(因为用的足够多,性能也有保障)的 vector,里面用 pair 存两个数据 {当前 char,是否被 assembled}。于是我们有了这些成员变量:
private:
// Your code here -- add private members as necessary.
ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes
std::vector<std::pair<char, bool>> _reassembler_stream; //!< The current part of the reassembled byte stream
size_t _eof_index = 0; //!< The index of Eof char
size_t _disassembled_count = 0; //!< The count of char has not been reassembled
之后我们按照对应的流程实现,就好了。我这里先用文字简单描述下我的过程:
- 首先计算
reassembler_stream
存储的开头和结尾 index。根据两边的 indexreassembler_stream
:
- 1.1 已经被 reassembled 的 index 就不必再重新 assemble 一遍,因此需要设置一个变量
first_disassembled
,并且从 index 和first_disassembled
中,选一个大的,作为开头 index,即_start_index
。 - 1.2 大于 eof 的不必 assemble;大于 capacity 的不能 assemble。因此我们计算此时剩余的 capacity
this->_first_disassembled_index + this->_capacity - this->_output.buffer_size();
,并且和eof
、data.length()
做对比,得到结尾 index,即_end_index
。 - 1.3 结尾 index 比实际遍历到的最后一个 index 大 1. 所以遍历实际上是左闭右开区间,符合国际惯例。
更新 eof。执行 1 中对
reassembler_stream
的 assemble 操作。这个操作实际上就是去更新_reassembler_stream
的false
值为 true。如果某个字符进入到了 代表着进入了_reassembler_stream
则意味着它进入了 reassembler 的窗口。接下来需要处理已经在缓冲区的新元素,
reassembled_index
在_reassembler_stream
「一路向前」,直到碰到没有 assembled 的元素为止。在实际操作中,没有 assembled 的元素在_reassembler_stream
中仍然对应则false
值。当我们收集了一定的 _str 后,向 ByteStream 中写入这些字符串:
this->_output.write(_str);
最后,如果
reassembled_index
「追上了」eof_index
,则表明当前当前读取完毕。执行_output
中停止写入的方法。
如果看的比较迷茫(不得不承认描述代码就是这么枯燥),结合下方的代码一起看,也许会更容易理解。
2.4 具体实现
stream_reassembler.hh
class StreamReassembler {
private:
// Your code here -- add private members as necessary.
ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes
std::vector<std::pair<char, bool>> _reassembler_stream; //!< The current part of the reassembled byte stream
bool _eof = false; //!< Eof
size_t _eof_index = 0; //!< The index of Eof char
size_t _first_disassembled_index = 0; //!< The first index of char not yet reassembled
size_t _disassembled_count = 0; //!< The count of char has not been reassembled
}
stream_reassembler.cc
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
// 1. 首先计算 reassembler_stream 存储的开头和结尾 index。根据两边的 index 更新 reassembler_stream
// 1.1 已经被 reassembled 的 index 就不必再重新 assemble 一遍
auto _start_index = max(index, this->_first_disassembled_index);
// 1.2 大于 eof 的不必 assemble;大于 capacity 的不能 assemble
// 根据上面两条规则,计算两边的 index
auto _index_of_remain_buffer_capacity = this->_first_disassembled_index + this->_capacity - this->_output.buffer_size();
// end 结尾不计算 eof,应该是当前 first_disassembled_index(已经读取到的 index)+ 剩余未进入 BufferStream 缓冲区的容量
auto _end_index = min(index + data.length(), min(_index_of_remain_buffer_capacity, this->_eof_index));
// 2. 更新 eof
if (eof) {
this->_eof_index = min(this->_eof_index, index + data.length());
}
// 执行 1 中对 reassembler_stream 的 assemble 操作
for (size_t i = _start_index, j = _start_index - index; i < _end_index; ++i, ++j) {
auto &t = this->_reassembler_stream.at(i % this->_capacity);
if (t.second) {
// 重复进入了缓冲区,代表着重复读取了元素,直接抛弃
if (t.first != data.at(j)) {
throw runtime_error("StreamReassembler::push_substring: Current char has been written!");
}
} else {
// 代表着进入了 reassembler 的缓冲区
t = make_pair(data.at(j), true);
this->_disassembled_count++;
}
}
string _str;
// 3. 接下来需要处理已经在缓冲区的新元素,first_disassembled_index「一路向前」,直到碰到没有 assembled 的元素为止。
while (this->_first_disassembled_index < this->_eof_index && this->_reassembler_stream.at(this->_first_disassembled_index % this->_capacity).second) {
_str.push_back(this->_reassembler_stream.at(this->_first_disassembled_index % this->_capacity).first);
this->_reassembler_stream[this->_first_disassembled_index % this->_capacity] = {0, false};
this->_first_disassembled_index++;
this->_disassembled_count--;
}
// 4. 当我们收集了一定的 _str 后,向 ByteStream 中写入这些字符串
this->_output.write(_str);
// 5. 最后,如果 first_disassembled_index「追上了」eof_index,则表明当前当前读取完毕
if (this->_first_disassembled_index == this->_eof_index) {
this->_eof = true;
this->_output.end_input();
}
}
size_t StreamReassembler::unassembled_bytes() const { return this->_disassembled_count; }
bool StreamReassembler::empty() const { return this->unassembled_bytes() == 0; }