// Dispatcher#executed() /** Used by {@code Call#execute} to signal it is in-flight. */ synchronizedvoidexecuted(RealCall call) { runningSyncCalls.add(call); }
/**
 * Runs the interceptor at position {@code index}, handing it a new chain that starts at
 * {@code index + 1}, then validates what that interceptor produced.
 *
 * @param request the request handed to the next chain
 * @param streamAllocation forwarded to the next chain
 * @param httpCodec forwarded to the next chain; when non-null the "exactly once" check on the
 *     next chain is enforced
 * @param connection forwarded to the next chain
 * @return NOTE(review): the return statement is outside this excerpt; presumably the validated
 *     {@code response} -- confirm against the full source
 * @throws IOException declared by the signature
 * @throws AssertionError if {@code index} is past the end of {@code interceptors}
 * @throws IllegalStateException if an interceptor breaks the host/port or call-once rules, or
 *     returns a response without a body
 * @throws NullPointerException if the interceptor returns {@code null}
 */
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  // If we already have a stream, confirm that the incoming request will use it.
  if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.httpCodec != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Call the next interceptor in the chain.
  // Build the RealInterceptorChain for the next step (index + 1).
  RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
      connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
      writeTimeout);
  // The interceptor for the current index.
  Interceptor interceptor = interceptors.get(index);
  // Run the interceptor; it is expected to continue the chain by calling next.proceed().
  Response response = interceptor.intercept(next);

  // Confirm that the next interceptor made its required call to chain.proceed().
  if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }
  // NOTE(review): the excerpt stops here; the remainder of this method (its return statement and
  // closing brace) is not part of the visible source.
intfollowUpCount=0; ResponsepriorResponse=null; while (true) { // 如果取消,就释放资源 if (canceled) { streamAllocation.release(); thrownewIOException("Canceled"); }
Response response; booleanreleaseConnection=true; try { // 调用下一个拦截器 response = realChain.proceed(request, streamAllocation, null, null); releaseConnection = false; } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. // 路由连接失败,请求将不会被发送 if (!recover(e.getLastConnectException(), streamAllocation, false, request)) { throw e.getFirstConnectException(); } releaseConnection = false; continue; } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. // 服务器连接失败,请求可能已被发送 booleanrequestSendStarted= !(e instanceof ConnectionShutdownException); if (!recover(e, streamAllocation, requestSendStarted, request)) throw e; releaseConnection = false; continue; } finally { // We're throwing an unchecked exception. Release any resources. // 抛出未检查的异常,释放资源 if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } }
// Attach the prior response if it exists. Such responses never have a body. if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } // 如果不需要重定向,那么 followUp 为空,会根据响应码判断 Request followUp; try { followUp = followUpRequest(response, streamAllocation.route()); } catch (IOException e) { streamAllocation.release(); throw e; } // 释放资源,返回 response if (followUp == null) { if (!forWebSocket) { streamAllocation.release(); } return response; } // 关闭 response 的 body closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); thrownewProtocolException("Too many follow-up requests: " + followUpCount); }
if (followUp.body() instanceof UnrepeatableRequestBody) { streamAllocation.release(); thrownewHttpRetryException("Cannot retry streamed HTTP body", response.code()); } // response 和 followUp 比较是否为同一个连接 // 若为重定向就销毁旧连接,创建新连接 if (!sameConnection(response, followUp.url())) { streamAllocation.release(); streamAllocation = newStreamAllocation(client.connectionPool(), createAddress(followUp.url()), call, eventListener, callStackTrace); this.streamAllocation = streamAllocation; } elseif (streamAllocation.codec() != null) { thrownewIllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); } // 将重定向操作得到的新请求设置给 request request = followUp; priorResponse = response; } }
// Cache decision logic: answer from the cache when possible, otherwise go to the network and
// reconcile the network response with any cached one.
// NOTE(review): the enclosing method header and the declarations of networkRequest,
// cacheResponse, cacheCandidate and cache are outside this excerpt.

// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
  return new Response.Builder()
      .request(chain.request())
      .protocol(Protocol.HTTP_1_1)
      .code(504)
      .message("Unsatisfiable Request (only-if-cached)")
      .body(Util.EMPTY_RESPONSE)
      .sentRequestAtMillis(-1L)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();
}

// If we don't need the network, we're done: return the cached response.
if (networkRequest == null) {
  return cacheResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .build();
}

Response networkResponse = null;
try {
  // Hand off to the next interceptor, which yields networkResponse.
  networkResponse = chain.proceed(networkRequest);
} finally {
  // If we're crashing on I/O or otherwise, don't leak the cache body.
  if (networkResponse == null && cacheCandidate != null) {
    closeQuietly(cacheCandidate.body());
  }
}

// If we have both a cached response and a network response, choose between them.
if (cacheResponse != null) {
  if (networkResponse.code() == HTTP_NOT_MODIFIED) {
    Response response = cacheResponse.newBuilder()
        .headers(combine(cacheResponse.headers(), networkResponse.headers()))
        .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
        .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    networkResponse.body().close();

    // Bring the stored cache entry up to date.
    // Update the cache after combining headers but before stripping the
    // Content-Encoding header (as performed by initContentStream()).
    cache.trackConditionalCacheHit();
    cache.update(cacheResponse, response);
    return response;
  } else {
    closeQuietly(cacheResponse.body());
  }
}

Response response = networkResponse.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();

// Store the response if it was not cached before.
if (cache != null) {
  if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
    // Offer this request to the cache.
    CacheRequest cacheRequest = cache.put(response);
    return cacheWritingResponse(cacheRequest, response);
  }

  if (HttpMethod.invalidatesCache(networkRequest.method())) {
    try {
      cache.remove(networkRequest);
    } catch (IOException ignored) {
      // The cache cannot be written.
    }
  }
}
// We need the network to satisfy this request. Possibly for validating a conditional GET. booleandoExtensiveHealthChecks= !request.method().equals("GET"); HttpCodechttpCodec= streamAllocation.newStream(client, chain, doExtensiveHealthChecks); RealConnectionconnection= streamAllocation.connection();
Response.BuilderresponseBuilder=null; // 检查是否为有 body 的请求方法 if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return what // we did get (such as a 4xx response) without ever transmitting the request body. // 如果有 Expect: 100-continue 在请求头中,那么要等服务器的响应 if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { httpCodec.flushRequest(); responseBuilder = httpCodec.readResponseHeaders(true); }
if (responseBuilder == null) { // Write the request body if the "Expect: 100-continue" expectation was met. // 写入请求体 SinkrequestBodyOut= httpCodec.createRequestBody(request, request.body().contentLength()); BufferedSinkbufferedRequestBody= Okio.buffer(requestBodyOut); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } elseif (!connection.isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from // being reused. Otherwise we're still obligated to transmit the request body to leave the // connection in a consistent state. streamAllocation.noNewStreams(); } }
// Dispatcher#finished() /** Used by {@code AsyncCall#run} to signal completion. */ voidfinished(AsyncCall call) { finished(runningAsyncCalls, call, true); }
/** Used by {@code Call#execute} to signal completion. */ voidfinished(RealCall call) { finished(runningSyncCalls, call, false); }
private <T> voidfinished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) thrownewAssertionError("Call wasn't in-flight!"); // 将 readyAsyncCalls 中的 call 移动到 runningAsyncCalls 中,并加入到线程池中 if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; }