Kafka 存储¶
Kafka 中每个Topic的Partition被分开存储在数据目录的独立文件夹中,文件夹名称为:topic名称+partition序号。每个目录中包含以下文件:
- xxx.index
- xxx.log
- xxx.snapshot
- xxx.timeindex
- leader-epoch-checkpoint
虽然上层应用来说,Partition可以看成是Kafka的最小存储单元,但是存储层面每个Partition都是由若干Segment拼接而成的。每一个Segment包含:xxx.index 和 xxx.log 两种类型的数据。
Segment文件的命名方式为:Partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充!
实际生产环境中,Kafka清理过期记录时会删除对应的Segment文件。通常单个Segment的大小和老化策略可以通过配置控制。
“.index”是“.log”的索引文件,实际message数据保存在“.log”文件中。从Partition中查找message时,kafka先根据offset大小定位message所在的Segment,然后从索引文件中获取Message的物理偏移地址,再到log文件中读取相应数据。
Kafka 副本机制¶
Kafka每个Partition包含若干Replica,并且这些Replica会选举中一个Leader用来接收用户数据,其他Replica被动从Leader同步数据,称为follower。
通常情况下,每个Partition包含一个ISR列表(in-sync replicas),列表中包含所以处于同步状态的Replica,当Leader选举时优先从ISR中的Replica选择。
ISR 的成员是动态的,通过replica.lag.max.messages和 replica.lag.time.max.ms,前者似乎已经被废弃了。当replica重新追上Leader后,会被重新加入ISR。
-
replica.lag.time.max.ms:replicas响应partition leader的最长等待时间,如果超过这个时间replicas将会被踢出ISR。
-
replica.lag.max.messages:ISR中Replica的最大滞后,2.0版本中这个参数已经被删去了
LEO和HW¶
LEO是LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。
HW是HighWatermark的缩写,是指consumer能够看到的此partition的位置。
Kafka有两套follower的LEO信息:
- Follower保存各自LEO值:用来帮助Follower更新HW值;
- Leader保存所有Replica的LEO值:用来帮助Leader更新HW值;
LEO&HW更新机制¶
Follower向Leader发送FETCH请求,Leader将数据返回给Follower,此时Follower开始向底层log写数据,从而自动地更新自身的LEO值。
Leader接收到follower发送的FETCH请求,它首先会从自己的log中读取相应的数据,但是在给Follower返回数据之前它先去更新Follower的LEO(Leader保存所有Replica的LEO)。
Follower更新HW发生在其更新LEO之后,一旦follower向log写完数据,它会尝试更新它自己的HW值。比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值。
Leader的HW值即为分区的HW值,更新时Leader比较ISR中所有的LEO值,选取最小值作为分区HW。
以下4种情况Leader会尝试更新分区HW:
- 副本成为leader副本;
- broker出现崩溃导致副本被踢出ISR;
- producer向leader副本写入消息;
- leader处理follower FETCH请求;
Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。(参考连接有比较详细的讨论)
Leader Epoch¶
Leader Epoch在Kafka 0.11之后的版本用于替代HW机制。该机制中,Leader保存以下两个值:
- epoch:Leader的版本号,从0开始变更过1次时epoch就+1
- offset:该epoch版本的Leader写入的第一条消息的位移
Leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件(在对应的Partiton文件夹中)。
当Leader写底层log时它会尝试更新整个缓存,如果这个leader首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的位移,这就不会发生数据不一致和丢失的情况。
acks配置¶
配置request.required.acks参数可以设置数据可靠性的级别:
request.required.acks=1(默认):仅Leader写入确认。
request.required.acks=0:不进行任何确认
request.required.acks=-1/all:含leader在内确认min.insync.replicas个replica。
Leader选举¶
Kafka中只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
- 等待ISR中任意一个replica“活”过来,并且选它作为leader(unclean.leader.election.enable=false)
- 选择第一个“活”过来的replica(并不一定是在ISR中)作为leader(unclean.leader.election.enable=true)
参考Kafka数据可靠性深度解读中,【3.6 Leader选举】 描述了不同条件下Replica掉线对Kafka可用性的影响。
官方关于该机制的介绍KIP-101(太长不看版)。