Rust HTTP客户端reqwest源码阅读

文章目录

这篇文章主要是阅读reqwest的源代码, 而底层关于hyper的实现这篇文章浅尝辄止, 仅仅只是过一下hyper请求的大致流程,以后对rust的掌握程度更高之后在看hyper的源代码。

reqwest代码版本: 0.11.22

个人水平有限,如有错误还请指正。

概括

一般来说,一个客户端的实现至少会包含两个部分,协议解析连接池管理, 这点无论是python的requests还是golang的net/http都是差不多的,但是reqwest并不需要处理这两个部分,因为reqwest是对hyper的封装, 脏活累活都被hyper承包了,reqwest要做的就是提供尽可能优雅和便于使用的用户体验。

如果对reqwest的使用不太熟悉,可以参考我的这篇文章: https://youerning.top/post/reqwest-tutorial

快速入门

无论是写代码还是阅读代码,我们都需要一个入口,本篇文章的代码入口是以下代码。

use reqwest::Result;

#[tokio::main]
async fn main() -> Result<()>{
    let body = reqwest::get("https://youerning.top")
    .await?
    .text()
    .await?;

    println!("body: {}", body);
    Ok(())
}

代码入口

pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
    Client::builder().build()?.get(url).send().await
}

在深入代码前,需要先对上面最简单的这部分代码做一些解析, 上面的代码可以分为三个部分

  • 构造client
  • 构造请求
  • 发送请求并等待响应

然后我们可以按照这三个部分分别阅读代码。

构造client

首先来看看reqwest是怎么构造client的。

pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
}

impl ClientBuilder {
    pub fn new() -> ClientBuilder {
        // 1.
        let mut headers: HeaderMap<HeaderValue> = HeaderMap::with_capacity(2);
        headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
		
        // 2.
        ClientBuilder {
            // 内部的config对象保存所有clientBuilder的配置项
            config: Config {
                error: None,
                accepts: Accepts::default(),
                // 省略其他配置项
             }
        }
    }

    pub fn build(self) -> crate::Result<Client> {
        // 3.
        let config = self.config;
        if let Some(err) = config.error {
            return Err(err);
        }
        
        // connector和builder的构造过程后文再说明

        // 4.
        Ok(Client {
            inner: Arc::new(ClientRef {
                // 省略其他参数
                hyper: builder.build(connector),
            }),
        })
}

代码分解如下:

  1. 创建一个HeaderMap用于保存发送请求的请求体,并设置accept请求体为"*/*"
  2. 构造一个默认的ClientBuilder对象, client的默认参数都可以在这里看到
  3. 检查config是否出错。
  4. 构造和检查各种参数,其中比较重要的两个对象是connectorbuilder,这部分后续展开

ClientBuilder的参数都是私有的,所以想要设置它的参数需要调用它提供的公开函数或者说接口,比较常用的有以下方法

  • user_agent 设置user-agent
  • default_headers 设置默认请求头, 这样就可以复用一组请求头
  • gzip, brotli, deflate reqwest支持的各种压缩算法, 这个些方法需要在依赖配置项那里启用才能用
  • proxy 设置代理
  • timeout, connect_timeout设置超时时间
  • pool_idle_timeout, pool_max_idle_per_host连接池超时和每个请求主机的最大空闲数
  • danger_accept_invalid_certs 禁用ssl证书检查,对于自签名证书服务器很有用
  • resolve, resolve_to_addrs, dns_resolver设置解析域名的dns, 有时候很有用。

connector

connector正如其名字表示的那样,它是一个连接器,负责连接远端,然后返回一个Connection, 这Connection后续可以放在连接池中复用。

根据依赖选项中的启用的特性不同,rust会编译不同的connector, 我们这里只看开启默认特性的代码。

pub fn build(self) -> crate::Result<Client> {
    
let mut connector = {
    // http connector 处理http协议
    let mut http = HttpConnector::new_with_resolver(DynResolver::new(resolver.clone()));
    #[cfg(feature = "__tls")]
    match config.tls {
        #[cfg(feature = "default-tls")]
        TlsBackend::Default => {
            // TlsConnectorBuilder
            let mut tls = TlsConnector::builder();
            // 设置tls相关属性
            tls.danger_accept_invalid_certs(!config.certs_verification);
            tls.use_sni(config.tls_sni);
            tls.disable_built_in_roots(!config.tls_built_in_root_certs);
        }
        // 在http connector外部包一层tls,用于处理tls连接
        Connector::new_default_tls(
            http,
            tls,
           // 其他参数省略
        )?
}
    
    pub(crate) fn new_default_tls<T>() -> crate::Result<Connector> {
        // 省略tls connector的构造过程
    }
    

connector的底层构造过程这里不做深入,仅仅粗略的过一遍。

builder

无论是reqwest还是hyper的构造逻辑都是类似的,在请求前通过一个builder对象来设置属性,最后build出最终的目标对象。

pub fn build(self) -> crate::Result<Client> {
    let mut builder = hyper::Client::builder();
        pub fn builder() -> Builder {
            Builder::default()
        }
 	
    // 省略其他的参数设置
    builder.pool_idle_timeout(config.pool_idle_timeout);
    builder.pool_max_idle_per_host(config.pool_max_idle_per_host);
    connector.set_keepalive(config.tcp_keepalive);

    Ok(Client {
       inner: Arc::new(ClientRef {
           hyper: builder.build(connector),
           // 省略其他参数
           }
        }
    }
}
    
    pub fn build<C, B>(&self, connector: C) -> Client<C, B>{
        Client {
            config: self.client_config,
            conn_builder: self.conn_builder.clone(),
            connector,
            // 创建连接池,用于复用连接
            pool: Pool::new(self.pool_config, &self.conn_builder.exec),
        }
    }

Client::builder().build()方法最终构建出了一个包含hyper ClientClient,至此我们已经有Client了,可以开始构造请求。

构造请求

pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
    Client::builder().build()?.get(url).send().await
}

pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::GET, url)
    }

pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
    	// 默认使用的http协议是http/1.1
        let req = url.into_url().map(move |url| Request::new(method, url));
        RequestBuilder::new(self.clone(), req)
    }

Client::builder().build()?.get(url)最后生成了一个RequestBuilder对象,当我们得到builder对象之后可以通过它暴露的接口设置各种属性,当设置完毕后才发送请求。

RequestBuilder提供了很多有用的接口,这里简单的列举一些比较常用的方法

  • query 设置请求参数
  • header, header_sensitive, headers设置请求头
  • timeout 设置超时, 会覆盖ClientBuilder::timeout()的设置。
  • body, multipart, form, json 发送各种请求体

发送请求

当请求构造好之后就可以发送请求了。

pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
    Client::builder().build()?.get(url).send().await
}

pub fn send(self) -> impl Future<Output = Result<Response, crate::Error>> {
    	// 因为在设置request参数的时候可能出错,所以这里需要判断一下request是否正确
        match self.request {
            Ok(req) => self.client.execute_request(req),
            Err(err) => Pending::new_err(err),
        }
    }

pub(super) fn execute_request(&self, req: Request) -> Pending {
    // 1.
    let (method, url, mut headers, body, timeout, version) = req.pieces();
    
    // 2.
    let builder = hyper::Request::builder()
            .method(method.clone())
            .uri(uri)
            .version(version);
    
    // 基于http协议版本构造不同的ResponseFuture
    let in_flight = match version {
            _ => {
                // 3.
                let mut req = builder
                    .body(body.into_stream())
                    .expect("valid request parts");
                *req.headers_mut() = headers.clone();
                // 4.
                ResponseFuture::Default(self.inner.hyper.request(req))
            }
        };
    
    // 将responseFuture包一层
    Pending {
            inner: PendingInner::Request(PendingRequest {
                method,
                url,
                headers,
                body: reusable,
                urls: Vec::new(),
                retry_count: 0,
                client: self.inner.clone(),
                in_flight,
                timeout,
            }),
        }
}

代码分解如下:

  1. reqwest构造的请求分解,便于后续将参数传给底层的hyper
  2. 构造hyperbuilder对象,这个对象和reqwestrequestbuilder对象类似,用于构造请求
  3. 将参数传给hyperrequest对象
  4. 使用底层的hyper client请求,注意,这里返回的是Future, 所以不会立即执行

至此请求已经全部准备好也已经生成了ResponseFuture,就差最后的发送(.await)了。

poll

rust的异步编程在于Future这个trait, 只要实现了这个trait就是Future, 也就可以.await, 而.await的最终调用就是调用实现Future traitpoll方法,所以我们可以查看对应的poll方法来查看其对应的逻辑。

再次之前,我们简单的看下send()方法返回的对象的调用链

Pending -> PendingRequest -> ResponseFuture

Pending

impl Future for Pending {
    type Output = Result<Response, crate::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = self.inner();
        // 之所以包一层是为了避免被多次调用
        // 不知道什么情况下会触发这种情况....
        match inner.get_mut() {
            // poll PendingRequest
            PendingInner::Request(ref mut req) => Pin::new(req).poll(cx),
            PendingInner::Error(ref mut err) => Poll::Ready(Err(err
                .take()
                .expect("Pending error polled more than once"))),
        }
    }
}

Pending的作用就是简单的封装, 避免多次调用。

什么情况会触发多次调用?

PendingRequest

PendingRequest开始真正的调用hyper之前返回的Future对象,并处理不同的响应码。

impl Future for PendingRequest {
    type Output = Result<Response, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 1.
        if let Some(delay) = self.as_mut().timeout().as_mut().as_pin_mut() {
            if let Poll::Ready(()) = delay.poll(cx) {
                return Poll::Ready(Err(
 crate::error::request(crate::error::TimedOut).with_url(self.url.clone())
                ));
            }
        }
        
        // 2.
        loop {
            let res = match self.as_mut().in_flight().get_mut() {
                // 3.
                ResponseFuture::Default(r) => match Pin::new(r).poll(cx) {
                    // 请求完成但是失败
                    Poll::Ready(Err(e)) => {
                        if self.as_mut().retry_error(&e) {
                            continue;
                        }
                        return Poll::Ready(Err(
                            crate::error::request(e).with_url(self.url.clone())
                        ));
                    }
                    // 请求完成并成功
                    Poll::Ready(Ok(res)) => res,
                    // 还未完成响应
                    Poll::Pending => return Poll::Pending,
                },
                // http3的情况
            }
            
        	// 重定向的处理
            
            // 4.
            // 返回结果
            let res = Response::new(
                res,
                self.url.clone(),
                self.client.accepts,
                self.timeout.take(),
            );
            return Poll::Ready(Ok(res));
        }
    }

代码分解如下:

  1. 首先判断是否超时
  2. 因为可能存在重定向,所以需要使用loop来不断处理重定向请求(默认是最多允许10次重定向.)
  3. 调用ResponseFuturepoll方法
  4. 最后返回结果

至此请求已经通过底层的hyper发送,只等其返回结果了。

hyper

hyper的代码我就简单的过一下了,因为我也不是看得太懂T_T。

首先回到之前的调用路径, 上文的ResponseFuture就是这里的in_flight,也就是ResponseFuture::Default(self.inner.hyper.request(req))

let in_flight = match version {
    // 忽略http3情况的处理代码
    
    _ => {
        // 1.
        ResponseFuture::Default(self.inner.hyper.request(req))
    }
};

pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
	// 处理connect方法, 这在代理https时有用
    
    // 2.
    let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
        Ok(s) => s,
        Err(err) => {
            return ResponseFuture::new(future::err(err));
        }
    };
	
    // 3.
    ResponseFuture::new(self.clone().retryably_send_request(req, pool_key))
}

async fn retryably_send_request() -> crate::Result<Response<Body>> {
    loop {
            req = match self.send_request(req, pool_key.clone()).await {
                Ok(resp) => return Ok(resp),
                // 省略错误处理代码
            }
        }
    }
}

async fn send_request() -> Result<Response<Body>, ClientError<B>> {
    // 
    let mut pooled = match self.connection_for(pool_key).await {
            Ok(pooled) => pooled,
            // 省略错误处理代码
        };
    
    // 
    let mut res = match pooled.send_request_retryable(req).await {
            // 省略错误处理代码
            Ok(res) => res,
        };
    Ok(res)
}

代码分解如下:

  1. 通过hyper请求并返回ResponseFuture对象
  2. 基于请求的协议和域名构造pool_key, pool_key用来区分连接池中的连接
  3. retryably_send_request异步函数调用后生成的Future对象包装起来, 也就是说,当responseFuture对象被poll的时候就会调用retryably_send_request函数里的代码
  4. 获取连接池里的client.
  5. 将请求通过获得client发送出去

总结

reqwesthyper的代码都用到了构造者(builder)设计模式, 所以两者的请求步骤是差不多的,首先创建一个builder, 基于这个builder暴露的接口可以设置各种参数,最后调用builder方法生成对应的对象,比如Client对象或者Request对象。

个人认为,如果要尽可能的掌握一个编程库,查看其源代码是必不可少的,再者为了学习rust, 多看一下对应语言的编程库也是有必要的。

参考连接