rust网络框架Pingora源码阅读2

文章目录

上次主要阅读了PingoraServer部分的代码, 本文阅读Pingora另一个比较重要的部分Service, Pingora内置两种服务background(后台)和Listening(监听), 本文着重后者。

Pingora代码版本: v0.1.0

internals.md在https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md

代码入口

下文展示之前例子中的一小部分代码。

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {/*实现略*/}

// 1. 
let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));

// 2. 
lb.add_tcp("0.0.0.0:10080");

// 3.
my_server.add_service(lb);

// 4.
my_server.run_forever();

代码分解如下

  1. 创建一个http_proxy_service, 即listening service(监听服务)
  2. http_proxy_service增加监听端口, 也就是本次创建的监听(代理)服务监听哪个端口,可以多次调用以添加不同的监听端口
  3. 将服务添加的Server对象中
  4. 运行Server对象

上面的代码可以分为两个部分,Service的创建和初始化,以及如何将Service添加到Server对象并启动Server对象,后面这一部分已经在前面介绍过了,让我们将前者再细化一点,即Service初始化, Service配置

Service初始化

Service初始化就是创建一个实现Service Trait的对象。

初始化代码如下

let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));

pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>> {
    Service::new(
        "Pingora HTTP Proxy Service".into(),
        HttpProxy::new(inner, conf.clone()),
    )
}

impl<SV> HttpProxy<SV> {
    fn new(inner: SV, conf: Arc<ServerConf>) -> Arc<Self> {
        Arc::new(HttpProxy {
            inner,
            client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
            shutdown: Notify::new(),
        })
    }
}

上面的代码并没有太多可讲的部分,因为只是不断的填充对应的结构体而已。

在继续深入调用链之前,先看看运行前的Service配置。

Service配置

Service的配置主要跟服务的类型有关,比如健康检查的后台服务就需要设置要检查的后端,监听服务自然最重要的就是设置监听入口啦,监听服务支持两种协议, HTTP和HTTPS,两者的区别主要在于是否有TLS握手的阶段,当握手之后,两者处理的逻辑都会落到HttpProxy,这里主要看监听服务的HTTP的代码。

监听服务支持三种监听类型,HTTP, HTTPS, UDS(Unix Socket)(这个应该不怎么会用吧。。。).

let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));

lb.add_tcp("0.0.0.0:10080");

impl<A> Service<A> {
	pub fn add_tcp(&mut self, addr: &str) {
        self.listeners.add_tcp(addr);
    }
}

impl Listeners {
	pub fn add_tcp(&mut self, addr: &str) {
        self.add_address(ServerAddress::Tcp(addr.into(), None));
    }
    
    pub fn add_address(&mut self, addr: ServerAddress) {
        self.add_endpoint(addr, None);
    }
    
    pub fn add_endpoint(&mut self, l4: ServerAddress, tls: Option<TlsSettings>) {
        self.stacks.push(TransportStackBuilder { l4, tls })
    }
}

监听服务里面有个listeners字段似乎不是那么让人意外,listeners对象的内部主要是一个个的stack, 相当于传输层的抽象。

其内部的结构如下:

service-internals

至此,Service的配置差不多了。你可能有疑问,这就完了? 也没看到具体的构建逻辑呀,就只是不断的封装对象而已。这其实主要是因为大多数框架对于资源的获取或者说创建都是惰性的,即不会在启动之前尝试获取资源,Pingora自然也不例外,具体的构建流程会在启动时构建,所以看看Service的启动流程吧,也就是Service的调用链。

Service调用链

Service的调用链要从之前的Server启动开始看。

my_server.run_forevler();

pub fn run_forever(&mut self) {
    while let Some(service) = self.services.pop() {
            let threads = service.threads().unwrap_or(conf.threads);
            let runtime = Server::run_service(
                service,
                self.listen_fds.clone(),
                self.shutdown_recv.clone(),
                threads,
                conf.work_stealing,
            );
            runtimes.push(runtime);
        }
}

fn run_service() -> Runtime
    {
        let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
        service_runtime.get_handle().spawn(async move {
            service.start_service(fds, shutdown).await;
        });
        service_runtime
    }

上面的代码总结起来就是最终创建一个异步任务运行Servicestart_service方法。

start_service

因为Service包括了多个stack(协议栈)/endpoint(监听端口), 所以start_service就负责Service的整个生命周期就行,而具体的转发逻辑就交给自己的stack(协议栈)/endpoint(监听端口)就行了。

async fn start_service(&mut self, fds: Option<ListenFds>, shutdown: ShutdownWatch) {
    // 1.
    let runtime = current_handle();
    // 2. 
    let endpoints = self.listeners.build(fds);

    let handlers = endpoints.into_iter().map(|endpoint| {
        // 3. 
        let app_logic = self.app_logic.clone();
        let shutdown = shutdown.clone();
        // 4. 
        runtime.spawn(async move {
            Self::run_endpoint(app_logic, endpoint, shutdown).await;
        })
    });

    // 5. 
    futures::future::join_all(handlers).await;
    self.listeners.cleanup();
    self.app_logic.cleanup();
}

代码分解如下:

  1. 获取当前线程的tokio运行时
  2. 开始构建各个协议栈,还是在构建对象,没有具体的资源获取或者初始化的操作。
  3. 也就是我们写的业务逻辑对象,即实现了ProxyHttp trait的那部分。
  4. 启动endpoint,也就是启动协议栈。
  5. Service对象的收尾操作,等待请求处理完成(如果是升级或者优雅关闭的话)。

run_endpoint

run_endpoint就是开始监听了。

async fn run_endpoint() {
    	// 1. 
        loop {
            // 2. 
            let new_io = tokio::select! { // TODO: consider biased for perf reason?
                new_io = stack.accept() => new_io,
                shutdown_signal = shutdown.changed() => {
                    match shutdown_signal {
                        // 处理关闭信号...
                }
            };
            // 3.
            match new_io {
                Ok(io) => {
                    let app = app_logic.clone();
                    let shutdown = shutdown.clone();
                    current_handle().spawn(async move {
                        // 4.
                        match io.handshake().await {
                            // 5.
                            Ok(io) => Self::handle_event(io, app, shutdown).await,
                            Err(e) => {
                                // 错误处理忽略
                            }
                        }
                    });
                }
                Err(e) => {
                    // 错误处理忽略...
            }
        }
		
        // 当前版本只是一个占位符,啥都没做。。。
        stack.cleanup();
    }
            

pub async fn handle_event(event: Stream, app_logic: Arc<A>, shutdown: ShutdownWatch) {
    	// 6.
        let mut reuse_event = app_logic.process_new(event, &shutdown).await;
        while let Some(event) = reuse_event {
            reuse_event = app_logic.process_new(event, &shutdown).await;
        }
    }

代码分解如下:

  1. 监听端点的请求肯定是不断循环啦
  2. 轮训新请求和失败的信号
  3. 处理新获取的连接
  4. 先握手,如果是HTTP协议则对应一个空函数,也就是HTTP不需要握手
  5. 开始处理HTTP请求
  6. app_logic也就是HttpProxy对象处理新请求,如果是长连接(也就是可复用)就不断的处理。

至此stack(协议栈)的活干完了,它的活就是获取连接,如果连接是TLS就处理一下握手,之后的HTTP/HTTPS协议就交给app_logic了,也就是HttpProxy对象。

小结

总结一下就是,监听服务启动时根据添加的endpoint(端点)创建对应数量的stack对象,这个对象是传输层的抽象,它包括TCP以及TLS, 当传输层处理完之后就是HTTP协议的事了,所以将连接传给app_logic处理,不触碰应用层协议,如果app_logic觉得可复用就再维护一个循环不断处理后续的请求,而这个处理的逻辑就交给process_new函数了。

实现链

在深入process_new之前,我们得先梳理一下HttpProxy的调用链,或者说实现链,如果你看代码你会发现HttpProxy结构体的impl代码块是没有实现process_new函数的,而Serviceapp_logic类型需要实现ServerApp + Send + Sync + 'static的, 而ServerApp的trait并没有为HttpProxy实现, 总的来说它们之间的实现中间还隔了一个HttpServerApp,它们之间的实现关系如下图。

service-implements

而相关的代码如下:

// pingora-core\src\services\listening.rs
impl<A: ServerApp + Send + Sync + 'static> Service<A> {}

// pingora-core\src\apps\mod.rs
impl<T> ServerApp for T
where
    T: HttpServerApp + Send + Sync + 'static,
{
    async fn process_new() -> Option<Stream> {}
}

// pingora-proxy\src\lib.rs
impl<SV> HttpServerApp for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_new_http() -> Option<Stream> {}
}

为啥要这么设计? 应该是为了分层和后续的扩展吧。

还有就是这两个trait都预留了一个cleanup的函数,分别叫做cleanuphttp_cleanup

下面是ServerAppHttpServerApp的介绍。

  • ServerApp: 这个trait定义了传输层(TCP或TLS)应用接口。
  • HttpServerApp: 这个trait定义了HTTP应用接口。

既然知道了对应的调用链,那么我们可以继续深入后续的处理函数了。

process_new

在看这段代码的时候需要之前一些HTTP2的知识, HTTP2与HTTP1有很多不同,其中一点就是前者默认保持长连接,而后者需要额外的请求头keepalived来设置(虽然现在几乎默认设置保持长连接…),然后就是HTTP2可以在同一个TCP连接发起多个请求, 这个请求的传输单位在HTTP2的上下文中被称为stream,而HTTP1(不考虑pipeline)需要一个一个请求的依次发送并接受.

至于一些其他HTTP2的细节参考WikipediaRFC或者自行搜索吧。

好吧,让我看看实际的代码是啥。

async fn process_new() -> Option<Stream> {
    	// 1.
        match stream.selected_alpn_proto() {
            Some(ALPN::H2) => {
                let h2_options = self.h2_options();
                // 2.
                let h2_conn = server::handshake(stream, h2_options).await;
                let mut h2_conn = match h2_conn {
                    Err(e) => {
                        // 忽略错误.
                    }
                    Ok(c) => c,
                };
                
                // 3.
                loop {
                    // 获取HTTP2里面的链接
                    let h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn).await;
                    let h2_stream = match h2_stream {
                        Err(e) => {
                            // 忽略错误处理
                        }
                        Ok(s) => s?, // None means the connection is ready to be closed
                    };
                    
                    let app = self.clone();
                    let shutdown = shutdown.clone();
                    pingora_runtime::current_handle().spawn(async move {
                        // 4.                               // 注意这里是new_htttp2
                        app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
                            .await;
                    });
                }
            }
            // 5.
            _ => {
                                                     // 注意这里是new_htttp1
                self.process_new_http(ServerSession::new_http1(stream), shutdown)
                    .await
            }
        }
    }

代码分解如下:

  1. 根据协议处理处理不同的连接,这个协议版本从哪来? HTTPS就从TLS握手阶段时的ALPN字段来,HTTP协议就从请求头来。
  2. HTTP2协议需要再次握手, 这个握手是HTTP2应用协议的握手,跟TCP和TLS不同。
  3. 前面说过HTTP2在一个TCP连接中可以发起多个请求,每个请求的承载单位叫做stream,所以这里通过一个循环不断的获取对应的连接(非TCP连接)
  4. 请求传递给下一层(HttpServerApp)。
  5. 非HTTP2都默认是作为HTTP1协议处理。

process_new_http

上面的代码看起来平平无奇,都是调用的process_new_http,那么被调用方怎么区分协议呢?答案是,process_new_http不处理协议本身,只关心各个处理阶段。协议的解析的由session对象处理,至于怎么处理协议各种细节的,有兴趣的可以自己看哈。

async fn process_new_http() -> Option<Stream> {
        // 1.
        let session = Box::new(session);
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session),
            None => return None, // bad request
        };

    	// 2.
        if *shutdown.borrow() {
            // stop downstream from reusing if this service is shutting down soon
            session.set_keepalive(None);
        } else {
            // default 60s
            session.set_keepalive(Some(60));
        }
		
        // 3.
        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await
    }

代码分解如下:

  1. 通过session开始读取客户端的请求头
  2. 判断是否关闭并设置保持长连接的默认时间(60s), HTTP2这个调用不会生效
  3. 后面几乎交给了HttpProxy的各种钩子函数了。

总结

本文主要是具体逻辑处理(钩子函数)之前的各个阶段代码,至于后面业务逻辑(各种钩子函数),再写文章吧,文章已经太长了。

参考链接