Rust HTTP客户端reqwest源码阅读
这篇文章主要是阅读reqwest的源代码, 而底层关于hyper的实现这篇文章浅尝辄止, 仅仅只是过一下hyper请求的大致流程,以后对rust的掌握程度更高之后在看hyper的源代码。
reqwest代码版本: 0.11.22
个人水平有限,如有错误还请指正。
概括
一般来说,一个客户端的实现至少会包含两个部分,协议解析
和连接池管理
, 这点无论是python的requests
还是golang的net/http
都是差不多的,但是reqwest
并不需要处理这两个部分,因为reqwest
是对hyper
的封装, 脏活累活都被hyper
承包了,reqwest
要做的就是提供尽可能优雅和便于使用的用户体验。
如果对reqwest的使用不太熟悉,可以参考我的这篇文章: https://youerning.top/post/reqwest-tutorial
快速入门
无论是写代码还是阅读代码,我们都需要一个入口,本篇文章的代码入口是以下代码。
use reqwest::Result;
#[tokio::main]
async fn main() -> Result<()>{
let body = reqwest::get("https://youerning.top")
.await?
.text()
.await?;
println!("body: {}", body);
Ok(())
}
代码入口
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
在深入代码前,需要先对上面最简单的这部分代码做一些解析, 上面的代码可以分为三个部分
- 构造client
- 构造请求
- 发送请求并等待响应
然后我们可以按照这三个部分分别阅读代码。
构造client
首先来看看reqwest
是怎么构造client的。
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
impl ClientBuilder {
pub fn new() -> ClientBuilder {
// 1.
let mut headers: HeaderMap<HeaderValue> = HeaderMap::with_capacity(2);
headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
// 2.
ClientBuilder {
// 内部的config对象保存所有clientBuilder的配置项
config: Config {
error: None,
accepts: Accepts::default(),
// 省略其他配置项
}
}
}
pub fn build(self) -> crate::Result<Client> {
// 3.
let config = self.config;
if let Some(err) = config.error {
return Err(err);
}
// connector和builder的构造过程后文再说明
// 4.
Ok(Client {
inner: Arc::new(ClientRef {
// 省略其他参数
hyper: builder.build(connector),
}),
})
}
代码分解如下:
- 创建一个HeaderMap用于保存发送请求的请求体,并设置
accept
请求体为"*/*"
- 构造一个默认的
ClientBuilder
对象, client的默认参数都可以在这里看到 - 检查config是否出错。
- 构造和检查各种参数,其中比较重要的两个对象是
connector
和builder
,这部分后续展开
ClientBuilder
的参数都是私有的,所以想要设置它的参数需要调用它提供的公开函数或者说接口,比较常用的有以下方法
- user_agent 设置user-agent
- default_headers 设置默认请求头, 这样就可以复用一组请求头
- gzip, brotli, deflate reqwest支持的各种压缩算法, 这个些方法需要在依赖配置项那里启用才能用
- proxy 设置代理
- timeout, connect_timeout设置超时时间
- pool_idle_timeout, pool_max_idle_per_host连接池超时和每个请求主机的最大空闲数
- danger_accept_invalid_certs 禁用ssl证书检查,对于自签名证书服务器很有用
- resolve, resolve_to_addrs, dns_resolver设置解析域名的dns, 有时候很有用。
connector
connector正如其名字表示的那样,它是一个连接器,负责连接远端,然后返回一个Connection, 这Connection后续可以放在连接池中复用。
根据依赖选项中的启用的特性不同,rust会编译不同的connector, 我们这里只看开启默认特性的代码。
pub fn build(self) -> crate::Result<Client> {
let mut connector = {
// http connector 处理http协议
let mut http = HttpConnector::new_with_resolver(DynResolver::new(resolver.clone()));
#[cfg(feature = "__tls")]
match config.tls {
#[cfg(feature = "default-tls")]
TlsBackend::Default => {
// TlsConnectorBuilder
let mut tls = TlsConnector::builder();
// 设置tls相关属性
tls.danger_accept_invalid_certs(!config.certs_verification);
tls.use_sni(config.tls_sni);
tls.disable_built_in_roots(!config.tls_built_in_root_certs);
}
// 在http connector外部包一层tls,用于处理tls连接
Connector::new_default_tls(
http,
tls,
// 其他参数省略
)?
}
pub(crate) fn new_default_tls<T>() -> crate::Result<Connector> {
// 省略tls connector的构造过程
}
connector
的底层构造过程这里不做深入,仅仅粗略的过一遍。
builder
无论是reqwest
还是hyper
的构造逻辑都是类似的,在请求前通过一个builder
对象来设置属性,最后build
出最终的目标对象。
pub fn build(self) -> crate::Result<Client> {
let mut builder = hyper::Client::builder();
pub fn builder() -> Builder {
Builder::default()
}
// 省略其他的参数设置
builder.pool_idle_timeout(config.pool_idle_timeout);
builder.pool_max_idle_per_host(config.pool_max_idle_per_host);
connector.set_keepalive(config.tcp_keepalive);
Ok(Client {
inner: Arc::new(ClientRef {
hyper: builder.build(connector),
// 省略其他参数
}
}
}
}
pub fn build<C, B>(&self, connector: C) -> Client<C, B>{
Client {
config: self.client_config,
conn_builder: self.conn_builder.clone(),
connector,
// 创建连接池,用于复用连接
pool: Pool::new(self.pool_config, &self.conn_builder.exec),
}
}
Client::builder().build()
方法最终构建出了一个包含hyper Client
的Client
,至此我们已经有Client
了,可以开始构造请求。
构造请求
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::GET, url)
}
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
// 默认使用的http协议是http/1.1
let req = url.into_url().map(move |url| Request::new(method, url));
RequestBuilder::new(self.clone(), req)
}
Client::builder().build()?.get(url)
最后生成了一个RequestBuilder对象
,当我们得到builder
对象之后可以通过它暴露的接口设置各种属性,当设置完毕后才发送请求。
RequestBuilder
提供了很多有用的接口,这里简单的列举一些比较常用的方法
- query 设置请求参数
- header, header_sensitive, headers设置请求头
- timeout 设置超时, 会覆盖
ClientBuilder::timeout()
的设置。 - body, multipart, form, json 发送各种请求体
发送请求
当请求构造好之后就可以发送请求了。
pub async fn get<T: IntoUrl>(url: T) -> crate::Result<Response> {
Client::builder().build()?.get(url).send().await
}
pub fn send(self) -> impl Future<Output = Result<Response, crate::Error>> {
// 因为在设置request参数的时候可能出错,所以这里需要判断一下request是否正确
match self.request {
Ok(req) => self.client.execute_request(req),
Err(err) => Pending::new_err(err),
}
}
pub(super) fn execute_request(&self, req: Request) -> Pending {
// 1.
let (method, url, mut headers, body, timeout, version) = req.pieces();
// 2.
let builder = hyper::Request::builder()
.method(method.clone())
.uri(uri)
.version(version);
// 基于http协议版本构造不同的ResponseFuture
let in_flight = match version {
_ => {
// 3.
let mut req = builder
.body(body.into_stream())
.expect("valid request parts");
*req.headers_mut() = headers.clone();
// 4.
ResponseFuture::Default(self.inner.hyper.request(req))
}
};
// 将responseFuture包一层
Pending {
inner: PendingInner::Request(PendingRequest {
method,
url,
headers,
body: reusable,
urls: Vec::new(),
retry_count: 0,
client: self.inner.clone(),
in_flight,
timeout,
}),
}
}
代码分解如下:
- 将
reqwest
构造的请求分解,便于后续将参数传给底层的hyper
- 构造
hyper
的builder
对象,这个对象和reqwest
的requestbuilder
对象类似,用于构造请求 - 将参数传给
hyper
的request
对象 - 使用底层的
hyper client
请求,注意,这里返回的是Future
, 所以不会立即执行
至此请求已经全部准备好也已经生成了ResponseFuture
,就差最后的发送(.await
)了。
poll
rust
的异步编程在于Future
这个trait
, 只要实现了这个trait
就是Future
, 也就可以.await
, 而.await
的最终调用就是调用实现Future trait
的poll
方法,所以我们可以查看对应的poll
方法来查看其对应的逻辑。
再次之前,我们简单的看下send()
方法返回的对象的调用链
Pending -> PendingRequest -> ResponseFuture
Pending
impl Future for Pending {
type Output = Result<Response, crate::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.inner();
// 之所以包一层是为了避免被多次调用
// 不知道什么情况下会触发这种情况....
match inner.get_mut() {
// poll PendingRequest
PendingInner::Request(ref mut req) => Pin::new(req).poll(cx),
PendingInner::Error(ref mut err) => Poll::Ready(Err(err
.take()
.expect("Pending error polled more than once"))),
}
}
}
Pending的作用就是简单的封装, 避免多次调用。
什么情况会触发多次调用?
PendingRequest
PendingRequest
开始真正的调用hyper
之前返回的Future
对象,并处理不同的响应码。
impl Future for PendingRequest {
type Output = Result<Response, crate::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 1.
if let Some(delay) = self.as_mut().timeout().as_mut().as_pin_mut() {
if let Poll::Ready(()) = delay.poll(cx) {
return Poll::Ready(Err(
crate::error::request(crate::error::TimedOut).with_url(self.url.clone())
));
}
}
// 2.
loop {
let res = match self.as_mut().in_flight().get_mut() {
// 3.
ResponseFuture::Default(r) => match Pin::new(r).poll(cx) {
// 请求完成但是失败
Poll::Ready(Err(e)) => {
if self.as_mut().retry_error(&e) {
continue;
}
return Poll::Ready(Err(
crate::error::request(e).with_url(self.url.clone())
));
}
// 请求完成并成功
Poll::Ready(Ok(res)) => res,
// 还未完成响应
Poll::Pending => return Poll::Pending,
},
// http3的情况
}
// 重定向的处理
// 4.
// 返回结果
let res = Response::new(
res,
self.url.clone(),
self.client.accepts,
self.timeout.take(),
);
return Poll::Ready(Ok(res));
}
}
代码分解如下:
- 首先判断是否超时
- 因为可能存在重定向,所以需要使用loop来不断处理重定向请求(默认是最多允许10次重定向.)
- 调用
ResponseFuture
的poll
方法 - 最后返回结果
至此请求已经通过底层的hyper
发送,只等其返回结果了。
hyper
hyper的代码我就简单的过一下了,因为我也不是看得太懂T_T。
首先回到之前的调用路径, 上文的ResponseFuture
就是这里的in_flight
,也就是ResponseFuture::Default(self.inner.hyper.request(req))
let in_flight = match version {
// 忽略http3情况的处理代码
_ => {
// 1.
ResponseFuture::Default(self.inner.hyper.request(req))
}
};
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
// 处理connect方法, 这在代理https时有用
// 2.
let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
Ok(s) => s,
Err(err) => {
return ResponseFuture::new(future::err(err));
}
};
// 3.
ResponseFuture::new(self.clone().retryably_send_request(req, pool_key))
}
async fn retryably_send_request() -> crate::Result<Response<Body>> {
loop {
req = match self.send_request(req, pool_key.clone()).await {
Ok(resp) => return Ok(resp),
// 省略错误处理代码
}
}
}
}
async fn send_request() -> Result<Response<Body>, ClientError<B>> {
//
let mut pooled = match self.connection_for(pool_key).await {
Ok(pooled) => pooled,
// 省略错误处理代码
};
//
let mut res = match pooled.send_request_retryable(req).await {
// 省略错误处理代码
Ok(res) => res,
};
Ok(res)
}
代码分解如下:
- 通过hyper请求并返回
ResponseFuture
对象 - 基于请求的协议和域名构造
pool_key
,pool_key
用来区分连接池中的连接 - 将
retryably_send_request
异步函数调用后生成的Future
对象包装起来, 也就是说,当responseFuture
对象被poll
的时候就会调用retryably_send_request
函数里的代码 - 获取连接池里的client.
- 将请求通过获得client发送出去
总结
reqwest
和hyper
的代码都用到了构造者(builder
)设计模式, 所以两者的请求步骤是差不多的,首先创建一个builder
, 基于这个builder
暴露的接口可以设置各种参数,最后调用builder
方法生成对应的对象,比如Client
对象或者Request
对象。
个人认为,如果要尽可能的掌握一个编程库,查看其源代码是必不可少的,再者为了学习rust, 多看一下对应语言的编程库也是有必要的。