您好,欢迎来到叨叨游戏网。
搜索
您的当前位置:首页KafKa的存储机制和可靠性

KafKa的存储机制和可靠性

来源:叨叨游戏网

一、存储机制

        我在我的上篇文章中,写到了,其他的几款产品,其中与kafka最接近的是RocketMQ,他们两个都支持高吞吐,并且吞吐量达十万级。但是KafKa是为了解决大数据的实时日志流而生的,每天要处理的日志量级在千亿规模。对于日志流的特点主要包括:

所以kafka必然面临分布式系统所遇到的高并发、高可用、高性能等三高问题。对于大数据产生的日志来说要保证以下几点:

1、存储的主要是消息流。

2、要支持海量数据的高效存储,高持久化(设备重启之后保证数据的不丢失)。

3、要支持海量数据的高校检索(通过offset或者时间戳进行高效查询并且进行消费处理)。

4、要保证数据的安全性和稳定性,故障转移容错性(高可用)。

1、存储选型

        既然我们要将数据存储在磁盘中,所以我们要对比所有方法中IO的速度。当然内存的IO速度是快的,但是我们要存储大量的日志数据,所以不会存储在内存中。那么较快的就是普通机械磁盘的顺序IO,它略快于内存的随机IO性能。那当我们了解了这些情况的话,就基本可以确定存储的类型了:顺序IO。当然我们选定了较好的存储方式后,要提高存储性能,还要提高读写速度:

1、提高读速度:我们可以利用索引,来提高查询速度,但是有了索引,大量的写操作都会维护索引,那么会降低写入效率。常见的如关系型数据库:mysq了。

2、提高写速度:这种一般是采用日志存储, 通过顺序追加(批量)写的方式来提高写入速度,因为没有索引,无法快速查询,最严重的只能一行行遍历读取。常见的如大数据相关领域的基本都基于此方式来实现。

2、存储方案剖析

对于 Kafka 来说, 它主要用来处理海量数据流,这个场景的特点主要包括:

1. 写操作:写并发要求非常高,基本得达到百万级 TPS,顺序追加写日志即可,无需考虑更新操作

2. 读操作:相对写操作来说,比较简单,只要能按照一定规则高效查询即可(offset或者时间戳

        对于写操作来说,直接采用顺序追加写日志的方式就可以满足 Kafka 对于百万TPS写入效率要求。 所以我们重点放在如何解决高效查询这些日志。Kafka采用了稀疏哈希索引(底层基于Hash Table 实现)的方式。

        对于上面的结构,在稀疏索引表中,我们将所有的信息的offset进行存储,并且在磁盘中的位置信息也保存在这个稀疏索引中,当我们消费者去搜索信息的时候,通过稀疏索引来找到在磁盘中的位置。这样我们不需要额外引入哈希表结构,也能顺序存放了。当然在稀疏索引中我们并不是一条一条的搜索的,而是类似于那种二分查找的原理,这样可以快速锁定位置。     

3、存储架构设计

Kafka 最终的存储实现方案, 即基于顺序追加写日志 + 稀疏哈希索引。从上图可以看出Kafka 是基于「主题 + 分区 + 副本 + 分段 + 索引」的结构:

1. kafka 中消息是以主题 Topic 为基本单位进行归类的,这里的 Topic 是逻辑上的概念,实际上在磁盘存储是根据分区 Partition 存储的, 即每个 Topic 被分成多个 Partition,分区 Partition 的数量可以在主题 Topic 创建的时候进行指定。

2. Partition 分区主要是为了解决 Kafka 存储的水平扩展问题而设计的, 如果一个 Topic 的所有消息 都只存储到一个 Kafka Broker上的话, 对于 Kafka 每秒写入几百万消息的高并发系统来说,这个 Broker 肯定会出现瓶颈, 故障时候不好进行恢复,所以 Kafka 将 Topic 的消息划分成多个 Partition, 然后均衡的分布到整个 Kafka Broker 集群中。

3. Partition 分区内每条消息都会被分配一个唯一的消息 id,即我们通常所说的偏移量 Offset, 因此 kafka 只能保证每个分区内部有序性,并不能保证全局有序性。

4. 然后每个 Partition 分区又被划分成了多个 LogSegment,这是为了防止 Log 日志过大,Kafka 又 引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegement,相当于一个巨型文件 被平均分割为一些相对较小的文件,这样也便于消息的查找、维护和清理。这样在做历史数据清理 的时候,直接删除旧的 LogSegement 文件就可以了。

5. Log 日志在物理上只是以文件夹的形式存储,而每个 LogSegement 对应磁盘上的一个日志文件和两个索引文件(索引和时间戳),以及可能的其他文件。

在上面所说的segment中,会将一个大log文件进行拆分成多个小的LogSegment,这些文件的命名会以这个文件中存储的第一条消息的offset来进行命名。

$ ls /tmp/kafka-logs/test-1
00000000000000009014.index
00000000000000009014.log
00000000000000009014.timeindex
leader-epoch-checkpoint

4、日志系统架构设计

        kafka 消息是按主题 Topic 为基础单位归类的,各个 Topic 在逻辑上是的,每个 Topic 又可以分为 一个或者多个 Partition,每条消息在发送的时候会根据分区规则被追加到指定的分区中。

1、顺序追加日志

2、PageCache

        我们知道 Kafka 是依赖文件系统来存储和缓存消息,以及典型的顺序追加写日志操作,另外它使用操作系统的 PageCache 来减少对磁盘 I/O 操作,即将磁盘的数据缓存到内存中,把对磁盘的访问转变为对内存的访问,也就是说让系统直接调用 PageCache  而经过用户态。

        在 Kafka 中,大量使用了 PageCache, 这也是 Kafka 能实现高吞吐的重要因素之一, 当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据页是否在 PageCache 中,如果命中则直接返回数据,从而避免了对磁盘的 I/O 操作;如果没有命中,操作系统则会向磁盘发起读取请求并将读取的数据页存入 PageCache 中,之后再将数据返回给进程。

        同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检查数据页是否在页缓存中,如果不存在,则 PageCache 中添加相应的数据页,最后将数据写入对应的数据页。被修改过后的数据页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性

3、零拷贝(Zero-Copy)

二、可靠性

关于可靠性的一些问题:

我发消息的时候,需要等 ack 嘛?

我发了消息之后,消费者一定会收到嘛?

遇到各种故障时,我的消息会不会丢?

消费者侧会收到多条消息嘛?

消费者 svr 重启后消息会丢失嘛?

可靠性也就是保证信息的写入,那我们可以通过回答生产者来保证:

1. 发消息之后有没有 ack 。

2. 发消息收到 ack 后,是不是消息就不会丢失了?

我们可以配置 Kafka 来指定 producer 生产者在发送消息时的 ack 策略:

# -1(全量同步确认,强可靠性保证)
Request.required.acks= -1
# 1(leader 确认收到, 默认)
Request.required.acks = 1
# 0(不确认,但是吞吐量大)
Request.required.acks = 0

1、CP(Consistency & Partition tolerance) 

request.required.acks=-1
min.insync.replicas = ${N/2 + 1}
unclean.leader.election.enable = false

        如图所示,在 acks=-1 的情况下,新消息只有被 ISR 中的所有 follower(f1 和 f2, f3) 都从 leader 复制过去才会回 ack, 回 ack 后,无论那种机器故障情况(全部或部分), 写入的 msg4,都不会丢失, 消息状态满足一致性 C 要求。 正常情况下,所有 follower 复制完成后,leader 回 producer ack。 异常情况下,如果当数据发送到 leader 后部分副本(f1 和 f2 同步), leader 挂了?

        此时任何 follower 都有可能变成新的 leader, producer 端会得到返回异常,producer 端会重新发送数据,但这样数据可能会重复(但不会丢失), 暂不考虑数据重复的情况。 min.insync.replicas 参数用于保证当前集群中处于正常同步状态的副本 follower 数量,当实际值小于配置值时,集群停止服务。如果配置为 N/2+1, 即多一半的数量,则在满足此条件下,通过算法保证强一致性。当不满足配置数时,牺牲可用性即停服。 异常情况下,leader 挂掉,此时需要重新从 follower 选举 leader。可以为 f2 或者 f3。

        当挂掉的时候,有个副本没有同步到数据,如果选择这个副本作为leader,那么就会发生消息截断,因为它这个副本还没同步完数据。那么我们可以通过配置参数 enable 这个来控制此种情况,是否可以选择这个副本。

        通过 ack 和 min.insync.replicas 和 unclean.leader.election.enable 的配合,保证在 kafka 配置为 CP 系统时,要么不工作,要么得到 ack 后,消息不会丢失且消息状态一致。

2、AP(Availability & Partition tolerance)

request.required.acks=1
min.insync.replicas = 1
unclean.leader.election.enable = false

        当配置为 acks=1 时,即 leader 接收消息后回 ack,这时会出现消息丢失的问题:如果 leader 接受到了第 4 条消息,此时还没有同步到 follower 中,leader 机器挂了,其中一个 follower 被选为 leader, 则第 4 条消息丢失了。当然这个也需要 unclean.leader.election.enable 参数配置为 false 来配合。

        但是 leader 回 ack 的情况下,follower 未同步的概率会大大提升。 通过 producer 策略的配置和 kafka 集群通用参数的配置,可以针对自己的业务系统特点来进行合理的 参数配置,在通讯性能和消息可靠性下寻得某种平衡。

3、Broker 的可靠性保证

消息通过 producer 发送到 broker 之后,还会遇到很多问题:

Partition leader 写入成功, follower 什么时候同步?

Leader 写入成功,消费者什么时候能读到这条消息?

Leader 写入成功后,leader 重启,重启后消息状态还正常嘛?

Leader 重启,如何选举新的 leader?

LEO:LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。

HW: HighWaterMark的缩写,是指consumer能够看到的此partition的位置。 取一个partition对 应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。

        在同步过程中不同的副本同步的效率不尽相同,在某一时刻follower1完全跟上了leader副本而 follower2只同步了消息3,如此leader副本的LEO为5,follower1的LEO为5,follower2的LEO 为4,那 么当前分区的HW取最小值4,此时消费者可以消费到offset0至3之间的消息。

        由此可见,HW用于标识消费者可以读取的最大消息位置,LEO用于标识消息追加到文件的最后位置。 如果消息发送成功,不代表消费者可以消费这条消息。

感谢大家的观看。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- gamedaodao.net 版权所有 湘ICP备2024080961号-6

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务