Skip to content

Kafka 存储

Kafka 中每个Topic的Partition被分开存储在数据目录的独立文件夹中,文件夹名称为:topic名称+partition序号。每个目录中包含以下文件:

  • xxx.index
  • xxx.log
  • xxx.snapshot
  • xxx.timeindex
  • leader-epoch-checkpoint

虽然上层应用来说,Partition可以看成是Kafka的最小存储单元,但是存储层面每个Partition都是由若干Segment拼接而成的。每一个Segment包含:xxx.index 和 xxx.log 两种类型的数据。

Segment文件的命名方式为:Partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充!

实际生产环境中,Kafka清理过期记录时会删除对应的Segment文件。通常单个Segment的大小和老化策略可以通过配置控制。

“.index”是“.log”的索引文件,实际message数据保存在“.log”文件中。从Partition中查找message时,kafka先根据offset大小定位message所在的Segment,然后从索引文件中获取Message的物理偏移地址,再到log文件中读取相应数据。

Kafka 副本机制

Kafka每个Partition包含若干Replica,并且这些Replica会选举中一个Leader用来接收用户数据,其他Replica被动从Leader同步数据,称为follower。

通常情况下,每个Partition包含一个ISR列表(in-sync replicas),列表中包含所以处于同步状态的Replica,当Leader选举时优先从ISR中的Replica选择。

ISR 的成员是动态的,通过replica.lag.max.messages replica.lag.time.max.ms,前者似乎已经被废弃了。当replica重新追上Leader后,会被重新加入ISR。

  • replica.lag.time.max.ms:replicas响应partition leader的最长等待时间,如果超过这个时间replicas将会被踢出ISR。

  • replica.lag.max.messages:ISR中Replica的最大滞后,2.0版本中这个参数已经被删去了

LEO和HW

LEO是LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。

HW是HighWatermark的缩写,是指consumer能够看到的此partition的位置。

Kafka有两套follower的LEO信息

  • Follower保存各自LEO值:用来帮助Follower更新HW值;
  • Leader保存所有Replica的LEO值:用来帮助Leader更新HW值;

LEO&HW更新机制

Follower向Leader发送FETCH请求,Leader将数据返回给Follower,此时Follower开始向底层log写数据,从而自动地更新自身的LEO值。

Leader接收到follower发送的FETCH请求,它首先会从自己的log中读取相应的数据,但是在给Follower返回数据之前它先去更新Follower的LEO(Leader保存所有Replica的LEO)。

Follower更新HW发生在其更新LEO之后,一旦follower向log写完数据,它会尝试更新它自己的HW值。比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值

Leader的HW值即为分区的HW值,更新时Leader比较ISR中所有的LEO值,选取最小值作为分区HW。

以下4种情况Leader会尝试更新分区HW:

  • 副本成为leader副本;
  • broker出现崩溃导致副本被踢出ISR;
  • producer向leader副本写入消息;
  • leader处理follower FETCH请求;

Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。(参考连接有比较详细的讨论)

Leader Epoch

Leader Epoch在Kafka 0.11之后的版本用于替代HW机制。该机制中,Leader保存以下两个值:

  • epoch:Leader的版本号,从0开始变更过1次时epoch就+1
  • offset:该epoch版本的Leader写入的第一条消息的位移

Leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件(在对应的Partiton文件夹中)。

当Leader写底层log时它会尝试更新整个缓存,如果这个leader首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的位移,这就不会发生数据不一致和丢失的情况。

acks配置

配置request.required.acks参数可以设置数据可靠性的级别:

request.required.acks=1(默认):仅Leader写入确认。

request.required.acks=0:不进行任何确认

request.required.acks=-1/all:含leader在内确认min.insync.replicas个replica。

Leader选举

Kafka中只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。

在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

  • 等待ISR中任意一个replica“活”过来,并且选它作为leader(unclean.leader.election.enable=false)
  • 选择第一个“活”过来的replica(并不一定是在ISR中)作为leader(unclean.leader.election.enable=true)

参考Kafka数据可靠性深度解读中,【3.6 Leader选举】 描述了不同条件下Replica掉线对Kafka可用性的影响。

官方关于该机制的介绍KIP-101(太长不看版)。

参考

Kafka数据可靠性深度解读

Kafka水位(high watermark)与leader epoch的讨论