rust网络框架Pingora源码阅读2
上次主要阅读了Pingora
的Server
部分的代码, 本文阅读Pingora
另一个比较重要的部分Service
, Pingora
内置两种服务background(后台)和Listening(监听), 本文着重后者。
Pingora
代码版本: v0.1.0
internals.md在https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md
代码入口
下文展示之前例子中的一小部分代码。
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
#[async_trait]
impl ProxyHttp for LB {/*实现略*/}
// 1.
let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
// 2.
lb.add_tcp("0.0.0.0:10080");
// 3.
my_server.add_service(lb);
// 4.
my_server.run_forever();
代码分解如下
- 创建一个
http_proxy_service
, 即listening service
(监听服务) - 为
http_proxy_service
增加监听端口, 也就是本次创建的监听(代理)服务监听哪个端口,可以多次调用以添加不同的监听端口 - 将服务添加的
Server
对象中 - 运行
Server
对象
上面的代码可以分为两个部分,Service
的创建和初始化,以及如何将Service
添加到Server
对象并启动Server
对象,后面这一部分已经在前面介绍过了,让我们将前者再细化一点,即Service初始化, Service配置。
Service初始化
Service
初始化就是创建一个实现Service Trait
的对象。
初始化代码如下
let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>> {
Service::new(
"Pingora HTTP Proxy Service".into(),
HttpProxy::new(inner, conf.clone()),
)
}
impl<SV> HttpProxy<SV> {
fn new(inner: SV, conf: Arc<ServerConf>) -> Arc<Self> {
Arc::new(HttpProxy {
inner,
client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
shutdown: Notify::new(),
})
}
}
上面的代码并没有太多可讲的部分,因为只是不断的填充对应的结构体而已。
在继续深入调用链之前,先看看运行前的Service
配置。
Service配置
Service
的配置主要跟服务的类型有关,比如健康检查的后台服务就需要设置要检查的后端,监听服务自然最重要的就是设置监听入口啦,监听服务支持两种协议, HTTP和HTTPS,两者的区别主要在于是否有TLS握手的阶段,当握手之后,两者处理的逻辑都会落到HttpProxy
,这里主要看监听服务的HTTP的代码。
监听服务支持三种监听类型,HTTP, HTTPS, UDS(Unix Socket)(这个应该不怎么会用吧。。。).
let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
lb.add_tcp("0.0.0.0:10080");
impl<A> Service<A> {
pub fn add_tcp(&mut self, addr: &str) {
self.listeners.add_tcp(addr);
}
}
impl Listeners {
pub fn add_tcp(&mut self, addr: &str) {
self.add_address(ServerAddress::Tcp(addr.into(), None));
}
pub fn add_address(&mut self, addr: ServerAddress) {
self.add_endpoint(addr, None);
}
pub fn add_endpoint(&mut self, l4: ServerAddress, tls: Option<TlsSettings>) {
self.stacks.push(TransportStackBuilder { l4, tls })
}
}
监听服务里面有个listeners
字段似乎不是那么让人意外,listeners
对象的内部主要是一个个的stack
, 相当于传输层的抽象。
其内部的结构如下:
至此,Service
的配置差不多了。你可能有疑问,这就完了? 也没看到具体的构建逻辑呀,就只是不断的封装对象而已。这其实主要是因为大多数框架对于资源的获取或者说创建都是惰性的,即不会在启动之前尝试获取资源,Pingora
自然也不例外,具体的构建流程会在启动时构建,所以看看Service
的启动流程吧,也就是Service
的调用链。
Service调用链
Service
的调用链要从之前的Server
启动开始看。
my_server.run_forevler();
pub fn run_forever(&mut self) {
while let Some(service) = self.services.pop() {
let threads = service.threads().unwrap_or(conf.threads);
let runtime = Server::run_service(
service,
self.listen_fds.clone(),
self.shutdown_recv.clone(),
threads,
conf.work_stealing,
);
runtimes.push(runtime);
}
}
fn run_service() -> Runtime
{
let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
service_runtime.get_handle().spawn(async move {
service.start_service(fds, shutdown).await;
});
service_runtime
}
上面的代码总结起来就是最终创建一个异步任务运行Service
的start_service
方法。
start_service
因为Service
包括了多个stack(协议栈)/endpoint(监听端口), 所以start_service
就负责Service
的整个生命周期就行,而具体的转发逻辑就交给自己的stack(协议栈)/endpoint(监听端口)就行了。
async fn start_service(&mut self, fds: Option<ListenFds>, shutdown: ShutdownWatch) {
// 1.
let runtime = current_handle();
// 2.
let endpoints = self.listeners.build(fds);
let handlers = endpoints.into_iter().map(|endpoint| {
// 3.
let app_logic = self.app_logic.clone();
let shutdown = shutdown.clone();
// 4.
runtime.spawn(async move {
Self::run_endpoint(app_logic, endpoint, shutdown).await;
})
});
// 5.
futures::future::join_all(handlers).await;
self.listeners.cleanup();
self.app_logic.cleanup();
}
代码分解如下:
- 获取当前线程的
tokio
运行时 - 开始构建各个协议栈,还是在构建对象,没有具体的资源获取或者初始化的操作。
- 也就是我们写的业务逻辑对象,即实现了
ProxyHttp trait
的那部分。 - 启动endpoint,也就是启动协议栈。
Service
对象的收尾操作,等待请求处理完成(如果是升级或者优雅关闭的话)。
run_endpoint
run_endpoint
就是开始监听了。
async fn run_endpoint() {
// 1.
loop {
// 2.
let new_io = tokio::select! { // TODO: consider biased for perf reason?
new_io = stack.accept() => new_io,
shutdown_signal = shutdown.changed() => {
match shutdown_signal {
// 处理关闭信号...
}
};
// 3.
match new_io {
Ok(io) => {
let app = app_logic.clone();
let shutdown = shutdown.clone();
current_handle().spawn(async move {
// 4.
match io.handshake().await {
// 5.
Ok(io) => Self::handle_event(io, app, shutdown).await,
Err(e) => {
// 错误处理忽略
}
}
});
}
Err(e) => {
// 错误处理忽略...
}
}
// 当前版本只是一个占位符,啥都没做。。。
stack.cleanup();
}
pub async fn handle_event(event: Stream, app_logic: Arc<A>, shutdown: ShutdownWatch) {
// 6.
let mut reuse_event = app_logic.process_new(event, &shutdown).await;
while let Some(event) = reuse_event {
reuse_event = app_logic.process_new(event, &shutdown).await;
}
}
代码分解如下:
- 监听端点的请求肯定是不断循环啦
- 轮训新请求和失败的信号
- 处理新获取的连接
- 先握手,如果是HTTP协议则对应一个空函数,也就是HTTP不需要握手
- 开始处理HTTP请求
- 用
app_logic
也就是HttpProxy
对象处理新请求,如果是长连接(也就是可复用)就不断的处理。
至此stack(协议栈)的活干完了,它的活就是获取连接,如果连接是TLS就处理一下握手,之后的HTTP/HTTPS协议就交给app_logic
了,也就是HttpProxy
对象。
小结
总结一下就是,监听服务启动时根据添加的endpoint(端点)创建对应数量的stack对象,这个对象是传输层的抽象,它包括TCP以及TLS, 当传输层处理完之后就是HTTP协议的事了,所以将连接传给app_logic
处理,不触碰应用层协议,如果app_logic
觉得可复用就再维护一个循环不断处理后续的请求,而这个处理的逻辑就交给process_new
函数了。
实现链
在深入process_new
之前,我们得先梳理一下HttpProxy
的调用链,或者说实现链,如果你看代码你会发现HttpProxy
结构体的impl
代码块是没有实现process_new
函数的,而Service
的app_logic
类型需要实现ServerApp + Send + Sync + 'static
的, 而ServerApp
的trait并没有为HttpProxy
实现, 总的来说它们之间的实现中间还隔了一个HttpServerApp
,它们之间的实现关系如下图。
而相关的代码如下:
// pingora-core\src\services\listening.rs
impl<A: ServerApp + Send + Sync + 'static> Service<A> {}
// pingora-core\src\apps\mod.rs
impl<T> ServerApp for T
where
T: HttpServerApp + Send + Sync + 'static,
{
async fn process_new() -> Option<Stream> {}
}
// pingora-proxy\src\lib.rs
impl<SV> HttpServerApp for HttpProxy<SV>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
async fn process_new_http() -> Option<Stream> {}
}
为啥要这么设计? 应该是为了分层和后续的扩展吧。
还有就是这两个trait都预留了一个
cleanup
的函数,分别叫做cleanup
和http_cleanup
。
下面是ServerApp
和HttpServerApp
的介绍。
ServerApp
: 这个trait定义了传输层(TCP或TLS)应用接口。HttpServerApp
: 这个trait定义了HTTP应用接口。
既然知道了对应的调用链,那么我们可以继续深入后续的处理函数了。
process_new
在看这段代码的时候需要之前一些HTTP2的知识, HTTP2与HTTP1有很多不同,其中一点就是前者默认保持长连接,而后者需要额外的请求头keepalived
来设置(虽然现在几乎默认设置保持长连接…),然后就是HTTP2可以在同一个TCP连接发起多个请求, 这个请求的传输单位在HTTP2的上下文中被称为stream
,而HTTP1(不考虑pipeline)需要一个一个请求的依次发送并接受.
好吧,让我看看实际的代码是啥。
async fn process_new() -> Option<Stream> {
// 1.
match stream.selected_alpn_proto() {
Some(ALPN::H2) => {
let h2_options = self.h2_options();
// 2.
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
Err(e) => {
// 忽略错误.
}
Ok(c) => c,
};
// 3.
loop {
// 获取HTTP2里面的链接
let h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn).await;
let h2_stream = match h2_stream {
Err(e) => {
// 忽略错误处理
}
Ok(s) => s?, // None means the connection is ready to be closed
};
let app = self.clone();
let shutdown = shutdown.clone();
pingora_runtime::current_handle().spawn(async move {
// 4. // 注意这里是new_htttp2
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
}
}
// 5.
_ => {
// 注意这里是new_htttp1
self.process_new_http(ServerSession::new_http1(stream), shutdown)
.await
}
}
}
代码分解如下:
- 根据协议处理处理不同的连接,这个协议版本从哪来? HTTPS就从TLS握手阶段时的
ALPN
字段来,HTTP协议就从请求头来。 - HTTP2协议需要再次握手, 这个握手是HTTP2应用协议的握手,跟TCP和TLS不同。
- 前面说过HTTP2在一个TCP连接中可以发起多个请求,每个请求的承载单位叫做
stream
,所以这里通过一个循环不断的获取对应的连接(非TCP连接)。 - 请求传递给下一层(
HttpServerApp
)。 - 非HTTP2都默认是作为HTTP1协议处理。
process_new_http
上面的代码看起来平平无奇,都是调用的process_new_http
,那么被调用方怎么区分协议呢?答案是,process_new_http
不处理协议本身,只关心各个处理阶段。协议的解析的由session
对象处理,至于怎么处理协议各种细节的,有兴趣的可以自己看哈。
async fn process_new_http() -> Option<Stream> {
// 1.
let session = Box::new(session);
let mut session = match self.handle_new_request(session).await {
Some(downstream_session) => Session::new(downstream_session),
None => return None, // bad request
};
// 2.
if *shutdown.borrow() {
// stop downstream from reusing if this service is shutting down soon
session.set_keepalive(None);
} else {
// default 60s
session.set_keepalive(Some(60));
}
// 3.
let ctx = self.inner.new_ctx();
self.process_request(session, ctx).await
}
代码分解如下:
- 通过
session
开始读取客户端的请求头 - 判断是否关闭并设置保持长连接的默认时间(60s), HTTP2这个调用不会生效
- 后面几乎交给了
HttpProxy
的各种钩子函数了。
总结
本文主要是具体逻辑处理(钩子函数)之前的各个阶段代码,至于后面业务逻辑(各种钩子函数),再写文章吧,文章已经太长了。