Fork me on GitHub

安卓面试清单----OKHttp源码解析(一)

本文基于OKHttp3.2 。

一个典型 OKhttp 同步请求的发起是这样的:

1
Response response = client.newCall(request).execute();

在 OkHttpClient 类中, newCall 方法是这样的:

1
2
3
public Call newCall(Request request) {
return new RealCall(this, request);
}

RealCall 是 Call 的实现类,那我们去 RealCall 中找,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**来自 RealCall 类*/
public Response execute() throws IOException {
synchronized (this) {
if (this.executed) {
throw new IllegalStateException("Already Executed");
}
this.executed = true;
}
Response arg1;
try {
this.client.dispatcher().executed(this);
Response result = this.getResponseWithInterceptorChain(false);
if (result == null) {
throw new IOException("Canceled");
}
arg1 = result;
} finally {
this.client.dispatcher().finished(this);
}
return arg1;
}

很轻松就找到了 execute() 方法,上面代码第10行用到了一个从 OkHttpClient 获得的 Dispatcher 然后把它加入到分发器里面的队列 runningSyncCalls 中,在完成的时候会remove掉。这个队列是一个ArrayDeque。

1
private final Deque<RealCall> runningSyncCalls = new ArrayDeque();
1
2
3
4
5
6
7
8
9
10
11
12
/**来自 Dispatcher 类*/
synchronized void executed(RealCall call) {
this.runningSyncCalls.add(call);
}
synchronized void finished(Call call) {
if(!this.runningSyncCalls.remove(call)) {
throw new AssertionError("Call wasn\'t in-flight!");
}
}

其次会执行:

1
Response result = this.getResponseWithInterceptorChain(false);

先到这儿,记住 getResponseWithInterceptorChain() 方法,我们再来看异步请求:

也在 RealCall 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**来自 RealCall 类*/
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
if (this.executed) {
throw new IllegalStateException("Already Executed");
}
this.executed = true;
}
this.client.dispatcher().enqueue(
new RealCall.AsyncCall(responseCallback, forWebSocket));
}

是不是和同步很像,但是有两点不同:

1、同步调用的 executed 方法,而异步调用的是分发器的 enqueue 方法。

2、同步传入 execute 方法的参数是 Call,异步传入 enqueue 方法的是AsyncCall,这个是什么呢,这个是 Call 里面的一个内部类,而且是一个继承了Runnable的内部类。

先看第一个不同点:分发器的 enqueue 方法是干什么的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**来自 Dispatcher 类*/
synchronized void enqueue(AsyncCall call) {
//判断当前运行的线程是否超过最大线程数,以及同一个请求是否要超过相同请求同时存在的最大数目
if(this.runningAsyncCalls.size() < this.maxRequests && this.runningCallsForHost(call) < this.maxRequestsPerHost) {
this.runningAsyncCalls.add(call);
//将请求放到线程池里运行
this.executorService().execute(call);
} else {
//不满足运行条件放到后备队列里
this.readyAsyncCalls.add(call);
}
}
1
2
3
4
5
6
7
8
9
/**来自 Dispatcher 类*/
public synchronized ExecutorService executorService() {
if(this.executorService == null) {
this.executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false));
}
return this.executorService;
}

很明显,这儿用线程池直接提交了这个实现了 Runable 的 AsyncCall 。
这是一个可缓存的线程池。

从上面代码我们看到异步请求是有条件限制的,默认最多64个请求,而对同一个主机的最大请求默认最多同时存在5个。这两个值都是可以更改的,Dispatcher 提供了相关方法。

1
2
3
4
5
6
7
8
9
10
11
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private ExecutorService executorService;
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque();
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque();
private final Deque<RealCall> runningSyncCalls = new ArrayDeque();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**来自 Dispatcher 类*/
private int runningCallsForHost(AsyncCall call) {
int result = 0;
Iterator arg2 = this.runningAsyncCalls.iterator();
while(arg2.hasNext()) {
AsyncCall c = (AsyncCall)arg2.next();
通过比较每个请求的主机地址,一样代表同一个请求
if(c.host().equals(call.host())) {
++result;
}
}
return result;
}
1
2
3
4
5
/**来自 RealCall类*/
String host() {
return RealCall.this.originalRequest.url().host();
}

OK,第一个不同点已经分析完,再来看看第二个不同点 AsyncCall 是个什么东东?

AsyncCall 继承自 NamedRunnable ,NamedRunnable 实现了 Runnable 。NamedRunnable 只是给这个 Runnable 起了个名字而已。

1
2
3
public NamedRunnable(String format, Object... args) {
this.name = String.format(format, args);
}

再来看看AsyncCall 的run里面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
/**来自 NamedRunnable 类*/
public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(this.name);
try {
this.execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();

显然AsyncCall的execute才是核心。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**来自 RealCall 的内部类 AsyncCall 类,继承自 NamedRunnable */
protected void execute() {
boolean signalledCallback = false;
try {
Response e = RealCall.this.getResponseWithInterceptorChain(this.forWebSocket);
if (RealCall.this.canceled) {
signalledCallback = true;
//若请求被取消,则回调 onFailure
this.responseCallback.onFailure(RealCall.this,
new IOException("Canceled"));
} else {
signalledCallback = true;
//若成功返回Response,则回调 onResponse
this.responseCallback.onResponse(RealCall.this, e);
}
} catch (IOException arg5) {
if (signalledCallback) {
Internal.logger.log(Level.INFO, "Callback failure for "
+ RealCall.this.toLoggableString(), arg5);
} else {
this.responseCallback.onFailure(RealCall.this, arg5);
}
} finally {
RealCall.this.client.dispatcher().finished(this);
}
}

在代码第八行我们又看到了getResponseWithInterceptorChain()方法。 可以看到,异步和同步一样,最后都执行到了这个方法并返回 Response 。
那我们就来看一下这个方法的实现:

1
2
3
4
5
6
7
/**来自 RealCall类 */
private Response getResponseWithInterceptorChain(boolean forWebSocket)throws IOException {
RealCall.ApplicationInterceptorChain chain = new RealCall.ApplicationInterceptorChain(
0, this.originalRequest, forWebSocket);
return chain.proceed(this.originalRequest);
}

创建了一个ApplicationInterceptorChain ,并且第一个参数传入0,这个0是有特殊用法的,涉及到OKHttp里面的一个功能叫做拦截器,从getResponseWithInterceptorChain这个名字里其实也能看出一二。先看看proceed做了什么:

OKHttp增加的拦截器机制,先来看看官方文档对Interceptors 的解释 :

Interceptors are a powerful mechanism that can monitor, rewrite, and
retry calls.

解释下就是拦截器可以用来转换,重试,重写请求的机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**来自 RealCall 的内部类 ApplicationInterceptorChain,实现了 Chain 接口 */
public Response proceed(Request request) throws IOException {
/**先判断是否有拦截器,如果有则首先执行拦截器重写的 intercept 方法,执行完自己写的代码之后,并手动调用 proceed()方法再次判断是否还有拦截器。
若已经没有拦截器存在的话就执行 getResponse()方法*/
if (this.index < RealCall.this.client.interceptors().size()) {
RealCall.ApplicationInterceptorChain chain = RealCall.this.new ApplicationInterceptorChain(
this.index + 1, request, this.forWebSocket);
Interceptor interceptor = (Interceptor) RealCall.this.client
.interceptors().get(this.index);
Response interceptedResponse = interceptor.intercept(chain);
if (interceptedResponse == null) {
throw new NullPointerException("application interceptor "
+ interceptor + " returned null");
} else {
return interceptedResponse;
}
} else {
return RealCall.this.getResponse(request, this.forWebSocket);
}
}

创建 ApplicationInterceptorChain 的时候第一个参数为0,则this.index==0;

若没有拦截器的话走 else,执行:

1
return RealCall.this.getResponse(request, this.forWebSocket);

若有1个拦截器的话:

则0<1,回调拦截器中的 intercept 方法。

当我们在拦截器中手动调用 process 后再次回到方法中检查是否有拦截器,此时不满足条件,走 else,最终还是回到了 getResponse 方法。

1
2
3
4
5
6
ApplicationInterceptorChain(int index, Request request,
boolean forWebSocket) {
this.index = index;
this.request = request;
this.forWebSocket = forWebSocket;
}

看下我当时用的一个用于获取打印http请求信息的拦截器(包括请求头,body,url等等,直接打印):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Created by QHT on 2017-04-05.
*/
public class OK_LoggingInterceptor implements Interceptor{
@SuppressLint("DefaultLocale")
@Override
public Response intercept(Interceptor.Chain chain) throws IOException {
//这个chain里面包含了request和response,所以你要什么都可以从这里拿
Request request = chain.request();
long t1 = System.nanoTime();//请求发起的时间
LogUtil.e(String.format("发送请求 %s on %s%n%s",
request.url(), chain.connection(), request.headers()));
//自定义拦截器必须执行 proceed 方法,以便继续判断是否还存在拦截器
Response response = chain.proceed(request);
long t2 = System.nanoTime();//收到响应的时间
//这里不能直接使用response.body().string()的方式输出日志
//因为response.body().string()之后,response中的流会被关闭,程序会报错,我们需要创建出一个新的response给应用层处理
ResponseBody responseBody = response.peekBody(1024 * 1024);
LogUtil.e(String.format("接收响应: [%s] %n返回json:【%s】 %.1fms%n%s",
response.request().url(),
responseBody.string(),
(t2 - t1) / 1e6d,
response.headers()));
return response;
}
}

这个拦截器发送完请求后打印的效果是这样的:


H快递: com.qht.blog2.Net.OK_LoggingInterceptor.intercept(OK_LoggingInterceptor.java:25)
H快递: 发送请求 http://www.kuaidi100.com/query?type=yunda&postid=7700008953907 on null
H快递: com.qht.blog2.Net.OK_LoggingInterceptor.intercept(OK_LoggingInterceptor.java:37)
H快递: 接收响应: [http://www.kuaidi100.com/query?type=yunda&postid=7700008953907]
返回json:【】 370.2ms
Server: nginx
Date: Tue, 13 Jun 2017 15:21:58 GMT
Content-Type: text/html;charset=UTF-8
Transfer-Encoding: chunked
Connection: keep-alive
P3P: CP=”IDC DSP COR ADM DEVi TAIi PSA PSD IVAi IVDi CONi HIS OUR IND CNT”
Cache-Control: no-cache
Vary: Accept-Encoding


在处理完拦截器操作后,就进入到重要的getResponse方法,真正的去进行发送请求,处理请求,接收返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**来自RealCall类 */
Response getResponse(Request request, boolean forWebSocket)
throws IOException {
RequestBody body = request.body();
if (body != null) {
Builder followUpCount = request.newBuilder();
MediaType releaseConnection = body.contentType();
if (releaseConnection != null) {
followUpCount.header("Content-Type",
releaseConnection.toString());
}
long response = body.contentLength();
if (response != -1L) {
followUpCount.header("Content-Length", Long.toString(response));
followUpCount.removeHeader("Transfer-Encoding");
} else {
followUpCount.header("Transfer-Encoding", "chunked");
followUpCount.removeHeader("Content-Length");
}
request = followUpCount.build();
}
//新建HttpEngine,用于进行发送请求和读取答复的细节处理
this.engine = new HttpEngine(this.client, request, false, false,
forWebSocket, (StreamAllocation) null, (RetryableSink) null,
(Response) null);
int arg19 = 0;
while (!this.canceled) {
boolean arg20 = true;
boolean arg14 = false;
StreamAllocation streamAllocation;
label173: {
label172: {
try {
HttpEngine followUp;
try {
arg14 = true;
this.engine.sendRequest();
this.engine.readResponse();
arg20 = false;
arg14 = false;
break label173;
} catch (RequestException arg15) {
throw arg15.getCause();
} catch (RouteException arg16) {
followUp = this.engine.recover(
arg16.getLastConnectException(),
(Sink) null);
if (followUp == null) {
throw arg16.getLastConnectException();
}
} catch (IOException arg17) {
followUp = this.engine.recover(arg17, (Sink) null);
if (followUp != null) {
arg20 = false;
this.engine = followUp;
arg14 = false;
break label172;
}
throw arg17;
}
arg20 = false;
this.engine = followUp;
arg14 = false;
} finally {
if (arg14) {
if (arg20) {
StreamAllocation streamAllocation1 = this.engine
.close();
streamAllocation1.release();
}
}
}
if (arg20) {
streamAllocation = this.engine.close();
streamAllocation.release();
}
continue;
}
if (arg20) {
streamAllocation = this.engine.close();
streamAllocation.release();
}
continue;
}
if (arg20) {
StreamAllocation arg21 = this.engine.close();
arg21.release();
}
Response arg22 = this.engine.getResponse();
//得到该请求对应的后续请求,比如重定向之类的
Request arg23 = this.engine.followUpRequest();
if (arg23 == null) {
if (!forWebSocket) {
this.engine.releaseStreamAllocation();
}
return arg22;
}
streamAllocation = this.engine.close();
++arg19;
if (arg19 > 20) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: "
+ arg19);
}
if (!this.engine.sameConnection(arg23.url())) {
streamAllocation.release();
streamAllocation = null;
}
this.engine = new HttpEngine(this.client, arg23, false, false,
forWebSocket, streamAllocation, (RetryableSink) null, arg22);
}
this.engine.releaseStreamAllocation();
throw new IOException("Canceled");
}

没错,就是这么长。
可以看到如果是post请求,先做一定的头部处理,然后新建一个HttpEngine去处理具体的操作,通过sendRequest发送具体请求操作,readResponse对服务器的答复做一定处理,最后得到从服务器返回的Response,讲到这里,我们整个的流程大概疏通了,代码贴了很多,简单的可以用下面一张图概括 :
这里写图片描述

getResponse() 方法的内容还有很多,下篇再分析。