okhttp源码分析

在前一段时间已经学习了Retrofit和Okhttp的基本用法,今天我就要开始第三部分Why的学习。第一次认真的阅读一个第三方开源库的内部代码,深入理解okttp和Retroft的实现过程,由于Retrofit的网络通讯的实现全部基于okhttp,因此首先我将开始对okhttp源码的学习。

按照惯例,我分三部分开始对okhttp的解析。

###1.What?
okhttp是什么?它有什么用?我直接引用官方的介绍如下

Overview

HTTP is the way modern applications network. It’s how we exchange data & media. Doing HTTP efficiently makes your stuff load faster and saves bandwidth.
OkHttp is an HTTP client that’s efficient by default:

HTTP/2 support allows all requests to the same host to share a socket.
Connection pooling reduces request latency (if HTTP/2 isn’t available).
Transparent GZIP shrinks download sizes.
Response caching avoids the network completely for repeat requests.
OkHttp perseveres when the network is troublesome: it will silently recover from common connection problems. If your service has multiple IP addresses OkHttp will attempt alternate addresses if the first connect fails. This is necessary for IPv4+IPv6 and for services hosted in redundant data centers. OkHttp initiates new connections with modern TLS features (SNI, ALPN), and falls back to TLS 1.0 if the handshake fails.

Using OkHttp is easy. Its request/response API is designed with fluent builders and immutability. It supports both synchronous blocking calls and async calls with callbacks.

OkHttp supports Android 2.3 and above. For Java, the minimum requirement is 1.7.

根据官方介绍:okhttp是一个应用网络库,它可以帮助我们用更少的带宽和更快的速度去交换数据,有用的使用HTTP。
它默认具有以下功能:

  • 允许我们发向同一个服务器的请求分享socket
  • 使用链接池的设计来减少请求的延迟
  • 使用GZIP压缩方式减少从服务端下载的数据量
  • 对服务端的回应使用cache避免多次发送完全相同的请求
    okhttp使用重连机制,当网络出现问题时,它会静默的恢复,并且当你连接的服务有多个IP地址时,它会在第一个连接失败后调用第二个去尝试。
    okhttp同时支持同步调用和异步调用。

###2&3.How&Why?

首先从我们的使用过程开始一步一步深入到内部的实现。
我们先来看每次使用okhttp的最简单流程。

获取一个URL的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//创建一个okhttpclient对象
OkHttpClient mOkHttpClient = new OkHttpClient();
//创建一个Request
final Request request = new Request.Builder()
.url("www.baidu.com")
.build();
//创建一个新的call
Call call = mOkHttpClient.newCall(request);
//令call进入队列等待执行
call.enqueue(new Callback()
{
@Override
public void onFailure(Request request, IOException e)
{
}

@Override
public void onResponse(final Response response) throws IOException
{

}
});
```
这是一个典型的最简单的使用方式。

来简单分析一下它的使用过程。
1.创建一个OkhttpClient对象,这是使用okhttp所有功能的第一步。
2.创建一个Request对象,这个过程使用了建造者模式,这个模式的优点是简化繁琐的初始化过程,使对象的创建与内部的实现分离,让客户端用户可以更精细的根据自己的需求创造对应的对象。我们可以看到,使用了建造者模式,request的创建逻辑变得非常清晰,在这里例子中我们只设置了url,但根据需求我们可以创建复杂得多的request。
3.调用client的方法将request封装成一个call,在okhttp中request并不能被直接执行,需要对request进行封装,在最后执行call
4.执行封装好的call。查看源码可以看到okhttp为我们提供了两种执行方式,execute()和enqueue(),这两种方式的区别从名字上就能看出来大概,稍后我详细分析。注意,查看源码的注释可以看到

/**

  • A call is a request that has been prepared for execution. A call can be
  • canceled. As this object represents a single request/response pair (stream),
  • it cannot be executed twice.
    */
1
2
3
4
这里明确的指出了一个call对象只能被执行一次,所以如果想要多次执行相同的request,我们还需要另想办法。

好了,下面我们开始从源码入手,分析一个request从创建到被执行的所有过程。
我们每一次使用都要创建OkhttpClient。

/**

  • Configures and creates HTTP connections. Most applications can use a single
  • OkHttpClient for all of their HTTP requests - benefiting from a shared
  • response cache, thread pool, connection re-use, etc.
    *
  • Instances of OkHttpClient are intended to be fully configured before they’re

  • shared - once shared they should be treated as immutable and can safely be used
  • to concurrently open new connections. If required, threads can call
  • {@link #clone()} to make a shallow copy of the OkHttpClient that can be
  • safely modified with further configuration changes.
    */

    1
    2
    3
    这是官方对于OkHttpClient的注释,我们可以看到OkHttpClient是用于配置我们的连接需求所设计出来的类,当OkHttpClient实例化后它应当是不可变的且可安全的同时开启多个连接,如果有修改OkHttpClient实例的需求,那么可以使用clone()来获取一份备份来进行安全的修改。

    这是被我们调用的构造函数。

    public OkHttpClient() {
    routeDatabase = new RouteDatabase();
    dispatcher = new Dispatcher();
    }

1
2

可以看到,每一个OkHttpClient实例都内嵌着一个routeDatabase实例和一个dispatcher实例,深入这两个类的内部,我们可以了解这两个对象的作用。

/**

  • A blacklist of failed routes to avoid when creating a new connection to a
  • target address. This is used so that OkHttp can learn from its mistakes: if
  • there was a failure attempting to connect to a specific IP address or proxy
  • server, that failure is remembered and alternate routes are preferred.
    */
    1
    可以看到,RouteDatabase类正如它的名字一样,是一个路线数据库,它负责OkHttpClient发起的失败的Connection,将它加入黑名单,我们在之前提到的多IP重连就是通过这样的记录的方式来实现。

public final class RouteDatabase {
private final Set failedRoutes = new LinkedHashSet<>();
}

1
2
3
4
RouteDatabase内含一个LinkedHashSet来记录所有的失败连接,这个类的实现到这里已经很清楚了。


接下来我们来看看Dispatcher类的内部实现。

/**

  • Policy on when async requests are executed.
    *
  • Each dispatcher uses an {@link ExecutorService} to run calls internally. If you

  • supply your own executor, it should be able to run {@linkplain #getMaxRequests the
  • configured maximum} number of calls concurrently.
    */
    1
    2
    顾名思义,Dispatcher是一个调度器,当我们的OkHttpClient执行异步任务时,Dispatcher负责控制各个线程间的关系。
    看到ExecutorService时就应该知道了,这个Dispatcher的内部实现应该就是线程池,并且okhttp还支持我们自定义executor,自己来控制各个异步任务。

public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;

/** Ready calls in the order they'll be run. */

private final Deque readyCalls = new ArrayDeque<>();

/* Running calls. Includes canceled calls that haven’t finished yet. /
private final Deque runningCalls = new ArrayDeque<>();

/* In-flight synchronous calls. Includes canceled calls that haven’t finished yet. /
private final Deque executedCalls = new ArrayDeque<>();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
这是Dispatcher内部的一段代码,我们可以看到在默认的情况下okhttp最多执行的任务是64个,每个服务端是5个。
Dispatcher内部使用Deque这种双端队列管理AsyncCall,即异步Call,对Dispatcher的分析到这里结束。

简单的看了一下OkHttpClient的内部实现,下面我们进入Request,分析一下被我们创建的Request。

```
private final HttpUrl url;
private final String method;
private final Headers headers;
private final RequestBody body;
private final Object tag;

private volatile URL javaNetUrl; // Lazily initialized.
private volatile URI javaNetUri; // Lazily initialized.
private volatile CacheControl cacheControl; // Lazily initialized.

private Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tag = builder.tag != null ? builder.tag : this;
}

进入Request内部,我们发现这里并没有什么实际的逻辑,Request类本身只是我们的任务的抽象集合,里面包含了大量的成员变量才是真正有效的逻辑实现体。下面我们去查看Call这个类,去观察一个任务被执行的过程。

1
2
3
4
5
/**
* A call is a request that has been prepared for execution. A call can be
* canceled. As this object represents a single request/response pair (stream),
* it cannot be executed twice.
*/

通过注释我们可以知道,一个Call就是一个可以被执行的Request,Call可以被取消执行,因为一个Call代表一个Request和Response对,所以一个call不能被执行两次。
创建Call的构造函数

1
2
3
4
5
6
protected Call(OkHttpClient client, Request originalRequest) {
// Copy the client. Otherwise changes (socket factory, redirect policy,
// etc.) may incorrectly be reflected in the request when it is executed.
this.client = client.copyWithDefaults();
this.originalRequest = originalRequest;
}

注意到这个构造函数是protected的,因此我们并不能手动new出一个call,这符合逻辑,我们每次想要创建一个call都要通过OkHttpClient的newCall()方法,这个构造函数在这里被调用。

1
2
3
4
5
6
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
public Call newCall(Request request) {
return new Call(this, request);
}

接下来来观察Call的execute()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
client.getDispatcher().executed(this);
Response result = getResponseWithInterceptorChain(false);
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.getDispatcher().finished(this);
}
}

到这里Call的执行流程已经很清楚了,一个Call要被执行首先要被创建这个Call的OkHttpClient里的Dispatcher调用加入队列等待,下面我们深入Dispatcher的executed的内部阅读其实现。

1
2
3
synchronized void executed(Call call) {
executedCalls.add(call);
}

这个方法里只有一行代码,看来还要继续向下找它的真正实现,注意,这里的executedCalls就是我们前面提到的Deque,它实际上是一个队列。这个抽象接口的实现是ArrayDeque,来看看add方法。

1
2
3
4
 public boolean add(E e) {
addLast(e);
return true;
}

显然,这个add方法把我们要执行的Call加入到了队列的队尾排队,让这个request等待执行。
那么队列里的Call什么时候才会被真正执行呢?下面返回Call()的execute()方法,我看可以看到在finish和executed之间的getResponseWithInterceptorChain方法,看来这里就是实际执行部分,进入观察。

1
2
3
4
5
6
7
8
9
10
@Override public Response proceed(Request request) throws IOException {
if (index < client.interceptors().size()) {
// There's another interceptor in the chain. Call that.
Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
return client.interceptors().get(index).intercept(chain);
} else {
// No more interceptors. Do HTTP.
return getResponse(request, forWebSocket);
}
}

getResponse就是实际执行的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* Performs the request and returns the response. May return null if this
* call was canceled.
*/

Response getResponse(Request request, boolean forWebSocket) throws IOException {
// Copy body metadata to the appropriate request headers.
RequestBody body = request.body();
if (body != null) {
Request.Builder requestBuilder = request.newBuilder();

MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}

request = requestBuilder.build();
}

// Create the initial HTTP engine. Retries and redirects need new engine for each attempt.
engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null, null);

int followUpCount = 0;
while (true) {
if (canceled) {
engine.releaseConnection();
throw new IOException("Canceled");
}

try {
engine.sendRequest();
engine.readResponse();
} catch (RequestException e) {
// The attempt to interpret the request failed. Give up.
throw e.getCause();
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
HttpEngine retryEngine = engine.recover(e);
if (retryEngine != null) {
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e.getLastConnectException();
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
HttpEngine retryEngine = engine.recover(e, null);
if (retryEngine != null) {
engine = retryEngine;
continue;
}

// Give up; recovery is not possible.
throw e;
}

Response response = engine.getResponse();
Request followUp = engine.followUpRequest();

if (followUp == null) {
if (!forWebSocket) {
engine.releaseConnection();
}
return response;
}

if (++followUpCount > MAX_FOLLOW_UPS) {
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

if (!engine.sameConnection(followUp.httpUrl())) {
engine.releaseConnection();
}

Connection connection = engine.close();
request = followUp;
engine = new HttpEngine(client, request, false, false, forWebSocket, connection, null, null,
response);

}
}

这一部分代码内容非常多,但是我们可以粗略的分为两部分
1.重建request,对其做一些修正,不深入探讨。
2.创建HttpEngine,在这里发送请求,取得Response。engine里是网络通讯的具体业务逻辑,GZIP压缩也在这里面发挥作用,这部分逻辑非常复杂。
到这里一个同步的request调用就分析完了,下面我们再来看看一部的request调用。

1
2
3
4
5
6
7
8

void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
client.getDispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}

与同步调用相同,这里还是调用Dispatcher让Request进入队列等待执行。

1
2
3
4
5
6
7
public synchronized ExecutorService getExecutorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

这里发现了client里excutorService的实现类,原来它就是ThreadPoolExecutor。
我截取ThreadPoolExecutor的execute方法的注释

1
* Executes the given task sometime in the future.  The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.

public void execute(Runnable command) {
}

execute接收一个Runnable接口,它真正执行的实际上是Runnable的逻辑,而AyncCall正是一个实现了Runnable的Call的内部类,从而得知执行的就是AyncCall的逻辑。也就是AyncCall的execute方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
responseCallback.onFailure(originalRequest, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(engine.getRequest(), e);
}
} finally {
client.getDispatcher().finished(this);
}
}

到了这里,又看到了熟悉的代码。

1
Response response = getResponseWithInterceptorChain(forWebSocket);

以后的逻辑和同步调用完全相同。
我们梳理整个过程。
Alt text
本次到此结束,更多细节以后再进行学习。