etcd client v3 连接流程
首先需要了解 grpc 框架的一些概念,这边引用网上的一张图
Resolver
提供一个用户自定义的解析、修改地址的方法,使得用户可以自己去实现地址解析的逻辑、做服务发现、地址更新等等功能。
- 将 Endpoints 里的 ETCD 服务器地址(127.0.0.1:2379 这种格式)做一次转换传给 grpc 框架。也可以自己重新写此 resolver,做服务发现功能。例如 etcd 服务器地址写 nacos 之类的地址,在 resolver 中写好转换逻辑。
- 调用 ClientConn 的 ParseServiceConfig 接口告诉 endpoints 的负载策略是轮询
Balancer
- 管理 subConns,并收集各 conn 信息,更新状态至 ClientConn
- 生成 picker(balancer)的快照,从而 ClientConn 可以选择发送 rpc 请求的 subConn
此处 etcd client 没有实现 balancer,默认使用 grpc 提供的轮询的 balancer
重试策略
与一般的 c-s 模型不同,etcd client 的重试是针对集群的重试。单个节点的断连不会造成所有节点的重连。
重试机制
一般的重试是对同一个节点进行重试,但 etcd client 的自动重试不会在 ETCD 集群的同一节点上进行,是轮询重试集群的每个节点。重试时不会重新建连,而是使用 balancer 提供的 transport。transport 的状态更新与这一块的重试是通过 balancer 解耦的。
重试条件
etcd unary 拦截器
拦截器类似 http 里的中间件的概念,在发送实际请求之前对报文进行篡改。一般用来添加认证,日志记录,缓存之类的功能。
此处 etcd 的一元拦截器主要做了自动重试的功能,且只会重试一些特定的错误(DeadlineExceeded, Canceled,ErrInvalidAuthToken)
1 | func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor { |
重试次数
此处 Invoke 的大循环里,默认 callOpts.max 是 0,也就是说尝试一次 Invoke 后就会 return 错误
1 | var ( |
重试时间
若重试次数已经达到 quorum,则真正的计算间隔时长,间隔时长(25ms 左右)到期后,才进行重试。
否则,直接返回 0,也就是马上重试。
1 | func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, |
问题分析
根据以上基础,分析 etcd client 启动后修改 hosts 文件产生错误的原因
测试环境
etcd 集群 1:10.50.44.196(未对 client 开放端口),10.50.44.174(未对 client 开放端口), 10.50.44.170
etcd 集群 2:10.50.44.65,10.50.44.69,10.50.44.53
其中 etcd 集群 1 的 10.50.44.196 与 10.50.44.174 未对 etcd client 所在的机器开放端口
初始阶段
最开始时,/etc/hosts 文件配置的地址如下
1 | 10.50.44.196 etcdserver1.com |
此时从日志中可以看到未开放端口的两个节点对应的 subchannel 一直处于 TRANSIENT_FAILURE 和 CONNECTING 状态。
1 | time="2022-10-20 14:06:05.93148" level=warning msg="[core] grpc: addrConn.createTransport failed to connect to {etcdserver3.com:42379 etcdserver3.com <nil> 0 <nil>}. Err: |
subchannel 的状态在框架内由 addrConn 控制,创建 ClientConn 后会从 balancer 中返回一系列 endpoint 的地址(状态是不可用的)。之后对于每个地址建立连接(addrConn.resetTransport),并更新状态。balancer 将状态为 READY 的连接(只有 etcdserver3.com)封装成 picker 提供给 client.Conn 来连接。
addrConn.resetTransport 里面是一个大循环,对 endpoint 列表里所有地址重试建立 http2 连接直到成功(newHTTP2Client)。
- net.dial 建立 tcp 连接,失败的话返回 TransientFailure
- http2 的流操作:单独启动 goroutine,循环读取所有的 frame,并发送到相应的 stream 中去
默认情况下连接失败会将 addrConn 的状态修改为 TransientFailure,暂时不让 ClientConn 使用,并等待 sleeptime 后重试。整个连接尝试持续 minConnectTimeout(20s)。连接成功后将 addrConn 状态置为 ready,下次就可以从 balancer 中读取到这个地址。
sleeptime 计算规则如下,默认 BaseDelay = 1s,Multiplier=1.6,且有 0.2 的 Jitter
1 | // Backoff returns the amount of time to wait before the next retry given the |
测试可以看到每次重连以 1,2,4,8 的间隔进行重试,且每 20s 使用一个新端口建连。
1 | 02:59:53.590387 IP 10.20.141.244.44904 > 10.50.66.67.42379: Flags [S], seq 2780622427, win 29200, options [mss 1460,sackOK,TS val 5912872 ecr 0,nop,wscale 7], length 0 |
变更 hosts 地址
第二步,修改 hosts 文件
1 | 10.50.44.65 etcdserver1.com |
此时连接建立(grpc.dial)成功,调用 etcd 的 put,get 等请求时本质上是发起 grpc.Invoke 请求。
grpc 请求分为 stream 和 unary,这里属于 unary 请求
- 调用拦截器 unaryInterceptor
- 在拦截器里调用 Invoke 处理请求过程
- clientConn.getTransport 获取一个连接,出错直接返回
- 轮询从 balancer 中获取一个可用的地址
- adressConn.Wait 等待连接
- sendRequest 发起请求,生成一个 stream 对象
- recvResponse 接收响应,阻塞等待
- clientConn.getTransport 获取一个连接,出错直接返回
可以发现日志的前一部分报 code = DeadlineExceeded,context deadline exceeded,后一部分开始报 code=InvalidArgument, user name is empty,且之后一直报 user name is empty 这个错误。报错的 endpoint 为之前未连接成功的 etcderver1.com。
1 | {"level":"warn","ts":"2022-10-20T14:06:53.839+0800","logger":"etcd-client","caller":"v3@v3.5.1/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":" |
可以看到每次 retry 时使用的 client 是同一个 0xc000511340,表示没有重新创建 clientConn。每次的 attempt 都是 0,表示每次尝试都只尝试一次直接退出了
前一部分错误是因为 client 使用老的 transport,但修改了 hosts 文件,导致无法建连,因此超过 opTimeout(这里设置的 3s)。客户端代码如下
1 | func (db *BytesConnectionEtcd) Put(key string, binData []byte, opts ...datasync.PutOption) error { |
后一部分的 user name is empty,由于使用旧的 token 连接新的集群,首次会报 code = Unauthenticated desc = etcdserver: invalid auth token”。之后走下面分支重新获取 token
1 | if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken { |
重新获取 token 时,显示 user name 被清空,但走读代码没有发现清空 user name 的部分。该问题先 mark,需要后续 debug 一下代码来分析。