kafka 进阶

Author Avatar
Sean Yu 12月 17, 2019
  • 在其它设备中阅读本文章

日志存储

Kafka中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所说的偏移量(offset)

如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log和LogSegment也不是纯粹物理意义上的概念,Log在物理上只以文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。在图51中又补充了Log和LogSegment的关系。

r6mRFU.png

Log对应了一个命名形式为<topic><partition>的文件夹。举个例子,假设有一个名为“topiclog”的主题,此主题中具有4个分区,那么在实际物理存储上表现为“topiclog0”“topiclog1”“topiclog2”“topiclog3”这4个文件夹:

r6nPTf.png

向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment都不能写入数据。为了方便描述,我们将最后一个LogSegment称为“activeSegment”,即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。

为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个LogSegment都有一个基准偏移量baseOffset,用来表示当前LogSegment中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件为00000000000000000000.log。

举例说明,向主题topiclog中发送一定量的消息,某一时刻topiclog0目录中的布局如下所示。

r6nJ1J.png

示例中第2个LogSegment对应的基准位移是133,也说明了该LogSegment中的第一条消息的偏移量为133,同时可以反映出第一个LogSegment中共有133条消息(偏移量从0至132的消息)。

日志索引

本章开头就提及了每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

Kafka中的索引文件以稀疏索引(sparseindex)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由broker端参数log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。

稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

偏移量索引

偏移量索引项的格式如图58所示。每个索引项占用8个字节,分为两个部分。(

  • relativeOffset:相对偏移量,表示消息相对于baseOffset的偏移量,占用4个字节,当前索引文件的文件名即为baseOffset的值。
  • position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。

r63IZd.png

消息的偏移量(offset)占用8个字节,也可以称为绝对偏移量。索引项中没有直接使用绝对偏移量而改为只占用4个字节的相对偏移量(relativeOffset=offsetbaseOffset),这样可以减小索引文件占用的空间。举个例子,一个日志分段的baseOffset为32,那么其文件名就是00000000000000000032.log,offset为35的消息在索引文件中的relativeOffset的值为3532=3。

我们以本章开头topiclog0目录下的00000000000000000000.index为例来进行具体分析,截取00000000000000000000.index部分内容如下:

r6Y8yT.png

我们这里给出00000000000000000000.index和00000000000000000000.log的对照图来做进一步的陈述,如图59所示。

r6tAhR.png

如果我们要查找偏移量为23的消息,那么应该怎么做呢?首先通过二分法在偏移量索引文件中找到不大于23的最大索引项,即[22,656],然后从日志分段文件中的物理位置656开始顺序查找偏移量为23的消息。

以上是最简单的一种情况。参考图510,如果要查找偏移量为268的消息,那么应该怎么办呢?首先肯定是定位到baseOffset为251的日志分段,然后计算相对偏移量relativeOffset=268251=17,之后再在对应的索引文件中找到不大于17的索引项,最后根据索引项中的position定位到具体的日志分段文件位置开始查找目标消息。那么又是如何查找baseOffset为251的日志分段的呢?这里并不是顺序查找,而是用了跳跃表的结构。Kafka的每个日志对象中使用了ConcurrentSkipListMap来保存各个日志分段,每个日志分段的baseOffset作为key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。

r6NkqS.png

还需要注意的是,Kafka强制要求索引文件大小必须是索引项大小的整数倍,对偏移量索引文件而言,必须为8的整数倍。如果broker端参数log.index.size.max.bytes配置为67,那么Kafka在内部会将其转换为64,即不大于67,并且满足为8的整数倍的条件。

时间戳索引

r6NgJA.png

每个索引项占用12个字节,分为两个部分。

  • timestamp:当前日志分段最大的时间戳。
  • relativeOffset:时间戳所对应的消息的相对偏移量。

时间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的timestamp必须大于之前追加的索引项的timestamp,否则不予追加。如果broker端参数log.message.timestamp.type设置为LogAppendTime,那么消息的时间戳必定能够保持单调递增;相反,如果是CreateTime类型则无法保证。生产者可以使用类似ProducerRecord(Stringtopic,Integerpartition,Longtimestamp,Kkey,Vvalue)的方法来指定时间戳的值。即使生产者客户端采用自动插入的时间戳也无法保证时间戳能够单调递增,如果两个不同时钟的生产者同时往一个分区中插入消息,那么也会造成当前分区的时间戳乱序。

我们已经知道每当写入一定量的消息时,就会在偏移量索引文件和时间戳索引文件中分别增加一个偏移量索引项和时间戳索引项。两个文件增加索引项的操作是同时进行的,但并不意味着偏移量索引中的relativeOffset和时间戳索引项中的relativeOffset是同一个值。

我们画出00000000000000000000.timeindex的具体结构,详细参考图512左上角。

r6USwF.png

如果要查找指定时间戳targetTimeStamp=1526384718288开始的消息,首先是找到不小于指定时间戳的日志分段。这里就无法使用跳跃表来快速定位到相应的日志分段了,需要分以下几个步骤来完成。

  • 将targetTimeStamp和每个日志分段中的最大时间戳largestTimeStamp逐一对比,直到找到不小于targetTimeStamp的largestTimeStamp所对应的日志分段。日志分段中的largestTimeStamp的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则取该日志分段的最近修改时间。
  • 找到相应的日志分段之后,在时间戳索引文件中使用二分查找算法查找到不大于targetTimeStamp的最大索引项,即[1526384718283,28],如此便找到了一个相对偏移量28。
  • 在偏移量索引文件中使用二分算法查找到不大于28的最大索引项,即[26,838]。
  • 从步骤1中找到日志分段文件中的838的物理位置开始查找不小于targetTimeStamp的消息。

日志清理

Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka中每一个分区副本都对应一个Log,而Log又可以分为多个日志分段,这样也便于日志的清理操作。Kafka提供了两种日志清理策略。

  • 日志删除(LogRetention):按照一定的保留策略直接删除不符合条件的日志分段。
  • 日志压缩(LogCompaction):针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

磁盘存储(回答kafka为什么快)

页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘I/O的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。

当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的I/O操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。

Kafka中大量使用了页缓存,这是Kafka实现高吞吐的重要因素之一。

虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能。同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过笔者并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。

磁盘I/O流程

一张磁盘I/O的流程图

r6UOtH.png

一般磁盘I/O的场景有以下四种。

  • 用户调用标准C库进行I/O操作,数据流为:应用程序buffer→C库标准IObuffer→文件系统页缓存→通过具体文件系统到磁盘。
  • 用户调用文件I/O,数据流为:应用程序buffer→文件系统页缓存→通过具体文件系统到磁盘。
  • 用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘。
  • 用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接写磁盘。

发起I/O请求的步骤可以表述为如下的内容

  • 写操作:用户调用fwrite把数据写入C库标准IObuffer后就返回,即写操作通常是异步操作;数据写入C库标准IObuffer后,不会立即刷新到磁盘,会将多次小数据量相邻写操作先缓存起来合并,最终调用write函数一次性写入(或者将大块数据分解多次write调用)页缓存;数据到达页缓存后也不会立即刷新到磁盘,内核有pdflush线程在不停地检测脏页,判断是否要写回到磁盘,如果是则发起磁盘I/O请求。
  • 读操作:用户调用fread到C库标准IObuffer中读取数据,如果成功则返回,否则继续;到页缓存中读取数据,如果成功则返回,否则继续;发起I/O请求,读取数据后缓存buffer和C库标准IObuffer并返回。可以看出,读操作是同步请求。
  • I/O请求处理:通用块层根据I/O请求构造一个或多个bio结构并提交给调度层;调度器将bio结构进行排序和合并组织成队列且确保读写操作尽可能理想:将一个或多个进程的读操作合并到一起读,将一个或多个进程的写操作合并到一起写,尽可能变随机为顺序(因为随机读写比顺序读写要慢),读必须优先满足,而写也不能等太久。

kafka是消息顺序追加,可是实现顺序io

零拷贝

除了消息顺序追加、页缓存等技术,Kafka还使用零拷贝(ZeroCopy)技术来进一步提升性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对Linux操作系统而言,零拷贝技术依赖于底层的sendfile()方法实现。对应于Java语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。

单纯从概念上理解“零拷贝”比较抽象,这里简单地介绍一下它。考虑这样一种常用的情形:你需要将静态内容(类似图片、文件)展示给用户。这个情形就意味着需要先将静态内容从磁盘中复制出来放到一个内存buf中,然后将这个buf通过套接字(Socket)传输给用户,进而用户获得静态内容。这看起来再正常不过了,但实际上这是很低效的流程,我们把上面的这种情形抽象成下面的过程:

首先调用read()将静态内容(这里假设为文件A)读取到tmp_buf,然后调用write()将tmp_buf写入Socket,如图523所示。在这个过程中,文件A经历了4次复制的过程:

  • 调用read()时,文件A中的内容被复制到了内核模式下的ReadBuffer中。
  • CPU控制将内核模式数据复制到用户模式下。
  • 调用write()时,将用户模式下的内容复制到内核模式下的SocketBuffer中。- 将内核模式下的SocketBuffer的数据复制到网卡设备中传送。

r6dOSA.png

从上面的过程可以看出,数据平白无故地从内核模式到用户模式“走了一圈”,浪费了2次复制过程:第一次是从内核模式复制到用户模式;第二次是从用户模式再复制回内核模式,即上面4次过程中的第2步和第3步。而且在上面的过程中,内核和用户模式的上下文的切换也是4次。

如果采用了零拷贝技术,那么应用程序可以直接请求内核把磁盘中的数据传输给Socket,如图524所示。

r6dvOP.png

零拷贝技术通过DMA(DirectMemoryAccess)技术将文件内容复制到内核模式下的ReadBuffer中。不过没有数据被复制到SocketBuffer,相反只有包含数据的位置和长度的信息的文件描述符被加到SocketBuffer中。DMA引擎直接将数据从内核模式中传递到网卡设备(协议引擎)。这里数据只经历了2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次。零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。

Kafka 速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络 IO 损耗,通过 mmap 提高 I/O 速度,写入数据的时候由于单个 Partion 是末尾添加所以速度最优;读取数据的时候配合 sendfile 直接暴力输出。

副本分析

LEO与HW

LEO 标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的 LEO,ISR 中最小的 LEO 即为 HW,俗称高水位,消费者只能拉取到 HW 之前的消息。

消息追加过程:

  • 生产者客户端发送消息至 leader 副本(副本1)中。
  • 消息被追加到 leader 副本的本地日志,并且会更新日志的偏移量。
  • follower 副本(副本2和副本3)向 leader 副本请求同步数据。
  • leader 副本所在的服务器读取本地日志,并更新对应拉取的 follower 副本的信息。
  • leader 副本所在的服务器将拉取结果返回给 follower 副本。
  • follower 副本收到 leader 副本返回的拉取结果,将消息追加到本地日志中,并更新日志的偏移量信息。

我们再来分析在这个过程中各个副本 LEO 和 HW 的变化情况。下面的示例采用同上图中相同的环境背景,如下图(左)所示,生产者一直在往 leader 副本(带阴影的方框)中写入消息。某一时刻,leader 副本的 LEO 增加至5,并且所有副本的 HW 还都为0。

r6zkCV.png

之后 follower 副本(不带阴影的方框)向 leader 副本拉取消息,在拉取的请求中会带有自身的 LEO 信息。leader 副本返回给 follower 副本相应的消息,并且还带有自身的 HW 信息。

此时两个 follower 副本各自拉取到了消息,并更新各自的 LEO 为3和4。与此同时,follower 副本还会更新自己的 HW,更新 HW 的算法是比较当前 LEO 和 leader 副本中传送过来的HW的值,取较小值作为自己的 HW 值。当前两个 follower 副本的 HW 都等于0(min(0,0) = 0)。

接下来 follower 副本再次请求拉取 leader 副本中的消息,如下图(左)所示。

r6zdUI.png

此时 leader 副本收到来自 follower 副本的 FetchRequest 请求,其中带有 LEO 的相关信息,选取其中的最小值作为新的 HW,即 min(15,3,4)=3。然后连同消息和 HW 一起返回 FetchResponse 给 follower 副本,如上图(右)所示。注意 leader 副本的 HW 是一个很重要的东西,因为它直接影响了分区数据对消费者的可见性。

两个 follower 副本在收到新的消息之后更新 LEO 并且更新自己的 HW 为3(min(LEO,3)=3)。

在一个分区中,leader 副本所在的节点会记录所有副本的 LEO,而 follower 副本所在的节点只会记录自身的 LEO,而不会记录其他副本的 LEO。对 HW 而言,各个副本所在的节点都只记录它自身的 HW。

为什么不支持读写分离

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从而实现的是一种主写主读的生产消费模型。Kafka 并不支持主写从读,这是为什么呢?

这块原作者写的不太好,我的理解是基于pull模式维护副本的kafka,数据不一致的问题会更加明显。此外kafka本身就是分布式结构了,做读写分离意义不大。

日志同步

在 Kafka 中动态维护着一个 ISR 集合,处于 ISR 集合内的节点保持与 leader 相同的高水位(HW),只有位列其中的副本(unclean.leader.election.enable 配置为 false)才有资格被选为新的 leader。写入消息时只有等到所有ISR集合中的副本都确认收到之后才能被认为已经提交。位于 ISR 中的任何副本节点都有资格成为 leader,选举过程简单、开销低,这也是 Kafka 选用此模型的重要因素。Kafka 中包含大量的分区,leader 副本的均衡保障了整体负载的均衡,所以这一因素也极大地影响 Kafka 的性能指标。在采用 ISR 模型和(f+1)个副本数的配置下,一个 Kafka 分区能够容忍最大f个节点失败。

自定义ttl

通过消息的 timestamp 字段和 ConsumerInterceptor 接口的 onConsume() 方法来实现消息的 TTL 功能,还需要消息中的 headers 字段来做配合。将消息的 TTL 的设定值以键值对的形式保存在消息的 headers 字段中,这样消费者消费到这条消息的时候可以在拦截器中根据 headers 字段设定的超时时间来判断此条消息是否超时。

headers 字段涉及 Headers 和 Header 两个接口,Headers 是对多个 Header 的封装,Header 接口表示的是一个键值对,具体实现如下:

package org.apache.kafka.common.header;

public interface Header {
    String key();
    byte[] value();
}

可以直接使用 Kafka 提供的实现类 org.apache.kafka.common.header.internals.RecordHeaders 和 org.apache.kafka.common.header.internals.RecordHeader。这里只需使用一个 Header,key 可以固定为“ttl”,而 value 用来表示超时的秒数,超时时间一般用 Long 类型表示,但是 RecordHeader 中的构造方法 RecordHeader(String key, byte[] value) 和 value() 方法的返回值对应的 value 都是 byte[] 类型,这里还需要一个小工具实现long与 byte[] 的互转

下面我们向 Kafka 中发送3条 TTL 分别为20秒、5秒和30秒的3条消息,主要代码如代码清单18-1所示。

// 代码清单18-1 发送自定义TTL消息的主要代码
ProducerRecord<String, String> record1 =
        new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
                null, "msg_ttl_1",new RecordHeaders().add(new RecordHeader("ttl",
                        BytesUtils.longToBytes(20))));
ProducerRecord<String, String> record2 = //超时的消息
        new ProducerRecord<>(topic, 0, System.currentTimeMillis()-5*1000,
                null, "msg_ttl_2",new RecordHeaders().add(new RecordHeader("ttl",
                        BytesUtils.longToBytes(5))));
ProducerRecord<String, String> record3 =
        new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
                null, "msg_ttl_3",new RecordHeaders().add(new RecordHeader("ttl",
                        BytesUtils.longToBytes(30))));
producer.send(record1).get();
producer.send(record2).get();
producer.send(record3).get();

ProducerRecord 中包含 Headers 字段的构造方法只有2个,具体如下:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)

代码清单18-1中指定了分区编号为0和消息 key 的值为 null,其实这个示例中我们并不需要指定这2个值,但是碍于 ProducerRecord 中只有2种与 Headers 字段有关的构造方法。其实完全可以扩展 ProducerRecord 中的构造方法.

很显然代码清单18-1中的第2条消息 record2 是故意被设定为超时的,因为这条消息的创建时间为 System.currentTimeMillis()-5×1000,往前推进了5秒,而这条消息的超时时间也为5秒。如果在发送这3条消息的时候也开启了消费者,那么经过拦截器处理后应该只会收到“msg_ttl_1”和“msg_ttl_3”这两条消息。

// 代码清单18-2 自定义TTL的拦截器关键代码实现
@Override
public ConsumerRecords<String, String> onConsume(
        ConsumerRecords<String, String> records) {
    long now = System.currentTimeMillis();
    Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords
            = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
        List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
        for (ConsumerRecord<String, String> record : tpRecords) {
            Headers headers = record.headers();
            long ttl = -1;
            for (Header header : headers) {//判断headers中是否有key为“ttl”的Header
                if (header.key().equalsIgnoreCase("ttl")) {
                    ttl = BytesUtils.bytesToLong(header.value());
                }
            }
            //消息超时判定
            if (ttl > 0 && now - record.timestamp() < ttl * 1000) {
                newTpRecords.add(record); 
            } else {//没有设置TTL,不需要超时判定
                newTpRecords.add(record);
            }
        }
        if (!newTpRecords.isEmpty()) {
            newRecords.put(tp, newTpRecords);
        }
    }
    return new ConsumerRecords<>(newRecords);
}

代码清单18-2中判断每条消息的 headers 字段中是否包含 key 为“ttl”的 Header,如果包含则对其进行超时判定;如果不包含,则不需要超时判定,即无须拦截处理。

使用这种方式实现自定义消息 TTL 时同样需要注意的是:使用类似中这种带参数的位移提交的方式,有可能会提交错误的位移信息。在一次消息拉取的批次中,可能含有最大偏移量的消息会被消费者拦截器过滤。不过这个也很好解决,比如在过滤之后的消息集中的头部或尾部设置一个状态消息,专门用来存放这一批消息的最大偏移量。

kafka中的延时操作

两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,这样徒耗资源,显然不太合理。

这里就涉及到了Kafka延迟操作的概念。Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给follower副本。

延迟操作不只是拉取消息时的特有操作,在Kafka中有多种延时操作,比如延时数据删除、延时生产等。

时间轮

JDK 中 Timer 和 DelayQueue 的插入和删除操作的平均时间复杂度为 O(nlogn) 并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为 O(1)。时间轮的应用并非 Kafka 独有,其应用场景还有很多,在 Netty、Akka、Quartz、ZooKeeper 等组件中都存在时间轮的踪影。

rcVU4e.png

如上图所示,Kafka 中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize 计算得出。

时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskList 中的所有任务。

若时间轮的 tickMs 为 1ms 且 wheelSize 等于20,那么可以计算得出总体时间跨度 interval 为20ms。

初始情况下表盘指针 currentTime 指向时间格0,此时有一个定时为2ms的任务插进来会存放到时间格为2的 TimerTaskList 中。随着时间的不断推移,指针 currentTime 不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的 TimeTaskList 中的任务进行相应的到期操作。此时若又有一个定时为8ms的任务插进来,则会存放到时间格10中,currentTime 再过8ms后会指向时间格10。

如果同时有一个定时为19ms的任务插进来怎么办?新来的 TimerTaskEntry 会复用原来的 TimerTaskList,所以它会插入原本已经到期的时间格1。总之,整个时间轮的总体跨度是不变的,随着指针 currentTime 的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在 currentTime 和 currentTime+interval 之间。

如果此时有一个定时为 350ms 的任务该如何处理?直接扩充 wheelSize 的大小?Kafka 中不乏几万甚至几十万毫秒的定时任务,这个 wheelSize 的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个 wheelSize 为100万毫秒的时间轮不仅占用很大的内存空间,而且也会拉低效率。Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

rcZA8H.png

如上图所示,复用之前的案例,第一层的时间轮 tickMs=1ms、wheelSize=20、interval=20ms。第二层的时间轮的 tickMs 为第一层时间轮的 interval,即20ms。每一层时间轮的 wheelSize 是固定的,都是20,那么第二层的时间轮的总体时间跨度 interval 为400ms。以此类推,这个400ms也是第三层的 tickMs 的大小,第三层的时间轮的总体时间跨度为8000ms。

对于之前所说的 350ms 的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格17所对应的 TimerTaskList。如果此时又有一个定时为 450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格1的 TimerTaskList。注意到在到期时间为 [400ms,800ms) 区间内的多个任务(比如 446ms、455ms 和 473ms 的定时任务)都会被放入第三层时间轮的时间格1,时间格1对应的 TimerTaskList 的超时时间为 400ms。

随着时间的流逝,当此 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为 [40ms,60ms) 的时间格中。再经历 40ms 之后,此时这个任务又被“察觉”,不过还剩余 10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为 [10ms,11ms) 的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作。

设计源于生活。我们常见的钟表就是一种具有三层结构的时间轮,第一层时间轮 tickMs=1s、wheelSize=60、interval=1min,此为秒钟;第二层 tickMs=1min、wheelSize=60、interval=1hour,此为分钟;第三层 tickMs=1hour、wheelSize=12、interval=12hours,此为时钟。

在 Kafka 中,第一层时间轮的参数同上面的案例一样:tickMs=1ms、wheelSize=20、interval=20ms,各个层级的 wheelSize 也固定为20,所以各个层级的 tickMs 和 interval 也可以相应地推算出来。Kafka 在具体实现时间轮 TimingWheel 时还有一些小细节:

  • TimingWheel 在创建的时候以当前系统时间为第一层时间轮的起始时间(startMs),这里的当前系统时间并没有简单地调用 System.currentTimeMillis(),而是调用了 Time.SYSTEM.hiResClockMs,这是因为 currentTimeMillis() 方法的时间精度依赖于操作系统的具体实现,有些操作系统下并不能达到毫秒级的精度,而 Time.SYSTEM.hiResClockMs 实质上采用了 System.nanoTime()/1_000_000 来将精度调整到毫秒级。
  • TimingWheel 中的每个双向环形链表 TimerTaskList 都会有一个哨兵节点(sentinel),引入哨兵节点可以简化边界条件。哨兵节点也称为哑元节点(dummy node),它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的方便而引入的。如果一个链表有哨兵节点,那么线性表的第一个元素应该是链表的第二个节点。
  • 除了第一层时间轮,其余高层时间轮的起始时间(startMs)都设置为创建此层时间轮时前面第一轮的 currentTime。每一层的 currentTime 都必须是 tickMs 的整数倍,如果不满足则会将 currentTime 修剪为 tickMs 的整数倍,以此与时间轮中的时间格的到期时间范围对应起来。修剪方法为:currentTime = startMs - (startMs % tickMs)。currentTime 会随着时间推移而推进,但不会改变为 tickMs 的整数倍的既定事实。若某一时刻的时间为 timeMs,那么此时时间轮的 currentTime = timeMs - (timeMs % tickMs),时间每推进一次,每个层级的时间轮的 currentTime 都会依据此公式执行推进。
  • Kafka 中的定时器只需持有 TimingWheel 的第一层时间轮的引用,并不会直接持有其他高层的时间轮,但每一层时间轮都会有一个引用(overflowWheel)指向更高一层的应用,以此层级调用可以实现定时器间接持有各个层级时间轮的引用。

Kafka 中的定时器借了 JDK 中的 DelayQueue 来协助推进时间轮。具体做法是对于每个使用到的 TimerTaskList 都加入 DelayQueue,“每个用到的 TimerTaskList”特指非哨兵节点的定时任务项 TimerTaskEntry 对应的 TimerTaskList。DelayQueue 会根据 TimerTaskList 对应的超时时间 expiration 来排序,最短 expiration 的 TimerTaskList 会被排在 DelayQueue 的队头。

Kafka 中会有一个线程来获取 DelayQueue 中到期的任务列表,有意思的是这个线程所对应的名称叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”。当“收割机”线程获取 DelayQueue 中超时的任务列表 TimerTaskList 之后,既可以根据 TimerTaskList 的 expiration 来推进时间轮的时间,也可以就获取的 TimerTaskList 执行相应的操作,对里面的 TimerTaskEntry 该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。

注意对定时任务项 TimerTaskEntry 的插入和删除操作而言,TimingWheel时间复杂度为 O(1),性能高出 DelayQueue 很多,如果直接将 TimerTaskEntry 插入 DelayQueue,那么性能显然难以支撑。就算我们根据一定的规则将若干 TimerTaskEntry 划分到 TimerTaskList 这个组中,然后将 TimerTaskList 插入 DelayQueue,如果在 TimerTaskList 中又要多添加一个 TimerTaskEntry 时该如何处理呢?对 DelayQueue 而言,这类操作显然变得力不从心。

分析到这里可以发现,Kafka 中的 TimingWheel 专门用来执行插入和删除 TimerTaskEntry 的操作,而 DelayQueue 专门负责时间推进的任务。试想一下,DelayQueue 中的第一个超时任务列表的 expiration 为 200ms,第二个超时任务为 840ms,这里获取 DelayQueue 的队头只需要 O(1) 的时间复杂度(获取之后 DelayQueue 内部才会再次切换出新的队头)。如果采用每秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用 DelayQueue 来辅助以少量空间换时间,从而做到了“精准推进”。Kafka 中的定时器真可谓“知人善用”,用 TimingWheel 做最擅长的任务添加和删除操作,而用 DelayQueue 做最擅长的时间推进工作,两者相辅相成。

延时操作

如果在使用生产者客户端发送消息的时候将 acks 参数设置为-1,那么就意味着需要等待 ISR 集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。

如下面3张图所示,假设某个分区有3个副本:leader、follower1 和 follower2,它们都在分区的 ISR 集合中。为了简化说明,这里我们不考虑 ISR 集合伸缩的情况。Kafka 在收到客户端的生产请求(ProduceRequest)后,将消息3和消息4写入 leader 副本的本地日志文件。由于客户端设置了 acks 为-1,那么需要等到 follower1 和 follower2 两个副本都收到消息3和消息4后才能告知客户端正确地接收了所发送的消息。如果在一定的时间内,follower1 副本或 follower2 副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数 request.timeout.ms 配置,默认值为30000,即30s。

rcVEn0.png

那么这里等待消息3和消息4写入 follower1 副本和 follower2 副本,并返回相应的响应结果给客户端的动作是由谁来执行的呢?在将消息写入 leader 副本的本地日志文件之后,Kafka 会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。

延时操作需要延时返回响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同于定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。

就延时生产操作而言,它的外部事件是所要写入消息的某个分区的 HW(高水位)发生增长。也就是说,随着 follower 副本不断地与 leader 副本进行消息同步,进而促使HW进一步增长,HW 每增长一次都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响应结果给客户端;如果在超时时间内始终无法完成,则强制执行。

延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory)来做专门的处理。延时操作有可能会超时,每个延时操作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。在第7节中提及时间轮的轮转是靠“收割机”线程 ExpiredOperationReaper 来驱动的,这里的“收割机”线程就是由延时操作管理器启动的。也就是说,定时器、“收割机”线程和延时操作管理器都是一一对应的。延时操作需要支持外部事件的触发,所以还要配备一个监听池来负责监听每个分区的外部事件—查看是否有分区的HW发生了增长。另外需要补充的是,ExpiredOperationReaper 不仅可以推进时间轮,还会定期清理监听池中已完成的延时操作。

rceR6s.png

上图描绘了客户端在请求写入消息到收到响应结果的过程中与延时生产操作相关的细节,在了解相关的概念之后应该比较容易理解:如果客户端设置的 acks 参数不为-1,或者没有成功的消息写入,那么就直接返回结果给客户端,否则就需要创建延时生产操作并存入延时操作管理器,最终要么由外部事件触发,要么由超时触发而执行。

rceokT.png

延时队列

在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。

如果采用这种方案,那么一般是按照不同的延时等级来划分的,比如设定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour这些按延时时间递增的延时等级,延时的消息按照延时时间投递到不同等级的主题中,投递到同一主题中的消息的延时时间会被强转为与此主题延时等级一致的延时时间,这样延时误差控制在两个延时等级的时间差范围之内(比如延时时间为17s的消息投递到30s的延时主题中,之后按照延时时间为30s进行计算,延时误差为13s)。虽然有一定的延时误差,但是误差可控,并且这样只需增加少许的主题就能实现延时队列的功能。

rciBPs.png

如上图所示,生产者 Producer 发送若干延时时间不同的消息到主题 real_topic_A 和 real_topic_B 中,消费者 Consumer 订阅并消费主题 real_topic_A 和 real_topic_B 中的消息,对用户而言,他看到的就是这样一个流程。但是在内部,Producer 会根据不同的延时时间将消息划分为不同的延时等级,然后根据所划分的延时等级再将消息发送到对应的内部主题中,比如5s内的消息发送到 delay_topic_1,6s至10s的消息划分到 delay_topic_2 中。这段内部的转发逻辑需要开发人员对生产者客户端做一些改造封装,可以根据消息的 timestamp 字段、headers 字段(设置延时时间),以及生产者拦截器来实现具体的代码。

发送到内部主题(delaytopic*)中的消息会被一个独立的 DelayService 进程消费,这个 DelayService 进程和 Kafka broker 进程以一对一的配比进行同机部署(参考下图),以保证服务的可用性。

rcicrT.png

针对不同延时级别的主题,在 DelayService 的内部都会有单独的线程来进行消息的拉取,以及单独的 DelayQueue(这里用的是 JUC 中 DelayQueue)进行消息的暂存。与此同时,在 DelayService 内部还会有专门的消息发送线程来获取 DelayQueue 的消息并转发到真实的主题中。从消费、暂存再到转发,线程之间都是一一对应的关系。如下图所示,DelayService 的设计应当尽量保持简单,避免锁机制产生的隐患。

rci4i9.png

为了保障内部 DelayQueue 不会因为未处理的消息过多而导致内存的占用过大,DelayService 会对主题中的每个分区进行计数,当达到一定的阈值之后,就会暂停拉取该分区中的消息。

同一分区中的消息的延时级别一样,也就意味着延时时间一样,那么对同一个分区中的消息而言,也就自然而然地按照投递时间进行有序排列,那么为何还需要 DelayQueue 的存在呢?因为一个主题中一般不止一个分区,分区之间的消息并不会按照投递时间进行排序,那么可否将这些主题都设置为一个分区呢?这样虽然可以简化设计,但同时却丢弃了动态扩展性,原本针对某个主题的发送或消费性能不足时,可以通过增加分区数进行一定程度上的性能提升。

前面我们也提到了,这种延时队列的实现方案会有一定的延时误差,无法做到秒级别的精确延时,不过一般应用对于延时的精度要求不会那么高,只要延时等级设定得合理,这个实现方案还是能够具备很大的应用价值。

那么有没有延时精度较高的实现方案?

第一种思路,在生产者这一层面我们采取延时操作来发送消息,这样原本立刻发送出去的消息被缓存在了客户端中以等待延时条件的满足。这种思路有明显的弊端:如果生产者中缓存的消息过多,则必然引起内存的暴涨;消息可靠性也很差,如果生产者发生了异常,那么这部分消息也就丢失了,除非配套相应的重发机制。

第二种思路,在 Kafka 服务中增加一个前置缓存,生产者还是正常将消息发往 Kafka 中,Kafka 在判定消息是延时消息时(可以增加一个自定义协议,与发送普通消息的 PRODUCE 协议分开,比如 DELAY_PRODUCE,作为发送延时消息的专用协议)就将消息封装成延时操作并暂存至缓存中,待延时操作触发时就会将消息发送到真实的主题中,整体架构上与上图中所描述的类似。这种思路也有消息可靠性的问题,如果缓存延时操作的那台服务器宕机,那么消息也会随之丢失,为此我们可以引入缓存多副本的机制,如下图所示。

rcFL60.png

生产者发送的消息不单单发往一个缓存中,而是发往多个缓存,待所有缓存都收到消息之后才算发送成功,这一点和 Kafka 生产者客户端参数 acks = -1 的机理相通。每个 broker 中都会有一个延时操作的清理服务,彼此之间有主从的关系,任意时刻只有一个清理服务在工作,其余的清理服务都处于冷备状态。当某个延迟操作触发时会通知清理服务去清理其他延时操作缓存中对应的延时操作。这种架构虽然可以弥补消息可靠性的缺陷,但对于分布式架构中一些老生常谈的问题(比如缓存一致性、主备切换等)需要格外注意。

第二种思路还需要修改 Kafka 内核的代码,对开发人员源码的掌握能力及编程能力也是一个不小的挑战,后期系统的维护成本及 Kafka 社区的福利也是不得不考虑的问题。与此同时,这种思路和第一种思路一样会有内存暴涨的问题,单凭这个问题也可以判断出此种思路并不适合实际应用。

退一步思考,我们并不需要复用 Kafka 中的延时操作的模块,而是可以选择自己开发一个精度较高的延时模块,这里就用到了第7节中提及的时间轮的概念,所不同的是,这里需要的是单层时间轮。而且延时消息也不再是缓存在内存中,而是暂存至文件中。时间轮中每个时间格代表一个延时时间,并且每个时间格也对应一个文件,整体上可以看作单层文件时间轮,如下图所示。

rcev0x.png

每个时间格代表1秒,若要支持2小时(也就是 2×60×60 = 7200)之内的延时时间的消息,那么整个单层时间轮的时间格数就需要7200个,与此对应的也就需要7200个文件,听上去似乎需要庞大的系统开销,就单单文件句柄的使用也会耗费很多的系统资源。

其实不然,我们并不需要维持所有文件的文件句柄,只需要加载距离时间轮表盘指针(currentTime)相近位置的部分文件即可,其余都可以用类似“懒加载”的机制来维持:若与时间格对应的文件不存在则可以新建,若与时间格对应的文件未加载则可以重新加载,整体上造成的时延相比于延时等级方案而言微乎其微。随着表盘指针的转动,其相邻的文件也会变得不同,整体上在内存中只需要维持少量的文件句柄就可以让系统运转起来。

这里为什么强调的是单层时间轮。试想一下,如果这里采用的是多层时间轮,那么必然会有时间轮降级的动作,那就需要将高层时间轮中时间格对应文件中的内容写入低层时间轮,高层时间格中伴随的是读取文件内容、写入低层时间轮、删除已写入的内容的操作,与此同时,高层时间格中也会有新的内容写入。

如果要用多层时间轮来实现,不得不增加繁重的元数据控制信息和繁杂的锁机制。对单层时间轮中的时间格而言,其对应的要么是追加文件内容,要么是删除整个文件(到达延时时间,就可以读取整个文件中的内容做转发,并删除整个文件)。采用单层时间轮可以简化工程实践,减少出错的可能,性能上也并不会比多层时间轮差。

rcm1js.png

采用时间轮可以解决延时精度的问题,采用文件可以解决内存暴涨的问题,那么剩下的还有一个可靠性的问题,这里就借鉴了前面的多副本机制,如上图所示。生产者同样将消息写入多个备份(单层文件时间轮),待时间轮转动而触发某些时间格过期时就可以将时间格对应的文件内容(也就是延时消息)转发到真实主题中,并且删除相应的文件。与此同时,还会有一个后台服务专门用来清理其他时间轮中相应的时间格。

单层文件时间轮的方案不需要修改 Kafka 内核的源码,与前面第二种思路相比实现较为简单。单层文件时间轮的方案与延时级别的实现方案一样可以将延时服务(上图中单层时间轮与后台服务的整合体)与Kafka进程进行一对一配比的同机部署,以保证整体服务的可用性。

死信队列和重试队列

由于某些原因消息无法被正确地投递,为了确保消息不会被无故地丢弃,一般将其置于一个特殊角色的队列,这个队列一般称为死信队列。后续分析程序可以通过消费这个死信队列中的内容来分析当时遇到的异常情况,进而可以改善和优化系统。

与死信队列对应的还有一个“回退队列”的概念,如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认,进而发生回滚消息的操作之后,消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。

无论 RabbitMQ 中的队列,还是 Kafka 中的主题,其实质上都是消息的载体,换种角度看待问题可以让我们找到彼此的共通性。我们依然可以把 Kafka 中的主题看作“队列”,那么重试队列、死信队列的称谓就可以同延时队列一样沿用下来。 理解死信队列,关键是要理解死信。死信可以看作消费者不能处理收到的消息,也可以看作消费者不想处理收到的消息,还可以看作不符合处理要求的消息。比如消息内包含的消息内容无法被消费者解析,为了确保消息的可靠性而不被随意丢弃,故将其投递到死信队列中,这里的死信就可以看作消费者不能处理的消息。再比如超过既定的重试次数之后将消息投入死信队列,这里就可以将死信看作不符合处理要求的消息。

至于死信队列到底怎么用,是从 broker 端存入死信队列,还是从消费端存入死信队列,需要先思考两个问题:死信有什么用?为什么用?从而引发怎么用。在 RabbitMQ 中,死信一般通过 broker 端存入,而在 Kafka 中原本并无死信的概念,所以当需要封装这一层概念的时候,就可以脱离既定思维的束缚,根据应用情况选择合适的实现方式,理解死信的本质进而懂得如何去实现死信队列的功能。

重试队列其实可以看作一种回退队列,具体指消费端消费消息失败时,为了防止消息无故丢失而重新将消息回滚到 broker 中。与回退队列不同的是,重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。举个例子:消息第一次消费失败入重试队列 Q1,Q1 的重新投递延时为5s,5s过后重新投递该消息;如果消息再次消费失败则入重试队列 Q2,Q2 的重新投递延时为10s,10s过后再次投递该消息。以此类推,重试越多次重新投递的时间就越久,为此还需要设置一个上限,超过投递次数就进入死信队列。重试队列与延时队列有相同的地方,都需要设置延时级别。它们的区别是:延时队列动作由内部触发,重试队列动作由外部消费端触发;延时队列作用一次,而重试队列的作用范围会向后传递。

消息路由

Kafka 默认按照主题进行路由,也就是说,消息发往主题之后会被订阅的消费者全盘接收,这里没有类似消息路由的功能来将消息进行二级路由,这一点从逻辑概念上来说并无任何问题。从业务应用上而言,如果不同的业务流程复用相同的主题,就会出现消息接收时的混乱,这种问题可以从设计上进行屏蔽,如果需要消息路由,那么完全可以通过细粒度化切分主题来实现。

rcmOPS.png

kafka rabbitmq对比

功 能 项 Kafka(2.0.0版本) RabbitMQ(3.6.10版本)
优先级队列 不支持。不过可以改造支持,难度不大 支持。建议优先级大小设置在0~10之间
延时队列 不支持。不过可以改造支持 支持
死信队列 不支持。不过可以改造支持 支持
重试队列 不支持。不过可以改造支持 不支持。RabbitMQ中可以参考延时队列实现一个重试队列,二次封装比较简单。如果要在Kafka中实现重试队列,则得先实现延时队列的功能,相对比较复杂
消费模式 拉模式 推模式+拉模式
广播消费 支持。Kafka对于广播消费的支持相对而言更加正统 支持,但力度较Kafka弱
回溯消费 支持。Kafka支持按照offset和timestamp两种维度进行回溯消费 不支持。RabbitMQ中消息一旦被确认消费就会被标记删除
消息堆积 支持 支持。一般情况下,内存堆积达到特定阈值时会影响其性能,但这不是绝对的。如果考虑到吞吐量这个因素,Kafka的堆积效率比RabbitMQ总体上要高得多
持久化 支持 支持
消息轨迹 不支持,可以改造支持 支持。RabbitMQ中可以采用Firehose或rabbitmq_tracing插件实现。不过开启rabbitmq_tracing插件件会大幅影响性能,不建议在生产环境中开启,反倒是可以使用Firehose与外部链路系统结合以提供高细腻度的消息轨迹支持
消息审计 不支持 不支持
消息过滤 客户端级别的支持 不支持。不过可以改造支持,难度不大
多租户 支持 支持
多协议支持 只支持自定义协议 RabbitMQ本身就是AMQP协议的实现,同时支持MQTT、STOMP等协议
跨语言支持 采用Scala和Java编写,支持多种语言的客户端 采用Erlang编写,支持多种语言的客户端
流量控制 支持 RabbitMQ的流控基于Credit-Based算法,是内部被动触发的保护机制,作用于生产者层面
消息顺序性 支持单分区级别的顺序性 顺序性的条件比较苛刻,需要单线程发送、单线程消费,并且不采用延迟队列、优先级队列等一些高级功能,从某种意义上来说不算支持顺序性
安全机制 支持 支持
幂等性 支持单个生产者单分区单会话的幂等性 不支持
事务性消息 支持 支持