查看其它 1 个回答nonetheless的回答

nonethelessnonetheless云原生架构师兴业数金

在容器云环境中设计高可用的Kafka集群,需要从存储,网络的高可用来方面考虑:

  1. 存储层面需要关注存储高可用的实现方案,可以通过高性能共享存储和Kafka多个Broker实例之间的复制两种方案实现,目前业内已有成熟的共享存储方案,主要介绍下第二个方案:
    Kafka通过Broker日志复制将Broker的日志文件复制到其他Broker上,从而实现数据的冗余备份和快速恢复。具体来说,Broker日志复制分为两个阶段:首先,源Broker会将数据写入本地磁盘上的日志文件中,并将其标记为可复制状态;然后,目标Broker会从源Broker获取数据,将其写入本地磁盘上的日志文件中,并将复制状态标记为已完成。在Kafka 3.0中,Broker日志复制使用了一种新的机制,称为“分片复制”分片复制将每个Broker的日志文件分为多个片段(Segment),每个片段大小固定,一般为1GB或者更小。当源Broker的一个片段被标记为可复制状态时,它会将该片段复制到一个或多个目标Broker上,每个目标Broker都会维护一个复制进度(Replica Log End Offset),用于记录它已经复制的片段。源Broker会定期检查目标Broker的复制进度,如果发现有目标Broker没有完成复制,则会重试。
    同时Kafka 3.0引入了一种新的分布式协议,称为Kraft,它是Kafka 3.0的一个重要特性。Kraft协议是一个基于Raft一致性算法的实现,用于管理Kafka集群的元数据和副本的选举、数据同步等操作,通过Kraft协议,Kafka不需要依赖于Zookeeper,在扩展性,性能,简化管理等方面有了很大提升。
  2. 网络层面需要关注Kafka如何二次寻址向Broker发送消息以及如何在容器云环境中实Rebalance,可以通过容器云CNI扩展方案支持静态PodIP固定加Underlay组网方案实现或者通过LoadBalancer方案实现。为了更好的理解网络高可用的必要性,先下Kafka客户端建立连接和Rebalance的原理,
    Kafka客户端连接到Kafka集群的流程:
      1)获取Broker元数据:客户端首先需要从Kafka集群获取Broker元数据,以了解可用的Broker和分区分配信息。Kafka客户端会向Kafka集群中的一个Broker发送一个元数据请求,获取可用的Broker列表和分区分配信息。这个元数据请求可以被任何一个Broker处理。
    2)连接Broker:客户端会从可用的Broker列表中选择一个Broker进行连接。连接可以是TCP连接或SSL连接,取决于Kafka集群的配置。
     3)发送请求:客户端连接到Broker后,可以发送请求来获取、生产或消费消息。请求包括请求类型、主题、分区、消息等信息。
     4)处理请求:Broker接收到请求后,会根据请求类型进行处理,例如读取或写入消息。
     5)返回响应:Broker处理完请求后,会向客户端返回响应。响应包括响应状态、主题、分区、消息等信息。
    6)断开连接:客户端可以选择断开连接或者保持连接。如果客户端断开连接,则需要重新执行上述步骤重新连接到Broker。
    在Kafka Broker宕机恢复后, Kafka 客户端会自动触发重平衡(rebalancing)流程,以重新分配消费者实例和分区之间的关系 :
     1)Kafka 客户端会检测到 broker 宕机新拉起节点并刷新Borker元数据 。
     2)Kafka 客户端会向 Kafka 集群协调器发送心跳,以确保它仍然是一个活跃的消费者组成员。如果协调器在一定的时间内没有收到心跳,则会将该消费者标记为失效,并将其分区重新分配给其他消费者。
     3) Kafka 客户端会向协调器请求重新分配消费者实例和分区的关系。协调器会根据消费者组的配置和现有的消费者实例列表,重新计算分区的分配结果。
     4)协调器会将新的分区分配方案发送给消费者组中的所有消费者实例。消费者实例会根据新的分配方案,重新分配自己负责的分区。
     5)消费者实例会根据新的分配方案,重新分配自己负责的分区,并在重新分配完成后重新开始消费消息。
    可以看到Kafka客户端和Server端建立连接都是通过Kakfa Broker元数据进行二次连接,需要通过broker 的 bootstrap 地址是通过 listeners 配置参数来指定的,它可以设置一个或多个监听器,每个监听器可以有一个或多个地址。 以下是一个示例配置文件:

    broker:
      id: 0
      rack: dc1
      listeners:
     INTERNAL://:9092
     EXTERNAL://:9093
     CLIENT://:9094
      advertised.listeners:
     INTERNAL://kafka1.internal:9092
     EXTERNAL://kafka1.example.com:9093
     CLIENT://kafka1.example.com:9094

    在容器云环境中需要将 EXTERNAL 地址配置为静态PodIP地址或LoadBalancer地址,来确保Kafka Broker可以正常寻址:
    1)基于静态PodIP可以通过环境变量注入,然后启动时通过环境变量渲染broker配置即可达到效果,以下例子可参考注入POD_IP:

    apiVersion: v1
    kind: Pod
    metadata:
      name: my-pod
    spec:
      containers:
      - name: my-container
     image: my-image
     env:
     - name: POD_IP
       valueFrom:
         fieldRef:
           fieldPath: status.podIP

    2) 基于LoadBalancer方案则更为复杂,通常需要通过开发单独的Kafka Operator服务来管理Kafka在容器云上的生命周期,当然该方案也是各类中间件服务容器云上的推荐方案,从可用性、兼容性、可移植性上均会更好,Kafka社区也开源了Operator,可以参考https://github.com/strimzi/strimzi-kafka-operator直接使用,也可以根据各容器云平台特点开发Kafka Operator服务。

当然Kafka在容器云上的高可用性还有很多地方可以考虑,比如基于集群联邦或Kafka MirrorMaker2 实现Kafka的机房级别容灾、Kafka容灾场景下的可观测性等,也可以参考近期信通院发布的《云原生能力成熟度模型-第5部分:中间件》行业标准进行比对。

银行 · 2023-05-13
浏览646

回答者

nonetheless
云原生架构师兴业数金
擅长领域: 云计算容器云容器

nonetheless 最近回答过的问题

回答状态

  • 发布时间:2023-05-13
  • 关注会员:3 人
  • 回答浏览:646
  • X社区推广