整理技术笔记才发现还有一些文章草稿没有收尾,2018年写到一半的文章准备发博客却因为一些不可控制的外力导致搁置,现在重新拿出来并且整理一下。Okhttp系列 ,I‘m back.

前面我们在Okhttp和Volley这两个开源项目比较重试/重定向、缓存、请求池的设计 ,出门左转文件传送门网络 — Volley vs. OkHttp,接下来我们只讲Okhttp怎么设计连接池。

这还得从一个call调用开始说起,当应用层调用请求,紧接而来的就是责任链的一系列调用,对于责任链模式可以看这里的JavaChainOfResponsibility。简单理解就是一个请求经过一条链式时,链上的每个节点会根据情况选择性的处理,如果其中有一节点不处理了往下的节点也就不会处理了。咦?这味道是不是有点熟悉,Android View树事件的分发。我们再把话题重新拉回来,在Okhttp中称呼这条链上的节点叫做拦截器,从应用层往下分别是:RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor、CallServerInterceptor,这一节我们要将的就是ConnectInterceptor这个拦截器了。

ConnectInterceptor的代码不多,但是其实都是被封装起来了,让我们来一层层剥开它。

object ConnectInterceptor : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    val doExtensiveHealthChecks = request.method != "GET"
    val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)

    return realChain.proceed(request, transmitter, exchange)
  }
}

先来认识几个重要的角色

  • Exchange:数据交换器
  • ExchangeCodec:数据交换器的流编解码模块,用于处理http1.x还是http2.x协议
  • ExchangeFinder:数据交换器的流编解码模块的finder
  • Connection:连接器
  • Transmitter:发射器(由数据交换器+连接器构成)
  • Route/RouteSelector:网络路由/路由选择器,网络路由持有代理对象,如果当前网络不可用可以切换Proxy提供的线路,我们经常看到视频网站的某个视频会提供很多个数据源线路
  • Proxy/ProxySelector:代理/代理选择器

首先Transmitter(OkHttp最新代码(commit-id 4ebc5f644c92ad08e41908db2ccaff4819cd0cbe)已经没有这个类了,被合并到RealCall,不过对于那些被封装的代码来说不过是换了个妈,本质没变)是应用层和网络层的桥(这里应用层和网络层指的是Okhttp定义的,BridgeInterceptor以上为应用层,往下为网络层),它管理者Exchange和Connection,可以认为一个call就会有一个Transmitter用于创造属于这个call调用的交互器Exchange和端与端的连接器Connection,所以上面的代码不难看出newExchange方法的意思。

Transmitter

  internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    synchronized(connectionPool) {
      check(!noMoreExchanges) { "released" }
      check(exchange == null) {
        "cannot make a new request because the previous response is still open: " +
            "please call response.close()"
      }
    }

    val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)

    synchronized(connectionPool) {
      this.exchange = result
      this.exchangeRequestDone = false
      this.exchangeResponseDone = false
      return result
    }
  }

紧接着我们看到了通过finder找到了交换器的编解码模块,那么如何找到的呢?

ExchangeFinder

  fun find(
    ...
  ): ExchangeCodec {
    ...
    try {
      val resultConnection = findHealthyConnection(
        ...
      )
      return resultConnection.newCodec(client, chain)
    ...
  }
  ...
  @Throws(IOException::class)
  private fun findHealthyConnection(
    ...
  ): RealConnection {
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized(connectionPool) {
        if (candidate.successCount == 0) {
          return candidate
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges()
        continue
      }

      return candidate
    }
  }

代码也很简单,真正难点在于findConnection方法如何找有有效的备胎。

ExchangeFinder

 @Throws(IOException::class)
  private fun findConnection(
    ...
  ): RealConnection {
    ...
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")
      hasStreamFailure = false // This is a fresh attempt.

      releasedConnection = transmitter.connection
      toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
        transmitter.releaseConnectionNoEvents()
      } else {
        null
      }

      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        result = transmitter.connection
        releasedConnection = null
      }

      if (result == null) {
        // Attempt to get a connection from the pool.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection!!.route()
        }
      }
    }
    ...
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result!!
    }

    // If we need a route selection, make one. This is a blocking operation.
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      newRouteSelection = true
      routeSelection = routeSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection!!.routes
        if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }
    ...
  }
  1. transmitter.connection
  2. RealConnectionPool#transmitterAcquirePooledConnection
  3. new RealConnection

查看可用的备胎遵循这一定的规则,不是什么阿猫阿狗都能当备胎。首先最好是能有当备胎的经验,从零开始培养一个备胎远比已经当过备胎更加耗费资源,创建一个连接是需要调用系统资源的。如果在Transmitter中找不到这个连接,再去连接池找到(RealConnectionPool#transmitterAcquirePooledConnection方法找),毕竟身边有现成的在跑到备胎池到处找来得快,最后真不行只能自己动用一切资源培养一个备胎。

比较难的地方在于连接池的查找,第一次查找时仅仅通过请求的host和复用的连接host相同就能找复用的连接,如果不存在这样的连接,那么这时候就要切换路由,路由线路的数量来源于上层应用提供了多少代理。

RouteSelector

  private fun resetNextInetSocketAddress(proxy: Proxy) {
    // Clear the addresses. Necessary if getAllByName() below throws!
    val mutableInetSocketAddresses = mutableListOf<InetSocketAddress>()
    inetSocketAddresses = mutableInetSocketAddresses

    val socketHost: String
    val socketPort: Int
    if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {
      socketHost = address.url.host
      socketPort = address.url.port
    } else {
      val proxyAddress = proxy.address()
      require(proxyAddress is InetSocketAddress) {
        "Proxy.address() is not an InetSocketAddress: ${proxyAddress.javaClass}"
      }
      socketHost = proxyAddress.socketHost
      socketPort = proxyAddress.port
    }

    if (socketPort !in 1..65535) {
      throw SocketException("No route to $socketHost:$socketPort; port is out of range")
    }

    if (proxy.type() == Proxy.Type.SOCKS) {
      mutableInetSocketAddresses += InetSocketAddress.createUnresolved(socketHost, socketPort)
    } else {
      eventListener.dnsStart(call, socketHost)

      // Try each address for best behavior in mixed IPv4/IPv6 environments.
      val addresses = address.dns.lookup(socketHost)
      if (addresses.isEmpty()) {
        throw UnknownHostException("${address.dns} returned no addresses for $socketHost")
      }

      eventListener.dnsEnd(call, socketHost, addresses)

      for (inetAddress in addresses) {
        mutableInetSocketAddresses += InetSocketAddress(inetAddress, socketPort)
      }
    }
  }

当然了也不是所有的代理都能用,如果是http代理和http直连,RouteSelector就会用host通过dns查找可用的ip地址集合;如果是socks代理,RouteSelector就提供一条ip线路

找到了可用的路由,接下来就再次去连接池查找连接,由于向连接池提供了可用的路由,那么不在像第一次只提供host而找不到连接,连接池还会在通过路由这一筛选条件,不过可用路由必须和复用的连接一样是直连并且地址相同。这里还有一点对于需要主机验证的连接是不能被复用的

如果还是找不到怎么办?构造一个全新连接,构造新的连接,必然要和服务器进行三次握手有可能也要进行TLS连接。

ExchangeFinder

  @Throws(IOException::class)
  private fun findConnection(
    ...
  ): RealConnection {
    ...
    // Do TCP + TLS handshakes. This is a blocking operation.
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    connectionPool.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = transmitter.connection
      } else {
        connectionPool.put(result!!)
        transmitter.acquireConnectionNoEvents(result!!)
      }
    }
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!
    ...
  }

到这里我们就看到了,连接之后就放入连接池,成为她人的备胎。那么对于tcp的握手过程,究竟做了什么?为什么成为备胎要经历这么多的艰辛。

RealConnection

  fun connect(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    call: Call,
    eventListener: EventListener
  ) {
    check(protocol == null) { "already connected" }

    var routeException: RouteException? = null
    val connectionSpecs = route.address.connectionSpecs
    val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
    ...

    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break
          }
        } else {
          connectSocket(connectTimeout, readTimeout, call, eventListener)
        }
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
        break
      } catch (e: IOException) {
        ...

        if (routeException == null) {
          routeException = RouteException(e)
        } else {
          routeException.addConnectException(e)
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException
        }
      }
    }
    ...
    val http2Connection = this.http2Connection
    if (http2Connection != null) {
      synchronized(connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams()
      }
    }
  }

前面我们说过路由的查找,路由的线路连接主要通过代理,而代理又分为直连、http、socks。如果只是使用这三种代理那么网络默认是明文传输的,如果开启隧道(http代理+tls)就要求必须加密,这里说一下隧道,开启前会先发CONNECT报文进行连接。当然了这三种代理我们也可以选择性的配置tls,取决于上层应用给的规格connectionSpecs。在开启隧道过程中,客户端和代理服务器会进行鉴权,客户端要在Proxy-Authenticate字段中加入鉴权的数据。

连接socket之后,就要看看接下来要不要tls连接了。

RealConnection

private fun connectTls(connectionSpecSelector: ConnectionSpecSelector) {
      val address = route.address
    val sslSocketFactory = address.sslSocketFactory
    var success = false
    var sslSocket: SSLSocket? = null
    try {
      // Create the wrapper over the connected socket.
      sslSocket = sslSocketFactory!!.createSocket(
          rawSocket, address.url.host, address.url.port, true /* autoClose */) as SSLSocket

      // Configure the socket's ciphers, TLS versions, and extensions.
      val connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket)
      if (connectionSpec.supportsTlsExtensions) {
        Platform.get().configureTlsExtensions(sslSocket, address.url.host, address.protocols)
      }

      // Force handshake. This can throw!
      sslSocket.startHandshake()
      // block for session establishment
      val sslSocketSession = sslSocket.session
      val unverifiedHandshake = sslSocketSession.handshake()

      // Verify that the socket's certificates are acceptable for the target host.
      if (!address.hostnameVerifier!!.verify(address.url.host, sslSocketSession)) {
        ...
      }

      val certificatePinner = address.certificatePinner!!

      handshake = Handshake(unverifiedHandshake.tlsVersion, unverifiedHandshake.cipherSuite,
          unverifiedHandshake.localCertificates) {
        certificatePinner.certificateChainCleaner!!.clean(unverifiedHandshake.peerCertificates,
            address.url.host)
      }

      // Check that the certificate pinner is satisfied by the certificates presented.
      certificatePinner.check(address.url.host) {
        handshake!!.peerCertificates.map { it as X509Certificate }
      }

      // Success! Save the handshake and the ALPN protocol.
      val maybeProtocol = if (connectionSpec.supportsTlsExtensions) {
        Platform.get().getSelectedProtocol(sslSocket)
      } else {
        null
      }
}

通过外部提供的sslSocketFactory我们构建了SSLSocket,并和服务器进行的握手,ssl握手的过程可以看这一博文,传送门抓包原理。握手之后拿到证书,通过Okhttp提供的OkHostnameVerifier验证主机,当然你也可以自定义验证主机的逻辑。那么之后就完成连接。这里还要提一嘴,Okhttp项目中还提供了okhttp-tls,帮助我们去实现客户端服务端证书的管理。到这里我们就知道了培养备胎多么浪费资源了吧,先socket握手然后sslsocket握手,握手之后验证证书,所以能从复用就复用千万不要自己搞。

在进行连接的过程中会出现各种不能恢复的异常

  • 一些致命的异常,ProtocolException SSLHandshakeException/CertificateException SSLPeerUnverifiedException
  • 客户端配置要求禁止重试
  • 没有更多可用的路由
  • request的body没有数据了:如果是本地路由问题,request还没有被发送body还有数据,如果是服务器问题,那么request已经被发送并且body没有了数据

与上面的不可恢复的异常相对的是可以恢复的异常SocketTimeoutException,还有本地路由出现了问题,那么对于Okhttp来说会在RetryAndFollowUpInterceptor重新尝试去连接服务。

这里我们还需要扩展一个东西。http1.x和http2的发送请求和接受响应的不同,http1.x使用了管道化发送,一个tcp连接可以多个请求可以同时发送,但是服务器却按照顺序响应;http2由于采用了stream模式,一个tcp连接上面有多个stream,可以同时多个请求的同时发送响应。上面的连接复用对于http1.x来说我们很好理解,对于http2在代码的设计上OkHttp有一点不太一样。被复用的RealConnection对象其实是持有Http2Connection对象,当然了前提是使用http2协议,复用了RealConnection对象也就相当于复用了Http2Connection对象,Http2Connection对象存储这一堆的stream,每个stream处理一对请求响应。

你以为成为备胎就完事了?放在池子里的备胎也会只是被供挑选的对象也存在竞争,池子也遵循着末尾淘汰机制。那么这个末位淘汰机制是如何设置的?接下来看看。

  private val cleanupTask = object : Task("$okHttpName ConnectionPool") {
    override fun runOnce() = cleanup(System.nanoTime())
  }

每次有个新连接被put到连接池中,都会触发clean任务。它会清理那些超过保活时间5min的连接或者超过6以上个处于空闲的连接,简单说就是那些到35岁的人或者团队超过6以上长时间不干活的人。你以为这样就clean up只会发生一次,太年轻了。如果clean完一个连接,紧接着马不停蹄没有delay的又开始下一次清理。如果发现这次清理的没有超过35岁或者不干活的人低于5以下,那么就会采取两种判断,如果0< 不干活人数 <=5,那么就会用保活时间减去取空闲时间最长人空闲的时间(keepAliveDurationNs - longestIdleDurationNs),其差作为下次clean任务的delay,等下次clean时,如果还不干活,就clean这个人。还有一种是既然35岁还没有到,比如34岁,就等一年在把他clean掉。那么有没有一种办法终止这种末位淘汰机制,还真有,公司倒闭了没有人了,clean循环停止。

参考资料

Java使用SSLSocket通信

如果您觉得写得还不错或者对您有所启发,那就赶紧动动您的小指头,点击下面的红色按钮,狠狠地打赏一番吧。