OkHttp 网络框架源码解析

Author Avatar
wshunli 9月 13, 2018
  • 在其它设备中阅读本文章

本文介绍 OkHttp 网络框架,包含简单的使用和源码解析。本文内容基于 OkHttp 3.11.0 版本

网上关于 OkHttp 源码解析的文章有很多,我在这里参考他们的资料,形成自己的知识体系。

只是停留在应用层面,会使用一些框架是不行的,还需要深入源码、剖析结构。

An HTTP+HTTP/2 client for Android and Java applications. http://square.github.io/okhttp/

支持 HTTP/2 协议,允许连接到同一个主机地址的所有请求共享 Socket 。
在 HTTP/2 协议不可用的情况下,通过连接池减少请求的延迟。
支持 GZip 透明压缩,减少传输的数据包大小。
支持响应缓存,避免同一个重复的网络请求。

OkHttp 的简单使用

一般情况下,对于网络框架有两种常见的使用场景,同步请求和异步请求。

同步请求

OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("https://wshunli.com").build();
Call call = okHttpClient.newCall(request);
Response response = call.execute();
Log.d(TAG, "onCreate: " + response.body().string());

异步请求

OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("https://wshunli.com").build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {

    }
    @Override
    public void onResponse(Call call, Response response) throws IOException {
        Log.d(TAG, "onCreate: " + response.body().string());
    }
});

同步请求和异步请求类似,先实例化 OkHttpClient 和 Request 对象,然后使用 OkHttpClient 对象的 newCall() 方法创建 Call 对象,只不过最后执行 enqueue() 方法,整体和网络请求的思路相似。

OkHttp 的源码分析

OkHttp 网络请求完整的流程图如下:

OkHttp 流程图

下面详细介绍。

同步请求

同步请求,先实例化 OkHttpClient 和 Request 对象,然后使用 OkHttpClient 对象的 newCall() 方法创建 Call 对象,最后执行 execute() 方法,整体和网络请求的思路相似。

OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("https://wshunli.com").build();
Call call = okHttpClient.newCall(request);
Response response = call.execute();

创建 OkHttpClient 对象

我们先看 OkHttp 的构造函数:

public OkHttpClient() {
  this(new Builder());
}

这里是直接实例化,实质上是使用 建造者模式 构建 OkHttpClient 实例。

下面是 OkHttpClient 内部类 Builder 的构造方法:

public Builder() {
  dispatcher = new Dispatcher();
  protocols = DEFAULT_PROTOCOLS;
  connectionSpecs = DEFAULT_CONNECTION_SPECS;
  eventListenerFactory = EventListener.factory(EventListener.NONE);
  proxySelector = ProxySelector.getDefault();
  cookieJar = CookieJar.NO_COOKIES;
  socketFactory = SocketFactory.getDefault();
  hostnameVerifier = OkHostnameVerifier.INSTANCE;
  certificatePinner = CertificatePinner.DEFAULT;
  proxyAuthenticator = Authenticator.NONE;
  authenticator = Authenticator.NONE;
  connectionPool = new ConnectionPool();
  dns = Dns.SYSTEM;
  followSslRedirects = true;
  followRedirects = true;
  retryOnConnectionFailure = true;
  connectTimeout = 10_000;
  readTimeout = 10_000;
  writeTimeout = 10_000;
  pingInterval = 0;
}
public OkHttpClient build() {
  return new OkHttpClient(this);
}

这里 OkHttpClient.Builder 有很多参数,后面再介绍。

创建 Request 对象

和 OkHttpClient 类似,Request 也是是使用 建造者模式 创建实例。

public Builder() {
  this.method = "GET";
  this.headers = new Headers.Builder();
}
public Request build() {
  if (url == null) throw new IllegalStateException("url == null");
  return new Request(this);
}

其中配置默认请求方法为 GET ,还有一些头部的默认参数。

创建 Call 对象

OkHttpClient 实现了 Call.Factory ,负责根据请求创建新的 Call 对象。

@Override public Call newCall(Request request) {
  return RealCall.newRealCall(this, request, false /* for web socket */);
}

Call 只是个接口,实际是实例化的 RealCall 对象。

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  this.client = client;
  this.originalRequest = originalRequest;
  this.forWebSocket = forWebSocket;
  this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // Safely publish the Call instance to the EventListener.
  RealCall call = new RealCall(client, originalRequest, forWebSocket);
  call.eventListener = client.eventListenerFactory().create(call);
  return call;
}

发送同步网络请求

发送请求也是在 RealCallexecute() 方法中执行的。

// RealCall#execute()
@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);
  try {
    client.dispatcher().executed(this);
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } catch (IOException e) {
    eventListener.callFailed(this, e);
    throw e;
  } finally {
    client.dispatcher().finished(this);
  }
}

在这里主要做了四件事:

1、检查 Call 是否执行过,没有执行将 executed 赋值为 true ,保证每个请求只执行一次;
2、使用 client.dispatcher().executed(this) 来进行实际的请求;
3、调用 getResponseWithInterceptorChain() 方法,获取请求响应的结果;
4、最后 dispatcher 结束自己。

// Dispatcher#executed()
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

在同步请求中 dispatcher 只是负责判断请求执行的状态,在异步请求中参与内容过多。

下面我们来看 getResponseWithInterceptorChain() 方法:

Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors()); // 加入用户自定义的拦截器
  interceptors.add(retryAndFollowUpInterceptor); // 重试和重定向拦截器
  interceptors.add(new BridgeInterceptor(client.cookieJar())); // 加入转化请求响应的拦截器
  interceptors.add(new CacheInterceptor(client.internalCache())); // 加入缓存拦截器
  interceptors.add(new ConnectInterceptor(client)); // 加入连接拦截器
  if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors()); // 加入用户自定义的网络拦截器
  }
  interceptors.add(new CallServerInterceptor(forWebSocket)); // 加入请求响应的拦截器

  Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
          originalRequest, this, eventListener, client.readTimeoutMillis());
  // 利用 chain 来链式调用拦截器,最后的返回结果就是 Response 对象
  return chain.proceed(originalRequest);
}

我们都知道,拦截器是 OkHttp 的精髓。

1、client.interceptors() ,首先加入 interceptors 的是用户自定义的拦截器,比如修改请求头的拦截器等;
2、RetryAndFollowUpInterceptor 是用来重试和重定向的拦截器,在下面我们会讲到;
3、BridgeInterceptor 是用来将用户友好的请求转化为向服务器的请求,之后又把服务器的响应转化为对用户友好的响应;
4、CacheInterceptor 是缓存拦截器,若存在缓存并且可用就直接返回该缓存,否则会向服务器请求;
5、ConnectInterceptor 用来建立连接的拦截器;
6、client.networkInterceptors() 加入用户自定义的 networkInterceptors
7、CallServerInterceptor是真正向服务器发出请求且得到响应的拦截器;

最后在聚合了这些拦截器后,利用 RealInterceptorChain 来链式调用这些拦截器,利用的就是 责任链模式

下面介绍 OkHttp 中的 拦截器

拦截器 Interceptor 是 OkHttp 的核心,实际上它把实际的网络请求、缓存、透明压缩等功能都统一了起来,每一个功能都只是一个 Interceptor,它们再连接成一个 Interceptor.Chain,环环相扣,最终圆满完成一次网络请求。

OkHttp 拦截器

1、RealInterceptorChain 拦截器链

拦截器链 RealInterceptorChain 是真正把这些拦截器串起来的一个角色,调用 proceed() 方法

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  // If we already have a stream, confirm that the incoming request will use it.
  if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.httpCodec != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Call the next interceptor in the chain.
  // 得到下一次对应的 RealInterceptorChain
  RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
      connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
      writeTimeout);
  // 当前次数的 interceptor
  Interceptor interceptor = interceptors.get(index);
  // 进行拦截处理,并且在 interceptor 链式调用 next 的 proceed 方法
  Response response = interceptor.intercept(next);

  // Confirm that the next interceptor made its required call to chain.proceed().
  // 确认下一次的 interceptor 调用过 chain.proceed()
  if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }

  return response;
}

在代码中是一次次链式调用拦截器。

2、RetryAndFollowUpInterceptor 重试和重定向的拦截器

@Override public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Call call = realChain.call();
  EventListener eventListener = realChain.eventListener();

  StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
      createAddress(request.url()), call, eventListener, callStackTrace);
  this.streamAllocation = streamAllocation;

  int followUpCount = 0;
  Response priorResponse = null;
  while (true) {
    // 如果取消,就释放资源
    if (canceled) {
      streamAllocation.release();
      throw new IOException("Canceled");
    }

    Response response;
    boolean releaseConnection = true;
    try {
      // 调用下一个拦截器
      response = realChain.proceed(request, streamAllocation, null, null);
      releaseConnection = false;
    } catch (RouteException e) {
      // The attempt to connect via a route failed. The request will not have been sent.
      // 路由连接失败,请求将不会被发送
      if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
        throw e.getFirstConnectException();
      }
      releaseConnection = false;
      continue;
    } catch (IOException e) {
      // An attempt to communicate with a server failed. The request may have been sent.
      // 服务器连接失败,请求可能已被发送
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
      releaseConnection = false;
      continue;
    } finally {
      // We're throwing an unchecked exception. Release any resources.
      // 抛出未检查的异常,释放资源
      if (releaseConnection) {
        streamAllocation.streamFailed(null);
        streamAllocation.release();
      }
    }

    // Attach the prior response if it exists. Such responses never have a body.
    if (priorResponse != null) {
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
          .build();
    }
    // 如果不需要重定向,那么 followUp 为空,会根据响应码判断
    Request followUp;
    try {
      followUp = followUpRequest(response, streamAllocation.route());
    } catch (IOException e) {
      streamAllocation.release();
      throw e;
    }
    // 释放资源,返回 response
    if (followUp == null) {
      if (!forWebSocket) {
        streamAllocation.release();
      }
      return response;
    }
    // 关闭 response 的 body
    closeQuietly(response.body());

    if (++followUpCount > MAX_FOLLOW_UPS) {
      streamAllocation.release();
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    if (followUp.body() instanceof UnrepeatableRequestBody) {
      streamAllocation.release();
      throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
    }
    // response 和 followUp 比较是否为同一个连接
    // 若为重定向就销毁旧连接,创建新连接
    if (!sameConnection(response, followUp.url())) {
      streamAllocation.release();
      streamAllocation = new StreamAllocation(client.connectionPool(),
          createAddress(followUp.url()), call, eventListener, callStackTrace);
      this.streamAllocation = streamAllocation;
    } else if (streamAllocation.codec() != null) {
      throw new IllegalStateException("Closing the body of " + response
          + " didn't close its backing stream. Bad interceptor?");
    }
    // 将重定向操作得到的新请求设置给 request
    request = followUp;
    priorResponse = response;
  }
}

总体来说,RetryAndFollowUpInterceptor 是用来失败重试以及重定向的拦截器。

3、BridgeInterceptor 桥街和适配拦截器

@Override public Response intercept(Chain chain) throws IOException {
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder();
  // 将用户友好的 request 构造为发送给服务器的 request
  RequestBody body = userRequest.body();
  // 若有请求体,则构造
  if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString());
    }

    long contentLength = body.contentLength();
    if (contentLength != -1) {
      requestBuilder.header("Content-Length", Long.toString(contentLength));
      requestBuilder.removeHeader("Transfer-Encoding");
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked");
      requestBuilder.removeHeader("Content-Length");
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
  }

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  // 使用 gzip 压缩
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  }
  // 设置 cookie
  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
  }
  // 设置 UA
  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
  }
  // 构造完后,将 request 交给下一个拦截器去处理。最后又得到服务端响应 networkResponse
  Response networkResponse = chain.proceed(requestBuilder.build());
  // 保存 networkResponse 的 cookie
  HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
  // 将 networkResponse 构造为对用户友好的 response
  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);
  // 如果 networkResponse 使用 gzip 并且有响应体的话,给用户友好的 response 设置响应体
  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    String contentType = networkResponse.header("Content-Type");
    responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
  }

  return responseBuilder.build();
}

BridgeInterceptor 这一步,先把用户友好的请求进行重新构造,变成了向服务器发送的请求。

之后调用 chain.proceed(requestBuilder.build()) 进行下一个拦截器的处理。

等到后面的拦截器都处理完毕,得到响应。再把 networkResponse 转化成对用户友好的 response

4、CacheInterceptor 缓存拦截器

分析 CacheInterceptor 拦截器 intercept() 方法的源代码

@Override public Response intercept(Chain chain) throws IOException {
    // 得到 request 对应缓存中的 response
    Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
    // 获取当前时间,会和之前缓存的时间进行比较
    long now = System.currentTimeMillis();
    // 得到缓存策略
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
    // 追踪缓存,其实就是计数
    if (cache != null) {
        cache.trackResponse(strategy);
    }
    // 缓存不适用,关闭
    if (cacheCandidate != null && cacheResponse == null) {
        closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // 禁止网络并且没有缓存的话,返回失败
    if (networkRequest == null && cacheResponse == null) {
        return new Response.Builder()
                .request(chain.request())
                .protocol(Protocol.HTTP_1_1)
                .code(504)
                .message("Unsatisfiable Request (only-if-cached)")
                .body(Util.EMPTY_RESPONSE)
                .sentRequestAtMillis(-1L)
                .receivedResponseAtMillis(System.currentTimeMillis())
                .build();
    }

    // If we don't need the network, we're done.
    // 不用网络请求,返回缓存
    if (networkRequest == null) {
        return cacheResponse.newBuilder()
                .cacheResponse(stripBody(cacheResponse))
                .build();
    }

    Response networkResponse = null;
    try {
        // 交给下一个拦截器,返回 networkResponse
        networkResponse = chain.proceed(networkRequest);
    } finally {
        // If we're crashing on I/O or otherwise, don't leak the cache body.
        if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
        }
    }

    // 如果我们同时有缓存和 networkResponse ,根据情况使用
    if (cacheResponse != null) {
        if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                    .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                    .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                    .cacheResponse(stripBody(cacheResponse))
                    .networkResponse(stripBody(networkResponse))
                    .build();
            networkResponse.body().close();
            // 更新原来的缓存至最新
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
        } else {
            closeQuietly(cacheResponse.body());
        }
    }

    Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
    // 保存之前未缓存的缓存
    if (cache != null) {
        if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
            // Offer this request to the cache.
            CacheRequest cacheRequest = cache.put(response);
            return cacheWritingResponse(cacheRequest, response);
        }

        if (HttpMethod.invalidatesCache(networkRequest.method())) {
            try {
                cache.remove(networkRequest);
            } catch (IOException ignored) {
                // The cache cannot be written.
            }
        }
    }

    return response;
}

CacheInterceptor 做的事情就是根据请求拿到缓存,若没有缓存或者缓存失效,就进入网络请求阶段,否则会返回缓存。

5、ConnectInterceptor 拦截器

@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  StreamAllocation streamAllocation = realChain.streamAllocation();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
  RealConnection connection = streamAllocation.connection();

  return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

实际上建立连接就是创建了一个 HttpCodec 对象,它是对 HTTP 协议操作的抽象,有两个实现:Http1CodecHttp2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。

6、CallServerInterceptor 拦截器,发送和接收数据

@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  HttpCodec httpCodec = realChain.httpStream();
  StreamAllocation streamAllocation = realChain.streamAllocation();
  RealConnection connection = (RealConnection) realChain.connection();
  Request request = realChain.request();    

  long sentRequestMillis = System.currentTimeMillis();
  // 整理请求头并写入
  httpCodec.writeRequestHeaders(request);

  Response.Builder responseBuilder = null;
  // 检查是否为有 body 的请求方法
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      // 如果有 Expect: 100-continue 在请求头中,那么要等服务器的响应
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
          httpCodec.flushRequest();
          responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          // 写入请求体
          Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
          BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
          // being reused. Otherwise we're still obligated to transmit the request body to leave the
          // connection in a consistent state.
          streamAllocation.noNewStreams();
      }
  }

  httpCodec.finishRequest();
  // 得到响应头
  if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
  }
  // 构造 response
  Response response = responseBuilder
          .request(request)
          .handshake(streamAllocation.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

  int code = response.code();
  // 如果为 web socket 且状态码是 101 ,那么 body 为空
  if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
  } else {
      // 读取 body
      response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
  }
  // 如果请求头中有 close 那么断开连接
  if ("close".equalsIgnoreCase(response.request().header("Connection"))
          || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
  }
  // 抛出协议异常
  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }

  return response;
}

CallServerInterceptor 中可见,关于请求和响应部分都是通过 HttpCodec 来实现的。而在 HttpCodec 内部又是通过 sinksource 来实现的。所以说到底还是 IO 流在起作用。

异步请求

和同步请求类似,先实例化 OkHttpClient 和 Request 对象,然后使用 OkHttpClient 对象的 newCall() 方法创建 Call 对象,只不过最后执行 enqueue() 方法,整体和网络请求的思路相似。

OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("https://wshunli.com").build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {

    }
    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
});

异步请求在 Callback 回调中获取响应,有 onResponse()onFailure() 两个方法。

发送异步网络请求

前面三个步骤完全一致,我们从发送异步网络请求开始,异步请求是调用 RealCall 实例的 enqueue() 方法。。

// RealCall#enqueue()
@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

这里使用 Dispatcher 分发器我来处理请求。

// Dispatcher#enqueue()
synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    runningAsyncCalls.add(call);
    executorService().execute(call);
  } else {
    readyAsyncCalls.add(call);
  }
}

实质上异步网络请求是在 Dispatcher 中做到任务调度。

下面介绍 OkHttp 中的 任务调度

我们来看 Dispatcher 类的源代码。

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;
  /** Executes calls. Created lazily. */
  // 线程池的实现
  private @Nullable ExecutorService executorService;
  /** Ready async calls in the order they'll be run. */
  // 就绪等待网络请求的异步队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  // 正在执行网络请求的异步队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  // 正在执行网络请求的同步队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }
  public Dispatcher() {
  }
  // 创建线程池
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
  /* 省略部分无关代码*/
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
  /* 省略部分无关代码*/
}

异步请求是放在线程池中执行的,如果最大异步请求数小于 64 并且 单个 HOST 的异步请求数小于 5 ,将请求添加到 runningAsyncCalls 中,否则添加到 readyAsyncCalls 中。

我们来看添加进线程池的 AsyncCall 类,实际上 AsyncCall 是继承自 NamedRunnableRealCall 内部类。NamedRunnable 是实现了 Runnable 接口的抽象类。

final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;

  AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
  }

  String host() {
    return originalRequest.url().host();
  }

  Request request() {
    return originalRequest;
  }

  RealCall get() {
    return RealCall.this;
  }

  @Override protected void execute() {
    boolean signalledCallback = false;
    try {
      // 和同步请求相同,调用拦截器,得到响应
      Response response = getResponseWithInterceptorChain();
      if (retryAndFollowUpInterceptor.isCanceled()) {
        signalledCallback = true;
        responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
      } else {
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      }
    } catch (IOException e) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
      } else {
        eventListener.callFailed(RealCall.this, e);
        responseCallback.onFailure(RealCall.this, e);
      }
    } finally {
      // 在 runningAsyncCalls 中移除
      client.dispatcher().finished(this);
    }
  }
}

AsyncCallexecute() 方法中,也是调用了 getResponseWithInterceptorChain() 方法来得到 Response 对象。从这里开始,就和同步请求的流程是一样的,就没必要讲了。

不同的是在得到 Response 后,进行结果的回调。

AsyncCall 的最后调用了 Dispatcherfinished() 方法。

// Dispatcher#finished()
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
  finished(runningAsyncCalls, call, true);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
  finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    // 将 readyAsyncCalls 中的 call 移动到 runningAsyncCalls 中,并加入到线程池中
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }

  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
  }
}

这里所做的工作就是把执行过的 Call 移除,然后将 readyAsyncCalls 中的 Call 移动到 runningAsyncCalls 中并加入线程池中。

基本上 OkHttp 的请求响应的流程就介绍完了,主要是关于 OkHttp 的 拦截器链任务调度 原理。

还有很多细节没有涉及,需要花费很大的精力,才能理解分析透彻,后面有机会再介绍。

参考资料:
1、拆轮子系列:拆 OkHttp - Piasy的博客 | Piasy Blog
https://blog.piasy.com/2016/07/11/Understand-OkHttp/
2、OkHttp源码解析 | 俞其荣的博客 | Qirong Yu’s Blog
http://yuqirong.me/2017/07/25/OkHttp源码解析/
3、OkHttp源码分析 - 掘金
https://juejin.im/post/5af4482951882567286064e6
4、okhttp源码分析(一)——基本流程(超详细) - 简书
https://www.jianshu.com/p/37e26f4ea57b
5、OKHttp源码解析 | Frodo’s Blog
http://frodoking.github.io/2015/03/12/android-okhttp/
6、OkHttp 源码解析(一):基本流程 - Coding - SegmentFault 思否
https://segmentfault.com/a/1190000012656606
7、【Android】OkHttp源码分析 - CSDN博客
https://blog.csdn.net/u010983881/article/details/79175824
8、深入浅出 OkHttp 源码 - DiyCode
https://www.diycode.cc/topics/640
9、Okhttp框架源码分析 - 简书
https://www.jianshu.com/p/18a4861600d1
10、OkHttp 3.7源码分析(一)——整体架构 - CSDN博客
https://blog.csdn.net/asiaLIYAZHOU/article/details/72598320
11、okhttp网络框架源码解析 - CSDN博客
https://blog.csdn.net/fanguangjun123/article/details/78621585
12、OKHttp网络框架源码解析(一)okHttp框架同步异步请求流程和源码分析 - CSDN博客
https://blog.csdn.net/qq_24675479/article/details/79483193

如果本文对您有所帮助,且您手头还很宽裕,欢迎打赏赞助我,以支付网站服务器和域名费用。 https://paypal.me/wshunli 您的鼓励与支持是我更新的最大动力,我会铭记于心,倾于博客。
本文链接:https://www.wshunli.com/posts/5bd2f229.html