讲道理,计算机也是学了那么多年了,对于计算机网络的认识和理解始终停留在教科书上的那些理论层面上,这一年多来开发的android项目中的http请求都是用的httpUrlConnection封装好的方法,除了post与get的写法不同,并没有接触到http更深层次的内容,因此借着学习okhttp的这个机会,来深入看看java下的HttpClient到底应该是怎么样的

okhttp

它是square公司开源的一个面向Android/Java应用的 http client,它的优点在于:

支持http2和spdy,允许连接同一个服务器的请求共享一个socket
当http2不可用时,会有一个conntion pool来减少请求延迟
使用gzip压缩下载内容
有缓存机制
如果服务器有多个ip地址,当第一个请求失败后,它会尝试别的ip

因此第一篇文章,就来分析一下okhttp作为一个网络请求框架,大致的流程和结构是怎么样的,如果我们自己要实现一个类似于这种并发请求事务的操作,它能给我们什么启示。

我们还是从使用场景来入手:

 1 private final OkHttpClient client = new OkHttpClient();
 2 
 3 Request request = new Request.Builder()
 4         .url("http://publicobject.com/helloworld.txt")
 5         .build();
 6 
 7 client.newCall(request).enqueue(new Callback() {
 8   @Override public void onFailure(Call call, IOException e) {
 9     e.printStackTrace();
10   }
11 
12   @Override public void onResponse(Call call, Response response) throws IOException {
13     System.out.println(response.body().string());
14   }
15 });

首先要有一个OkHttpClient, 它相当于是一个请求集合的总管,管理着协议版本、cache、cookie、ssl、connectingPoll、proxy等一系列跟请求有关的信息,它也是采用了builder的设计模式,代表一旦build完就不能再变了 然后request也是通过一个builder模式创建的,新建一个builder只会将method字段设置为默认的”get”并创建一个没有内容的header对象,然后设置了url,这样一个request就建成了,只有一个请求方法和url。 最后client会用这个request创建一个call,然后进行enqueue操作进行异步执行请求(execute方法是同步的执行)。

我们来看看这个newCall是什么:

 1 /**
 2 * Prepares the {@code request} to be executed at some point in the future.
 3 */
 4 @Override public Call newCall(Request request) {
 5 	return new RealCall(this, request);
 6 }
 7 
 8 protected RealCall(OkHttpClient client, Request originalRequest) {
 9 	this.client = client;
10 	this.originalRequest = originalRequest;
11 }

到目前为止,感觉就是把变量像踢皮球一样踢来踢去,也没干什么事,那么就来看看enqueue吧:

 1 @Override public void enqueue(Callback responseCallback) {
 2 	enqueue(responseCallback, false);
 3 }
 4 
 5 void enqueue(Callback responseCallback, boolean forWebSocket) {
 6 	synchronized (this) {
 7   		if (executed) throw new IllegalStateException("Already Executed");
 8   		executed = true;
 9 	}
10 	client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
11 }

默认是不使用webSocket(用来建立http服务器和客户端之间全双工通信的一种协议,可以不采用轮询的方式就能获得推送)的,并且把callback封装到了一个AsyncCall里面,我们在这里看到了synchronized,那可想而知这里一定有一条消息队列咯,没错,我们来看看client的dispatcher的enqueue干了些什么:

 1 synchronized void enqueue(AsyncCall call) {
 2     if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
 3       runningAsyncCalls.add(call);
 4       executorService().execute(call);
 5     } else {
 6       readyAsyncCalls.add(call);
 7     }
 8   }
 9 
10   /** Ready async calls in the order they'll be run. */
11   private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
12 
13   /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
14   private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
15 
16     public synchronized ExecutorService executorService() {
17     if (executorService == null) {
18       executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
19           new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
20     }
21     return executorService;

啊哈,我们看到了 一个runningQueue,放已经扔给线程池的那些calls,因为它add之后马上会调用executorService(线程池)的execute,如果它满了或者请求同一主机的call太多,就放到readyQueue里 来看一下一个call finished之后干了什么:

 1 /** Used by {@code AsyncCall#run} to signal completion. */
 2   synchronized void finished(AsyncCall call) {
 3     if (!runningAsyncCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
 4     promoteCalls();
 5   }
 6 
 7   private void promoteCalls() {
 8     if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
 9     if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
10 
11     for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
12       AsyncCall call = i.next();
13 
14       if (runningCallsForHost(call) < maxRequestsPerHost) {
15         i.remove();
16         runningAsyncCalls.add(call);
17         executorService().execute(call);
18       }
19 
20       if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
21     }
22   }

它会调用promoteCalls,代码很容易理解,就是把readyQueue里面的内容加到runningQueue里面

那剩下的工作就是在线程池里面做了。

我们看到,对于一个call,也就是我们理解的一个task,光加入线程池还不够,还要维护一个队列,存放这些个task。 其实Volley也是这样干的,会维护一个set来存放正在执行或者准备执行的task,也会维护一个cache的hashmap来存放要从cache取结果的task,并且这些task同时也要放到线程池中去执行。 仔细看看,这些个队列只是用来对这些task进行cancel和统计这些操作的,volley中也是如此,因为一个task一旦放入线程池,它就由不得我们控制了,因此这个队列是必须的。(之前说错的,okhttp是支持在已经建立链接的过程中去cancel的,而volley是不允许的) 而okhttp中,cache是在call内部控制的,而加了一个readyQueue也只是为了不让线程池处理太多向同一host发出的请求,做到了一种平衡

AsyncCall的run会调用自己的execute,我们看看里面来做了什么,首先要明确的是,AsycCall是RealCall的一个子类,因此new一个AsyncCall出来的时候,这个对象会持有一个RealCall的引用并能访问它的所有成员

 1 @Override protected void execute() {
 2       boolean signalledCallback = false;
 3       try {
 4         Response response = getResponseWithInterceptorChain(forWebSocket);
 5         if (canceled) {
 6           signalledCallback = true;
 7           responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
 8         } else {
 9           signalledCallback = true;
10           responseCallback.onResponse(RealCall.this, response);
11         }
12       } catch (IOException e) {
13         if (signalledCallback) {
14           // Do not signal the callback twice!
15           logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
16         } else {
17           responseCallback.onFailure(RealCall.this, e);
18         }
19       } finally {
20         client.dispatcher().finished(this);
21       }
22     }
23   }

逻辑还是很清晰,通过getResponseWithInterceptorChain来获得一个response,这里要注意,cancel只会在给callback投递结果的时候才起作用,意思就是不管取消没取消,这个request都会处理,这里跟volley是不一样的,volley是在执行的时候去判断,okhttp这种处理方式好处就是不会在请求处理的状态时去cancel而导致callback还是会调用这种情况而产生的内存泄漏或者是别的问题。并且,callback的调用也是在线程池里面做的,并不能直接修改ui界面,当然了,可以用handler或者eventbus来处理这些情况。

下面开始进入AsyncCall的具体操作了,要得到一个response,先会调用getResoponseWithInterceptorChain:

 1 private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
 2     Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
 3     return chain.proceed(originalRequest);
 4   }
 5 
 6   @Override public Response proceed(Request request) throws IOException {
 7   // If there's another interceptor in the chain, call that.
 8   if (index < client.interceptors().size()) {
 9     Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
10     Interceptor interceptor = client.interceptors().get(index);
11     Response interceptedResponse = interceptor.intercept(chain);
12 
13     if (interceptedResponse == null) {
14       throw new NullPointerException("application interceptor " + interceptor
15           + " returned null");
16     }
17 
18     return interceptedResponse;
19   }
20 
21   // No more interceptors. Do HTTP.
22   return getResponse(request, forWebSocket);
23 }

proceed函数中,首先判断client中还有没有拦截器了,如果有,则会用当前的拦截器index和request以及websocket标记创建一个新的chain,并调用interceptor的intercept方法把chain传进去,这样就能把request信息传递给interceptor,那interceptor可以做一些拦截的工作,并在intetercept(chain)方法中返回chain.proceed返回的response,这样在做完拦截工作,还会再一次调用chain的processd方法以进入下一个interceptor,最终index加满了最终调用getResponse,再把结果一层一层传递回去。有没有一点AOP的思想在里面呢?可以看看官方给的一个demo, 很好理解

最终还是会调用getResponse的,代码很长,一段段来看:

 1 /**
 2    * Performs the request and returns the response. May return null if this call was canceled.
 3    */
 4   Response getResponse(Request request, boolean forWebSocket) throws IOException {
 5     // Copy body metadata to the appropriate request headers.
 6     RequestBody body = request.body();
 7     if (body != null) {
 8       Request.Builder requestBuilder = request.newBuilder();
 9 
10       MediaType contentType = body.contentType();
11       if (contentType != null) {
12         requestBuilder.header("Content-Type", contentType.toString());
13       }
14 
15       long contentLength = body.contentLength();
16       if (contentLength != -1) {
17         requestBuilder.header("Content-Length", Long.toString(contentLength));
18         requestBuilder.removeHeader("Transfer-Encoding");
19       } else {
20         requestBuilder.header("Transfer-Encoding", "chunked");
21         requestBuilder.removeHeader("Content-Length");
22       }
23 
24       request = requestBuilder.build();
25     }
26 
27     // Create the initial HTTP engine. Retries and redirects need new engine for each attempt.
28     engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);

出现了我们熟悉的格式,如果有body的话,会在头部加上Content-Type、Transfer-Encoding这些请求头,当然也是用了builder的设计模式 最后一行创建了一个HttpEngine,看看是什么鬼:

 1 public HttpEngine(OkHttpClient client, Request request, boolean bufferRequestBody,
 2       boolean callerWritesRequestBody, boolean forWebSocket, StreamAllocation streamAllocation,
 3       RetryableSink requestBodyOut, Response priorResponse) {
 4     this.client = client;
 5     this.userRequest = request;
 6     this.bufferRequestBody = bufferRequestBody;
 7     this.callerWritesRequestBody = callerWritesRequestBody;
 8     this.forWebSocket = forWebSocket;
 9     this.streamAllocation = streamAllocation != null
10         ? streamAllocation
11         : new StreamAllocation(client.connectionPool(), createAddress(client, request));
12     this.requestBodyOut = requestBodyOut;
13     this.priorResponse = priorResponse;
14   }

其他的参数都是一堆null和false,我们暂时不关心,不过StreamAllocation看起来蛮高大上的因为必须要new出一个,我们先来看看它们的参数,ConnectionPool:

 1 /**
 2  * Manages reuse of HTTP and SPDY connections for reduced network latency. HTTP requests that share
 3  * the same {@link Address} may share a {@link Connection}. This class implements the policy of
 4  * which connections to keep open for future use.
 5  */
 6 public final class ConnectionPool {
 7   /**
 8    * Create a new connection pool with tuning parameters appropriate for a single-user application.
 9    * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
10    * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
11    */
12 
13   private final Deque<RealConnection> connections = new ArrayDeque<>();
14 
15   public ConnectionPool() {
16     this(5, 5, TimeUnit.MINUTES);
17   }
18 
19   public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
20     this.maxIdleConnections = maxIdleConnections;
21     this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
22 
23     // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
24     if (keepAliveDuration <= 0) {
25       throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
26     }
27   }
28  }

大致意思就是用一个connection双端队列来保存一些连接,因为后续有可能有别的请求是针对同一地址的,那现在不立即关闭这些连接以备后续使用,可以减少延迟

在看看createAddress:

 1 private static Address createAddress(OkHttpClient client, Request request) {
 2     SSLSocketFactory sslSocketFactory = null;
 3     HostnameVerifier hostnameVerifier = null;
 4     CertificatePinner certificatePinner = null;
 5     if (request.isHttps()) {
 6       sslSocketFactory = client.sslSocketFactory();
 7       hostnameVerifier = client.hostnameVerifier();
 8       certificatePinner = client.certificatePinner();
 9     }
10 
11     return new Address(request.url().host(), request.url().port(), client.dns(),
12         client.socketFactory(), sslSocketFactory, hostnameVerifier, certificatePinner,
13         client.proxyAuthenticator(), client.proxy(), client.protocols(),
14         client.connectionSpecs(), client.proxySelector());
15   }

这里会根据request和client的配置信息,比如是否要用https呀proxy的选择以及支持的协议等等,最后生成的Address对象会表示这个请求的地址信息

然后是StreamAlloction, 它维护了三个东西:

1.	Connections:对远程服务器的物理socket连接,当然了,它的建立是
	一个相对缓慢的过程,因此我们要尽可能的复用他
2.	Streams:基于connections的逻辑层面上的request/response,
	这就是在socket连接上可以复用不同的请求,HTTP/1.X只能支持一个stream,
	而SPDY和HTTP/2.0就支持多个stream共用一个connection了
3.	Calls:就是一个个的request,一个call可能有好几个连续的request
	(比如返回一个html页面但是要继续发请求获取图片信息),这时候就可以在一个connection中使用
	多个stream来做这些事情而不用重新建立connection

不过它的构造函数里除了设置一个RouteSelector什么也没做:

1 public StreamAllocation(ConnectionPool connectionPool, Address address) {
2     this.connectionPool = connectionPool;
3     this.address = address;
4     this.routeSelector = new RouteSelector(address, routeDatabase());
5   }

其中RouteSelector会给我们一个连接失败后重新选择备用address和proxy的机会,确实想的很周到

回到getResponse, 来看剩下的部分:

 1 int followUpCount = 0;
 2     while (true) {
 3       if (canceled) {
 4         engine.releaseStreamAllocation();
 5         throw new IOException("Canceled");
 6       }
 7 
 8       boolean releaseConnection = true;
 9       try {
10         engine.sendRequest();
11         engine.readResponse();
12         releaseConnection = false;
13       } catch (RequestException e) {
14         // The attempt to interpret the request failed. Give up.
15         throw e.getCause();
16       } catch (RouteException e) {
17         // The attempt to connect via a route failed. The request will not have been sent.
18         HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);
19         if (retryEngine != null) {
20           releaseConnection = false;
21           engine = retryEngine;
22           continue;
23         }
24         // Give up; recovery is not possible.
25         throw e.getLastConnectException();
26       } catch (IOException e) {
27         // An attempt to communicate with a server failed. The request may have been sent.
28         HttpEngine retryEngine = engine.recover(e, null);
29         if (retryEngine != null) {
30           releaseConnection = false;
31           engine = retryEngine;
32           continue;
33         }
34 
35         // Give up; recovery is not possible.
36         throw e;
37       } finally {
38         // We're throwing an unchecked exception. Release any resources.
39         if (releaseConnection) {
40           StreamAllocation streamAllocation = engine.close();
41           streamAllocation.release();
42         }
43       }
44 
45       Response response = engine.getResponse();
46       Request followUp = engine.followUpRequest();
47 
48       if (followUp == null) {
49         if (!forWebSocket) {
50           engine.releaseStreamAllocation();
51         }
52         return response;
53       }
54 
55       StreamAllocation streamAllocation = engine.close();
56 
57       if (++followUpCount > MAX_FOLLOW_UPS) {
58         streamAllocation.release();
59         throw new ProtocolException("Too many follow-up requests: " + followUpCount);
60       }
61 
62       if (!engine.sameConnection(followUp.url())) {
63         streamAllocation.release();
64         streamAllocation = null;
65       }
66 
67       request = followUp;
68       engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null,
69           response);
70     }

代码比较长,一点一点来看:

  • 先是一个while循环,代表处理这个connection一系列的streams

  • 如果这个call被标记为cancel,那就释放这个StreamAllocation,本质上是释放connection,但也有可能把它放入connectionsPool来供以后使用

  • 第10、11行尝试通过engine做具体的发送请求和读取响应的操作,但这其中可能会抛出一些异常

  • 如果网络不好,可能会抛出RequestExecpton

  • 如果路由失败或者是链接服务器失败,会将engine替换为Router中下个可选的retryEngine

  • 如果有其他异常,将这个engine关闭,并释放它的StreamAllocation,跟第二条的释放操作一样一样的

  • 拿到response,并判断有没有要继续发送的request,如果没有,并且不启用webSocket(默认不启用),那么同上,释放SteamAllocation并返回response

  • 如果还有followUp的request,先会把engine关闭,但不会释放他的streamAlloction,如果当前的connection的url跟followUp的url不同,那么就会释放原本的StreamAllocation,否则就会保持,最后再新建一个engine对下一次的request进行操作

看上去好像engine承担了很多的重任啊,刚才就new了一个,啥也没讲,现在来说一下它大概的作用:

一个HttpEngine处理一对request/response事件,也就一个单位的http请求,它先会通过sendRequeset发送
请求,然后通过readResponse获得响应。这就是为什么while循环中每次都要创建一个新的engine的原因,它的
作用域仅仅是一次http请求

那我们就来看看sendRequeset:

 1 public void sendRequest() throws RequestException, RouteException, IOException {
 2     if (cacheStrategy != null) return; // Already sent.
 3     if (httpStream != null) throw new IllegalStateException();
 4 
 5     Request request = networkRequest(userRequest);
 6 
 7     InternalCache responseCache = Internal.instance.internalCache(client);
 8     Response cacheCandidate = responseCache != null
 9         ? responseCache.get(request)
10         : null;
11 
12     long now = System.currentTimeMillis();
13     cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
14     networkRequest = cacheStrategy.networkRequest;
15     cacheResponse = cacheStrategy.cacheResponse;
16 
17     if (responseCache != null) {
18       responseCache.trackResponse(cacheStrategy);
19     }
20 
21     if (cacheCandidate != null && cacheResponse == null) {
22       closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
23     }
24 
25     // If we're forbidden from using the network and the cache is insufficient, fail.
26     if (networkRequest == null && cacheResponse == null) {
27       userResponse = new Response.Builder()
28           .request(userRequest)
29           .priorResponse(stripBody(priorResponse))
30           .protocol(Protocol.HTTP_1_1)
31           .code(504)
32           .message("Unsatisfiable Request (only-if-cached)")
33           .body(EMPTY_BODY)
34           .build();
35       return;
36     }
37 
38     // If we don't need the network, we're done.
39     if (networkRequest == null) {
40       userResponse = cacheResponse.newBuilder()
41           .request(userRequest)
42           .priorResponse(stripBody(priorResponse))
43           .cacheResponse(stripBody(cacheResponse))
44           .build();
45       userResponse = unzip(userResponse);
46       return;
47     }
48 
49     // We need the network to satisfy this request. Possibly for validating a conditional GET.
50     boolean success = false;
51     try {
52       httpStream = connect();
53       httpStream.setHttpEngine(this);
54 
55       if (writeRequestHeadersEagerly()) {
56         long contentLength = OkHeaders.contentLength(request);
57         if (bufferRequestBody) {
58           if (contentLength > Integer.MAX_VALUE) {
59             throw new IllegalStateException("Use setFixedLengthStreamingMode() or "
60                 + "setChunkedStreamingMode() for requests larger than 2 GiB.");
61           }
62 
63           if (contentLength != -1) {
64             // Buffer a request body of a known length.
65             httpStream.writeRequestHeaders(networkRequest);
66             requestBodyOut = new RetryableSink((int) contentLength);
67           } else {
68             // Buffer a request body of an unknown length. Don't write request headers until the
69             // entire body is ready; otherwise we can't set the Content-Length header correctly.
70             requestBodyOut = new RetryableSink();
71           }
72         } else {
73           httpStream.writeRequestHeaders(networkRequest);
74           requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
75         }
76       }
77       success = true;
78     } finally {
79       // If we're crashing on I/O or otherwise, don't leak the cache body.
80       if (!success && cacheCandidate != null) {
81         closeQuietly(cacheCandidate.body());
82       }
83     }
84   }

又是一大坨,我们慢慢来看,先是networkRequest():

 1 /**
 2    * Populates request with defaults and cookies.
 3    *
 4    * <p>This client doesn't specify a default {@code Accept} header because it doesn't know what
 5    * content types the application is interested in.
 6    */
 7   private Request networkRequest(Request request) throws IOException {
 8     Request.Builder result = request.newBuilder();
 9 
10     if (request.header("Host") == null) {
11       result.header("Host", hostHeader(request.url(), false));
12     }
13 
14     if (request.header("Connection") == null) {
15       result.header("Connection", "Keep-Alive");
16     }
17 
18     if (request.header("Accept-Encoding") == null) {
19       transparentGzip = true;
20       result.header("Accept-Encoding", "gzip");
21     }
22 
23     List<Cookie> cookies = client.cookieJar().loadForRequest(request.url());
24     if (!cookies.isEmpty()) {
25       result.header("Cookie", cookieHeader(cookies));
26     }
27 
28     if (request.header("User-Agent") == null) {
29       result.header("User-Agent", Version.userAgent());
30     }
31 
32     return result.build();
33   }

看到我们熟悉的请求头了,之前的request一直都没有封装请求头,到现在才来干这件事,也是一种懒加载的方式嘛。从这几行代码中可以看出哪些是一个httpHeader所必须的字段:host、connection、Accept-Encoding、User-Agent,当然,如果有cookie的话也是要加上的

回到sendRequest,第7行尝试从cache中通过request获取一个候选的response,这个cache部署在okhttpClient中,默认是用它自己的一个DiskLruCache实现的,key是request的url的md5值,value则是response,里面有LinkedHashMap缓存以及磁盘缓存还有snapshot之类的,以后单独研究一下,现在知道它是个cache就行了

第13行,创建一个CacheStrategy:

 1 public Factory(long nowMillis, Request request, Response cacheResponse) {
 2   this.nowMillis = nowMillis;
 3   this.request = request;
 4   this.cacheResponse = cacheResponse;
 5 
 6   if (cacheResponse != null) {
 7     Headers headers = cacheResponse.headers();
 8     for (int i = 0, size = headers.size(); i < size; i++) {
 9       String fieldName = headers.name(i);
10       String value = headers.value(i);
11       if ("Date".equalsIgnoreCase(fieldName)) {
12         servedDate = HttpDate.parse(value);
13         servedDateString = value;
14       } else if ("Expires".equalsIgnoreCase(fieldName)) {
15         expires = HttpDate.parse(value);
16       } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
17         lastModified = HttpDate.parse(value);
18         lastModifiedString = value;
19       } else if ("ETag".equalsIgnoreCase(fieldName)) {
20         etag = value;
21       } else if ("Age".equalsIgnoreCase(fieldName)) {
22         ageSeconds = HeaderParser.parseSeconds(value, -1);
23       } else if (OkHeaders.SENT_MILLIS.equalsIgnoreCase(fieldName)) {
24         sentRequestMillis = Long.parseLong(value);
25       } else if (OkHeaders.RECEIVED_MILLIS.equalsIgnoreCase(fieldName)) {
26         receivedResponseMillis = Long.parseLong(value);
27       }
28     }
29   }
30 }
31 
32 /**
33  * Returns a strategy to satisfy {@code request} using the a cached response {@code response}.
34  */
35 public CacheStrategy get() {
36   CacheStrategy candidate = getCandidate();
37 
38   if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
39     // We're forbidden from using the network and the cache is insufficient.
40     return new CacheStrategy(null, null);
41   }
42 
43   return candidate;
44 }
45 
46   public final Request networkRequest;
47 
48   /** The cached response to return or validate; or null if this call doesn't use a cache. */
49   public final Response cacheResponse;
50 
51   private CacheStrategy(Request networkRequest, Response cacheResponse) {
52     this.networkRequest = networkRequest;
53     this.cacheResponse = cacheResponse;
54   }

factory里面除了保存一下request和当前的时间,还会提取cacheResponse里面的响应头信息,用来帮助以后判断这个cache的有效性,其中涉及到date、expires、last-modified等我们熟悉的字段;get方法会根据cacheResponse和request的特性来生成一个CacheStrategy,其中的networkRequest是否为null来判断是否使用网络来发送这个请求,cacheResponse是否为null来决定这个cache是否可用

再回到sendRequest的21行,如果cacheCandidate不可用,则会关闭它的body

26行,如果经过cacheStrategy的过滤,network和cache都不可用(说明在个request标明了only-if-cached),就创建一个504的response返回

39行,如果这个cache可用,那么就根据这个cacheResponse和userRequest(最初传进来的request,没有加host等头部)创建给userResopnse并返回

以上都是跟cacheResponse有关的操作,接下来的就是正儿八经只能通过网络去发送请求了

首先进入视线的是52行的connect,终于要连接起来了好兴奋~:

1 private HttpStream connect() throws RouteException, RequestException, IOException {
2     boolean doExtensiveHealthChecks = !networkRequest.method().equals("GET");
3     return streamAllocation.newStream(client.connectTimeoutMillis(),
4         client.readTimeoutMillis(), client.writeTimeoutMillis(),
5         client.retryOnConnectionFailure(), doExtensiveHealthChecks);
6   }

啊,终于要给streamAllocation创建一个stream了:

 1 public HttpStream newStream(int connectTimeout, int readTimeout, int writeTimeout,
 2       boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
 3       throws RouteException, IOException {
 4     try {
 5       RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
 6           writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
 7 
 8       HttpStream resultStream;
 9       if (resultConnection.framedConnection != null) {
10         resultStream = new Http2xStream(this, resultConnection.framedConnection);
11       } else {
12         resultConnection.socket().setSoTimeout(readTimeout);
13         resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
14         resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
15         resultStream = new Http1xStream(this, resultConnection.source, resultConnection.sink);
16       }
17 
18       synchronized (connectionPool) {
19         stream = resultStream;
20         return resultStream;
21       }
22     } catch (IOException e) {
23       throw new RouteException(e);
24     }
25   }

RealConnection维护一个真实的socket,看看是怎么findHealthyConnection的:

 1 /**
 2    * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
 3    * until a healthy connection is found.
 4    */
 5   private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
 6       int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
 7       throws IOException, RouteException {
 8     while (true) {
 9       RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
10           connectionRetryEnabled);
11 
12       // If this is a brand new connection, we can skip the extensive health checks.
13       synchronized (connectionPool) {
14         if (candidate.successCount == 0) {
15           return candidate;
16         }
17       }
18 
19       // Otherwise do a potentially-slow check to confirm that the pooled connection is still good.
20       if (candidate.isHealthy(doExtensiveHealthChecks)) {
21         return candidate;
22       }
23 
24       connectionFailed(new IOException());
25     }
26   }

大体就是一个while循环,不断的通过findConnection去找一个connection,再根据它的可用性判断是否要用它
那么还是要看看findConnection:

 1 /**
 2    * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 3    * then the pool, finally building a new connection.
 4    */
 5   private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
 6       boolean connectionRetryEnabled) throws IOException, RouteException {
 7     Route selectedRoute;
 8     synchronized (connectionPool) {
 9       if (released) throw new IllegalStateException("released");
10       if (stream != null) throw new IllegalStateException("stream != null");
11       if (canceled) throw new IOException("Canceled");
12 
13       RealConnection allocatedConnection = this.connection;
14       if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
15         return allocatedConnection;
16       }
17 
18       // Attempt to get a connection from the pool.
19       RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
20       if (pooledConnection != null) {
21         this.connection = pooledConnection;
22         return pooledConnection;
23       }
24 
25       selectedRoute = route;
26     }
27 
28     if (selectedRoute == null) {
29       selectedRoute = routeSelector.next();
30       synchronized (connectionPool) {
31         route = selectedRoute;
32       }
33     }
34     RealConnection newConnection = new RealConnection(selectedRoute);
35     acquire(newConnection);
36 
37     synchronized (connectionPool) {
38       Internal.instance.put(connectionPool, newConnection);
39       this.connection = newConnection;
40       if (canceled) throw new IOException("Canceled");
41     }
42 
43     newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
44         connectionRetryEnabled);
45     routeDatabase().connected(newConnection.route());
46 
47     return newConnection;
48   }

19行会尝试从connectionPool中通过adress和this来获得一个connection,看看它干了什么:

 1 //ConnectionPool
 2   /** Returns a recycled connection to {@code address}, or null if no such connection exists. */
 3   RealConnection get(Address address, StreamAllocation streamAllocation) {
 4     assert (Thread.holdsLock(this));
 5     for (RealConnection connection : connections) {
 6       if (connection.allocations.size() < connection.allocationLimit
 7           && address.equals(connection.route().address)
 8           && !connection.noNewStreams) {
 9         streamAllocation.acquire(connection);
10         return connection;
11       }
12     }
13     return null;
14   }
15 
16 //StreamAllocation
17   /**
18    * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
19    * {@link #release} on the same connection.
20    */
21   public void acquire(RealConnection connection) {
22     connection.allocations.add(new WeakReference<>(this));
23   }

很简单,遍历自己的deque,找到与address匹配的connection,然后将streamAllocation加入到这个connection的allocations列表中
因此我们可以验证connectionPool是一个存放connection的池子,顺带提一下刚才提到的StreamAllocation的release方法:

 1 /** Remove this allocation from the connection's list of allocations. */
 2   private void release(RealConnection connection) {
 3     for (int i = 0, size = connection.allocations.size(); i < size; i++) {
 4       Reference<StreamAllocation> reference = connection.allocations.get(i);
 5       if (reference.get() == this) {
 6         connection.allocations.remove(i);
 7         return;
 8       }
 9     }
10     throw new IllegalStateException();
11   }

其实就是在connection的allocations队列中删掉自己,因此我们可以知道,StreamAllocation关闭不并代表connection关闭,只是说断开了它持有的stream与connection的关系

回到findConnection,如果在ConnectionPool中找不到可用的connection,那只能在34行new一个RealConncetion了,并在35行把自己acquire到这个connection了。于此同时,还要把这个connection放到ConnectionPool中去,因为对当前address的连接,它是唯一的
来看43行,这个newConnection终于要尝试去连接server了,激动:

 1 public void connect(int connectTimeout, int readTimeout, int writeTimeout,
 2       List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) throws RouteException {
 3     //blahblahblah
 4 
 5     while (protocol == null) {
 6       try {
 7         rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
 8             ? address.socketFactory().createSocket()
 9             : new Socket(proxy);
10         connectSocket(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector);
11     	//blahblahblah
12 		}
13 	}
14 }

7行终于给rawSocket创建了对象,紧接着去connectionSocket:

 1 /** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
 2   private void connectSocket(int connectTimeout, int readTimeout, int writeTimeout,
 3       ConnectionSpecSelector connectionSpecSelector) throws IOException {
 4     rawSocket.setSoTimeout(readTimeout);
 5     try {
 6       Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
 7     } catch (ConnectException e) {
 8       throw new ConnectException("Failed to connect to " + route.socketAddress());
 9     }
10     source = Okio.buffer(Okio.source(rawSocket));
11     sink = Okio.buffer(Okio.sink(rawSocket));
12 
13     if (route.address().sslSocketFactory() != null) {
14       connectTls(readTimeout, writeTimeout, connectionSpecSelector);
15     } else {
16       protocol = Protocol.HTTP_1_1;
17       socket = rawSocket;
18     }
19 
20     if (protocol == Protocol.SPDY_3 || protocol == Protocol.HTTP_2) {
21       socket.setSoTimeout(0); // Framed connection timeouts are set per-stream.
22 
23       FramedConnection framedConnection = new FramedConnection.Builder(true)
24           .socket(socket, route.address().url().host(), source, sink)
25           .protocol(protocol)
26           .listener(this)
27           .build();
28       framedConnection.sendConnectionPreface();
29 
30       // Only assign the framed connection once the preface has been sent successfully.
31       this.allocationLimit = framedConnection.maxConcurrentStreams();
32       this.framedConnection = framedConnection;
33     } else {
34       this.allocationLimit = 1;
35     }
36   }

看第6行,这是最里面了,终于要用我们的rawSocket小朋友去执行connection操作了:

1 public void connectSocket(Socket socket, InetSocketAddress address,
2       int connectTimeout) throws IOException {
3     socket.connect(address, connectTimeout);
4   }

剩下的就由java底层接管了,我们分析到这里就可以了
回到connectSocket,source是一个Okio中用于读的buffer,而sink是用来写的buffer
13、14会判断是否配置了SSL安全协议,如果有的话会进入ssl协议的连接、验证等一系列工作
如果协议是SPDY3或者HTTP2.0的话,就支持全双共通信,因此在20行还要建立一个framedConnection来支持这种操作,并且ssl这种应用层协议使用的是socket,而底层tcp使用的是之前创建的rawSocket(最终socket也会指向rawSocket,这样对于上层来说,它们只需要操作socket就行了)

一连串的调用结束了,我们回到findHealthyConnection(可能需要一点耐心~)的20行,会检查这个candidate是否健康,那无非就是判断里面的socket是否关闭是否timeout

回到StreamAllocation(耐心…), 第9行,会看看通过ssl创建的framedConnection来创建不同的stream,它具体用来做什么的等下再看

回到HttpEngine的sendRequest,这个stream赋值给了httpStream,现在的状态已经是把连接建立起来了,接下来就要写入请求头部了,看到第55行,进入这个方法:

 1 /**
 2    * If the caller's control flow writes the request body, we need to create that stream
 3    * immediately. And that means we need to immediately write the request headers, so we can
 4    * start streaming the request body. (We may already have a request body if we're retrying a
 5    * failed POST.)
 6    */
 7   private boolean writeRequestHeadersEagerly() {
 8     return callerWritesRequestBody
 9         && permitsRequestBody(networkRequest)
10         && requestBodyOut == null;
11   }

一开始就判断callerWritesRequestBody这个boolean值,但是我们在新建第一个httpEngine的时候把它设为false,因此什么也不会做。

好了,sendRequest木有了, 我们回到了sendRequest里面,把代码再贴出来回顾一下:

 1 while (true) {
 2       if (canceled) {
 3         engine.releaseStreamAllocation();
 4         throw new IOException("Canceled");
 5       }
 6 
 7       boolean releaseConnection = true;
 8       try {
 9         engine.sendRequest();
10         engine.readResponse();
11         releaseConnection = false;

发现没有,刚才我们分析的一大摞关于sendRequest的,但到头来好像就干了connect这一件事情件事,header和body的数据是什么时候发呢?会不会在readResponse里面做这些事呢?let‘s find out,咱们先看一部分:

 1 /**
 2    * Flushes the remaining request header and body, parses the HTTP response headers and starts
 3    * reading the HTTP response body if it exists.
 4    */
 5   public void readResponse() throws IOException {
 6     if (userResponse != null) {
 7       return; // Already ready.
 8     }
 9     if (networkRequest == null && cacheResponse == null) {
10       throw new IllegalStateException("call sendRequest() first!");
11     }
12     if (networkRequest == null) {
13       return; // No network response to read.
14     }
15 
16     Response networkResponse;
17 
18     if (forWebSocket) {
19       httpStream.writeRequestHeaders(networkRequest);
20       networkResponse = readNetworkResponse();
21     } else if (!callerWritesRequestBody) {
22       networkResponse = new NetworkInterceptorChain(0, networkRequest,
23           streamAllocation.connection()).proceed(networkRequest);
24     }

看到方法注释我就放心了,还记得我们之前第一次构造httpEngine的时候传入的callerWritesRequestBody为false吗,这里就通过NetworkInterceptorChain.poceed来获取response:

 1 @Override public Response proceed(Request request) throws IOException {
 2       calls++;
 3 
 4       // something about intercepor chain
 5 
 6       httpStream.writeRequestHeaders(request);
 7 
 8       //Update the networkRequest with the possibly updated interceptor request.
 9       networkRequest = request;
10 
11       if (permitsRequestBody(request) && request.body() != null) {
12         Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength());
13         BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
14         request.body().writeTo(bufferedRequestBody);
15         bufferedRequestBody.close();
16       }
17 
18       Response response = readNetworkResponse();
19 
20       int code = response.code();
21       if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
22         throw new ProtocolException(
23             "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
24       }
25 
26       return response;
27     }
28   }

我们跳过前面有关于chain和intercepor的内容,啊哈,到了第6行,终于好像要发送header了呢:

 1 /**
 2    * Prepares the HTTP headers and sends them to the server.
 3    *
 4    * <p>For streaming requests with a body, headers must be prepared <strong>before</strong> the
 5    * output stream has been written to. Otherwise the body would need to be buffered!
 6    *
 7    * <p>For non-streaming requests with a body, headers must be prepared <strong>after</strong> the
 8    * output stream has been written to and closed. This ensures that the {@code Content-Length}
 9    * header field receives the proper value.
10    */
11   @Override public void writeRequestHeaders(Request request) throws IOException {
12     httpEngine.writingRequestHeaders();
13     String requestLine = RequestLine.get(
14         request, httpEngine.getConnection().route().proxy().type());
15     writeRequest(request.headers(), requestLine);
16   }

我们看看这么获得requestLine的:

 1 static String get(Request request, Proxy.Type proxyType) {
 2     StringBuilder result = new StringBuilder();
 3     result.append(request.method());
 4     result.append(' ');
 5 
 6     if (includeAuthorityInRequestLine(request, proxyType)) {
 7       result.append(request.url());
 8     } else {
 9       result.append(requestPath(request.url()));
10     }
11 
12     result.append(" HTTP/1.1");
13     return result.toString();
14   }

熟悉的身影,我们分析的Http1xStream,那么自然版本号就是HTTP/1.1了,返回一个请求行的string
再看writeRequest:

 1 /** Returns bytes of a request header for sending on an HTTP transport. */
 2   public void writeRequest(Headers headers, String requestLine) throws IOException {
 3     if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
 4     sink.writeUtf8(requestLine).writeUtf8("\r\n");
 5     for (int i = 0, size = headers.size(); i < size; i++) {
 6       sink.writeUtf8(headers.name(i))
 7           .writeUtf8(": ")
 8           .writeUtf8(headers.value(i))
 9           .writeUtf8("\r\n");
10     }
11     sink.writeUtf8("\r\n");
12     state = STATE_OPEN_REQUEST_BODY;
13   }

激动人心的时刻来了,看到先通过sink写入请求行(当然了,肯定是要把string转为byte的),在一行一行的写入header,最后会多加一个CRLF,这就把我们熟悉的请求头给拼出来并发出去了~ 搞了半天是在这里才真正发出请求头啊
回到proceed中11行,会先通过request请求方法判断是否允许有body并且body也不能为空,都满足的话,调用httpStream的createRequestBody:

 1 @Override public Sink createRequestBody(Request request, long contentLength) throws IOException {
 2     if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
 3       // Stream a request body of unknown length.
 4       return newChunkedSink();
 5     }
 6 
 7     if (contentLength != -1) {
 8       // Stream a request body of a known length.
 9       return newFixedLengthSink(contentLength);
10     }
11 
12     throw new IllegalStateException(
13         "Cannot stream a request body without chunked encoding or a known content length!");
14   }

在这里会检查header中有没有设置transfer-encoding:chunked,设置这个属性代表使用分块技术传递数据,如果数据量很大可以避免分配超大的buffer,也可以在发送不确定长度的数据的时候使用; 如果没设就返回一个contentLength长度的sink接口

完了后,会通过Okio创建一个用来发送requestBody的buffer,然后request.body().writeTo(bufferedRequestBody),趁此机会我们可以看看一个RequestBody是怎么创建的:

 1 /** Returns a new request body that transmits {@code content}. */
 2   public static RequestBody create(final MediaType contentType, final byte[] content,
 3       final int offset, final int byteCount) {
 4     if (content == null) throw new NullPointerException("content == null");
 5     Util.checkOffsetAndCount(content.length, offset, byteCount);
 6     return new RequestBody() {
 7       @Override public MediaType contentType() {
 8         return contentType;
 9       }
10 
11       @Override public long contentLength() {
12         return byteCount;
13       }
14 
15       @Override public void writeTo(BufferedSink sink) throws IOException {
16         sink.write(content, offset, byteCount);
17       }
18     };
19   }

到此,header和body都发送完毕了,好欣慰,终于……..等一下,后面还有response呢!!!我们现在是在readResponse的NetworkInterceptorChain的proceed方法里啊,咱们写完了body之后紧接着的18行就是readNetworkResponse了:

 1 private Response readNetworkResponse() throws IOException {
 2     httpStream.finishRequest();
 3 
 4     Response networkResponse = httpStream.readResponseHeaders()
 5         .request(networkRequest)
 6         .handshake(streamAllocation.connection().handshake())
 7         .header(OkHeaders.SENT_MILLIS, Long.toString(sentRequestMillis))
 8         .header(OkHeaders.RECEIVED_MILLIS, Long.toString(System.currentTimeMillis()))
 9         .build();
10 
11     if (!forWebSocket) {
12       networkResponse = networkResponse.newBuilder()
13           .body(httpStream.openResponseBody(networkResponse))
14           .build();
15     }
16 
17     if ("close".equalsIgnoreCase(networkResponse.request().header("Connection"))
18         || "close".equalsIgnoreCase(networkResponse.header("Connection"))) {
19       streamAllocation.noNewStreams();
20     }
21 
22     return networkResponse;
23   }
  • finishRequest干的就是吧sink里面的数据给flush出去

  • 紧接着会获取响应报头

  • 如果不是WebSocket就接受响应body

  • 最后看看是否要关闭这个stream

我们先来看看怎么读取响应报头的:

 1 @Override public Response.Builder readResponseHeaders() throws IOException {
 2     return readResponse();
 3   }
 4 
 5 
 6   /** Parses bytes of a response header from an HTTP transport. */
 7   public Response.Builder readResponse() throws IOException {
 8     try {
 9       while (true) {
10         StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
11 
12         Response.Builder responseBuilder = new Response.Builder()
13             .protocol(statusLine.protocol)
14             .code(statusLine.code)
15             .message(statusLine.message)
16             .headers(readHeaders());
17 
18         if (statusLine.code != HTTP_CONTINUE) {
19           state = STATE_OPEN_RESPONSE_BODY;
20           return responseBuilder;
21         }
22       }
23     } catch (EOFException e) {
24     //blahblahblah
25   }

source.readUtf8LineStrict就是读到“\n”或者“\r\n”结束,可以看Okio BufferedSource
然后再把它解析成状态行,也就是protocol、code、message这三个参数,最好还要readHeaders():

1 /** Reads headers or trailers. */
2   public Headers readHeaders() throws IOException {
3     Headers.Builder headers = new Headers.Builder();
4     // parse the result headers until the first blank line
5     for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) {
6       Internal.instance.addLenient(headers, line);
7     }
8     return headers.build();
9   }

同样是一行一行的读入,直到空行CLRF,看看如何根据这些string构造headers的:

 1 /**
 2  * Add a header line without any validation. Only appropriate for headers from the remote peer
 3  * or cache.
 4  */
 5 Builder addLenient(String line) {
 6   int index = line.indexOf(":", 1);
 7   if (index != -1) {
 8     return addLenient(line.substring(0, index), line.substring(index + 1));
 9   } else if (line.startsWith(":")) {
10     // Work around empty header names and header names that start with a
11     // colon (created by old broken SPDY versions of the response cache).
12     return addLenient("", line.substring(1)); // Empty header name.
13   } else {
14     return addLenient("", line); // No header name.
15   }
16 }
17 
18 /**
19  * Add a field with the specified value without any validation. Only appropriate for headers
20  * from the remote peer or cache.
21  */
22 Builder addLenient(String name, String value) {
23   namesAndValues.add(name);
24   namesAndValues.add(value.trim());
25   return this;
26 }

不解释了,很简单,这样response header就构造完毕了

接下来回到readNetworkResponse来看body是怎么拿的,看httpStream.openResponseBody(networkResponse))

 1 @Override public ResponseBody openResponseBody(Response response) throws IOException {
 2     Source source = getTransferStream(response);
 3     return new RealResponseBody(response.headers(), Okio.buffer(source));
 4   }
 5 
 6   private Source getTransferStream(Response response) throws IOException {
 7     if (!HttpEngine.hasBody(response)) {
 8       return newFixedLengthSource(0);
 9     }
10 
11     if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
12       return newChunkedSource(httpEngine);
13     }
14 
15     long contentLength = OkHeaders.contentLength(response);
16     if (contentLength != -1) {
17       return newFixedLengthSource(contentLength);
18     }
19   }
20 
21   public RealResponseBody(Headers headers, BufferedSource source) {
22     this.headers = headers;
23     this.source = source;
24   }
  • 首先通过getTransferStream根据response.header的信息,创建一个合适的source,之前也说过了,这是用来读取数据的

  • 然后同样是通过Okio用这个source创建一个buffer来接收数据

  • 最后new一个RealResponseBody,这时候数据已经在BufferedSource里了

回到readNetworkResponse,根据header判断没有后续事项,就可以触发streamAllocation.noNewStreams(),最终调用deallocate:

 1 /**
 2    * Releases resources held by this allocation. If sufficient resources are allocated, the
 3    * connection will be detached or closed.
 4    */
 5   private void deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
 6     RealConnection connectionToClose = null;
 7     synchronized (connectionPool) {
 8       if (streamFinished) {
 9         this.stream = null;
10       }
11       if (released) {
12         this.released = true;
13       }
14       if (connection != null) {
15         if (noNewStreams) {
16           connection.noNewStreams = true;
17         }
18         if (this.stream == null && (this.released || connection.noNewStreams)) {
19           release(connection);
20           if (connection.allocations.isEmpty()) {
21             connection.idleAtNanos = System.nanoTime();
22             if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
23               connectionToClose = connection;
24             }
25           }
26           connection = null;
27         }
28       }
29     }
30     if (connectionToClose != null) {
31       Util.closeQuietly(connectionToClose.socket());
32     }
33   }

它会把自己持有的stream释放,并且把它使用的connection释放,这个connection还会看看还有没有别的stream在用它,如果没有,去connectionPool里看看还有没有在排队的stream需要这个connection,都不满足的话就关闭这个socket,还蛮残忍的是不是

好了,我们从readNetworkResponse返回,再从proceed返回,回到了readResponse中,补完之前给出的一半代码:

 1 receiveHeaders(networkResponse.headers());
 2 
 3     // If we have a cache response too, then we're doing a conditional get.
 4     if (cacheResponse != null) {
 5       if (validate(cacheResponse, networkResponse)) {
 6         userResponse = cacheResponse.newBuilder()
 7             .request(userRequest)
 8             .priorResponse(stripBody(priorResponse))
 9             .headers(combine(cacheResponse.headers(), networkResponse.headers()))
10             .cacheResponse(stripBody(cacheResponse))
11             .networkResponse(stripBody(networkResponse))
12             .build();
13         networkResponse.body().close();
14         releaseStreamAllocation();
15 
16         // Update the cache after combining headers but before stripping the
17         // Content-Encoding header (as performed by initContentStream()).
18         InternalCache responseCache = Internal.instance.internalCache(client);
19         responseCache.trackConditionalCacheHit();
20         responseCache.update(cacheResponse, stripBody(userResponse));
21         userResponse = unzip(userResponse);
22         return;
23       } else {
24         closeQuietly(cacheResponse.body());
25       }
26     }
27 
28     userResponse = networkResponse.newBuilder()
29         .request(userRequest)
30         .priorResponse(stripBody(priorResponse))
31         .cacheResponse(stripBody(cacheResponse))
32         .networkResponse(stripBody(networkResponse))
33         .build();
34 
35     if (hasBody(userResponse)) {
36       maybeCache();
37       userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));
38     }
39   }

大致说一下,就是把priorResponse、cacheResponse以及我们刚才从server中拿到的networkResponse和起来放入userResponse里面(最后一行会构建一个新的response,换一个source,并解压,其实并没有看懂,因为Okio这块还是有知识点缺失),如果要cache的话也不要忘了加入到缓存中去

至此,readResponse就结束了,不过response的内容我们还没有拿到,继续看getResponse里面:

 1 while(true) {
 2 
 3       // blahblahblah
 4 
 5       Response response = engine.getResponse();
 6       Request followUp = engine.followUpRequest();
 7 
 8       if (followUp == null) {
 9         if (!forWebSocket) {
10           engine.releaseStreamAllocation();
11         }
12         return response;
13       }
14 
15       StreamAllocation streamAllocation = engine.close();
16 
17       if (++followUpCount > MAX_FOLLOW_UPS) {
18         streamAllocation.release();
19         throw new ProtocolException("Too many follow-up requests: " + followUpCount);
20       }
21 
22       if (!engine.sameConnection(followUp.url())) {
23         streamAllocation.release();
24         streamAllocation = null;
25       }
26 
27       request = followUp;
28       engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null,
29           response);
30     }

engine.getResponse只是把userResponse拿出来,关键是要看有没有followUpRequest:

 1 /**
 2    * Figures out the HTTP request to make in response to receiving this engine's response. This will
 3    * either add authentication headers, follow redirects or handle a client request timeout. If a
 4    * follow-up is either unnecessary or not applicable, this returns null.
 5    */
 6   public Request followUpRequest() throws IOException {
 7     if (userResponse == null) throw new IllegalStateException();
 8     Connection connection = streamAllocation.connection();
 9     Route route = connection != null
10         ? connection.route()
11         : null;
12     int responseCode = userResponse.code();
13 
14     final String method = userRequest.method();
15     switch (responseCode) {
16       case HTTP_PROXY_AUTH:
17         Proxy selectedProxy = route != null
18             ? route.proxy()
19             : client.proxy();
20         if (selectedProxy.type() != Proxy.Type.HTTP) {
21           throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
22         }
23         return client.proxyAuthenticator().authenticate(route, userResponse);
24 
25       case HTTP_UNAUTHORIZED:
26         return client.authenticator().authenticate(route, userResponse);
27 
28       case HTTP_PERM_REDIRECT:
29       case HTTP_TEMP_REDIRECT:
30         // "If the 307 or 308 status code is received in response to a request other than GET
31         // or HEAD, the user agent MUST NOT automatically redirect the request"
32         if (!method.equals("GET") && !method.equals("HEAD")) {
33           return null;
34         }
35         // fall-through
36       case HTTP_MULT_CHOICE:
37       case HTTP_MOVED_PERM:
38       case HTTP_MOVED_TEMP:
39       case HTTP_SEE_OTHER:
40         // Does the client allow redirects?
41         if (!client.followRedirects()) return null;
42 
43         String location = userResponse.header("Location");
44         if (location == null) return null;
45         HttpUrl url = userRequest.url().resolve(location);
46 
47         // Don't follow redirects to unsupported protocols.
48         if (url == null) return null;
49 
50         // If configured, don't follow redirects between SSL and non-SSL.
51         boolean sameScheme = url.scheme().equals(userRequest.url().scheme());
52         if (!sameScheme && !client.followSslRedirects()) return null;
53 
54         // Redirects don't include a request body.
55         Request.Builder requestBuilder = userRequest.newBuilder();
56         if (HttpMethod.permitsRequestBody(method)) {
57           if (HttpMethod.redirectsToGet(method)) {
58             requestBuilder.method("GET", null);
59           } else {
60             requestBuilder.method(method, null);
61           }
62           requestBuilder.removeHeader("Transfer-Encoding");
63           requestBuilder.removeHeader("Content-Length");
64           requestBuilder.removeHeader("Content-Type");
65         }
66 
67         // When redirecting across hosts, drop all authentication headers. This
68         // is potentially annoying to the application layer since they have no
69         // way to retain them.
70         if (!sameConnection(url)) {
71           requestBuilder.removeHeader("Authorization");
72         }
73 
74         return requestBuilder.url(url).build();
75 
76       case HTTP_CLIENT_TIMEOUT:
77         // 408's are rare in practice, but some servers like HAProxy use this response code. The
78         // spec says that we may repeat the request without modifications. Modern browsers also
79         // repeat the request (even non-idempotent ones.)
80         boolean retryableBody = requestBodyOut == null || requestBodyOut instanceof RetryableSink;
81         if (callerWritesRequestBody && !retryableBody) {
82           return null;
83         }
84 
85         return userRequest;
86 
87       default:
88         return null;
89     }
90   }

就不一条一条分析了,大致就是根据userResponse的responseCode来判断是否需要再发起一次request,如果需要的话就再构建一个

我们回到getResponse,如果没有followUp Request,就将stream和connection断开,释放engine的StreamAllocation,返回response 如果还有,那仅仅是将engine.close,在这之中只会将一些io端口关掉,并返回原来的streamAllocation,因为它是要保留进行下一次操作的,不过如果followUp的connection和当前request的url不一样,也就是属于不同的connection,那还是会释放这个streamAllocation,最后,再重新new一个engine,进行下一个循环的操作

好了,getResponse可以返回了,最终这个response会返回到AsycCall的getResponseWithInterceptorChain,然后调用responseCallback.onResponse(RealCall.this, response),这样就的到了最终的结果~ 当然了,还在client的dispatcher中把这个call给finish,不知道还记得起来这个dispatcher不~

最后我们来看看response.string()做了什么:

 1 /**
 2    * Returns the response as a string decoded with the charset of the Content-Type header. If that
 3    * header is either absent or lacks a charset, this will attempt to decode the response body as
 4    * UTF-8.
 5    */
 6   public final String string() throws IOException {
 7     return new String(bytes(), charset().name());
 8   }
 9 
10 
11   public final byte[] bytes() throws IOException {
12     long contentLength = contentLength();
13     if (contentLength > Integer.MAX_VALUE) {
14       throw new IOException("Cannot buffer entire body for content length: " + contentLength);
15     }
16 
17     BufferedSource source = source();
18     byte[] bytes;
19     try {
20       bytes = source.readByteArray();
21     } finally {
22       Util.closeQuietly(source);
23     }
24     if (contentLength != -1 && contentLength != bytes.length) {
25       throw new IOException("Content-Length and stream length disagree");
26     }
27     return bytes;
28   }

最后一块石头落地,这些数据确实是从source里read出来的

总结

整个okhttp其实是一个非常庞大的项目,我也只是追踪了冰山一角中的一条线的代码,但已经感受良多,分为一下几个方面:

  • 整个项目充斥着builder、单例、策略等设计模式,还有很多我可能认不出的,虽然给代码跟踪带来了一些小小的障碍,但是极大的降低了组建之间的耦合程度,这么大的一个project,不同的人写不同的模块,设计模式确实可以帮助更高效的开发和测试

  • 对于并行处理事件上又多了一点感悟,光依赖线程池是不够的,要灵活的控制task还要再开一个队列,由它来控制task的cancel与否,或者统计哪些在执行的哪些是不在执行的,并且要等这个task执行结束后再从这个队列里销毁,能够及时知道它的状态,对这个task有更好的掌控性,事实上Volley也是这么做的,但是也要注意处理多线程数据同步问题(AsyncTask现在变成串行的了,因此没有这个问题)

  • okhttp的interceptor拦截器给我的印象很深,我认为这是一种AOP编程思想,在这里画一个流程图会比较好的解释:

 1 chain.proceed(request)   <---------------------------------------------------
 2       nextChain = new Chain(request)                                         |
 3       interceptor = getNextInterceptor()                                     |
 4       if (interceptor)                                                       |
 5           return interceptor.intercept(chain)                                |
 6       else                               |                                   | 
 7           return defalut                 v                                   |
 8                                        do sth. before                        |   
 9                                        res = chain.proceed(chain.request)-----
10                                        do sth. after
11                                        return res
  • 会维护一个ConnectionPool,争取最大限度的复用connection这种socket物理连接,这跟Android里handler的msg以及inBitmap有相似的道理。同时用一个stream来维护一个逻辑上的request/response的pair,有种多路复用的感觉,这个优化应该是以后自己的项目中能够想的到的一种优化(假设多个事务同时操作文件,那个可以把对同一个文件的操作分为逻辑操作和物理操作两种,逻辑操作可以复用物理上的buffer而不用重新销毁再创建)

  • 对于cache的设计,okhttp不像universal-image-loader那样单独的设计两级缓存,我在代码中看到了snapshot之类的字眼,以后有空一定会专门去分析它来看看如何设计一个健壮高效的cache

  • 现在能体会到优秀开源代码的厉害之处了,以后一定要多多看看这种代码,长长见识才能更好的提高自己