消息队列总结

消息队列MQ

中间件:位于操作系统之上,客户应用系统之下的中间层,针对某些特定的领域抽取他们的共性部分独立出来的一套组件,称之为中间件。目的是为了进一步简化应用系统的开发,避免重复劳动。

  • 消息队列系统传输的对象是消息,消息如何来表示,即消息格式,比如使用CSV、XML、JSON或是二进制序列。
  • 消息如何传输,消息在生产者,服务器和消费者之间如何流转,即是传输模型。
点对点模型

消息被保存在队列中,一个或多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。一旦消费者读取队列中的消息,该消息就会从队列中消失。

发布/订阅模型

该模型有一个Topic的概念,和点对点模型不同的是,该模型可能存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,他们都能接收到相同主题的消息。

消息队列的作用

  • 业务解耦(将频繁变动的非关键业务代码与稳定且关键的业务逻辑代码剥离开来)
  • 异步调用(下单业务中,同步调用的调用链会很长,响应时间较长,异步调用会提高响应时间)
  • 削峰填谷(遇到一些类似于双11的大促业务,可以使用MQ承接多的流量,之后再通过机器去MQ中拉取消息处理业务)
  • 信息汇聚(大数据场景下,多数据源将各自产生的数据集中收拢到消息中间件,然后再由下游清洗和消费)
  • 数据同步管道(用于同步数据变化的场景,例如数据库日志在两个数据库实例间的同步)

RocketMQ

生产者:消息发布的角色,支持分布式集群方式部署,通过MQ的负载均衡模块(SDK)选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

服务端NameServer:元数据管理中心,类似于Zookeeper,支持Broker的动态注册与发现,以及Topic的路由注册。NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据,然后提供心跳检测机制,检查Broker是否还存活。每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息,然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息。从而进行 消息的投递和消费。Broker向每一台NameServer注册自己的路由信息,所以当某一个NameServer下线之后,Broker仍然可以向其他NameServer同步其路由信息。

Broker服务实例:负责消息的投递、存储和查询以及服务高可用保证。Broker下的子模块

  • Remoting Moudle:整个Broker的实体,负责处理来自客户端的请求。
  • Client Manager:负责管理客户端和维护Consumer的Topic订阅信息。
  • Store Service:提供方便简单的API接口处理消息存储到无力硬盘和查询功能
  • HA Service:高可用服务,提供Master Broker和Slaver Broker之间的数据同步功能
  • Index Service:根据特定的Message Key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

消费者:支持分布式集群方式部署,支持长轮询模式对消息进行消费,同时也支持集群方式和广播方式的消费,以及实时消息订阅机制。

RocketMQ集群工作流程

  • 启动NameServer,监听端口,等待Broker、Producer和Consumer连接。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。
  • 收发消息前,先创建Topic,创建时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并默认每隔30秒从NameServer中拉取Topic路由信息TopicPublishInfoTable,缓存到本地,从TopicPublishInfoTable中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。Broker作为消息的接收者接收消息并落盘存储。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,通过客户端负载均衡机制选择某一个或者几个消息队列来拉取消息进行消费。这里的拉取其实是兼顾拉(pull)和推(push)的长轮询模式(long polling),即首先消费者客户端向Broker发出拉取请求,如果Broker上的相应队列有数据,则拉取该数据;如果没有,则Broker作为服务端会hold该拉取请求30s,30s内有数据,则数据返回本次请求结束;如果30s内没有数据,则timeout,该请求结束,客户端立即发起下一次请求。

消费端负载均衡算法

  • 平均分配算法
  • hash一致性算法
  • 按机房分配算法
  • 自定义算法

默认的是消息队列的平均分配算法,类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数,并求出每一页需要包含的平均size和每个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的记录。

RocketMQ的rebalance

Consumer在消费过程中是会连上具体的Broker server的,而且会定期上报心跳,因此Broker上是有全量的Consumer信息的,Consumer客户端可以拉取到整体有多少个Consumer节点,自己的排序情况如何;同时Consumer也能从Broker上拉取它所订阅的Topic全量的MessageQueue信息即总size。总size除以Consumer节点数就得到每页(每个)Consumer应该分配的MessageQueue。Broker定时会主动触发检查Topic的MessageQueue数量是否发生变化,如果变化则会触发为每个Consumer节点重新分配MessageQueue的计算。

RocketMQ的存储机制

  • CommitLog:日志数据文件,即消息主体以及元数据的存储主体,由Broker来创建,它解决了消息存哪里的问题。生产者(Producer)通过API发送消息到Broker端,首先会以顺序写的方式追加到一个名为CommitLog的文件末尾。这里的CommitLog文件就是真正存储消息本身的地方。默认一个CommitLog文件为1G大小,这是为了避免一个过大文件存储所有消息,导致读和写的效率低下。一个Broker实例下所有的Topic队列共用一个(组)CommitLog来存储消息
  • ConsumeQueue,即逻辑消费队列,可以等同的理解为前文中提到的逻辑分片messageQueue。它解决队列怎么存,消费者如何消费的问题。一个Topic会以多个ConsumeQueue的形式来存储(默认是4),Consumequeue以文件形式存储,采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量(commitLogOffset)、4字节的消息长度(msgSize)、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。因此,我们可以看出ConsumeQueue没有重复存储消息本身,而是作为消息的索引,帮助Consumer来查找待消费的数据。只要有了物理偏移量(commitLogOffset)和消息长度(msgSize),消费者就完全可以从CommitLog中读到它想要消费的信息了。
  • IndexFile: 即索引文件,Broker在创建ConsumeQueue的同时也会创建IndexFile,它用于解决性能问题。它提供了一种可以通过Key或时间区间来查询消息的方法。
  • miniOffset,consumerOffset和maxOffset,用于解决如何消费的问题。

RocketMQ消息刷盘机制

  • 同步刷盘:只有在消息真正持久化至磁盘后RocketMQ的Broker端才会返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,要谨慎使用,一般适用于金融业务。

  • 异步刷盘:能够充分利用OS的页缓存(PageCache)的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

幂等处理

在编程中,一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。

实现幂等性的技术方案

  • 查询操作:select是天然的幂等操作
  • 删除操作
  • 唯一索引,防止新增脏数据
  • token机制,防止页面重复提交
  • 悲观锁
  • 乐观锁
  • 分布式锁

2PC(两阶段提交协议)https://segmentfault.com/a/1190000012534071

两阶段提交协议的目标在于为分布式系统保证数据的一致性,许多分布式系统采用该协议提供对分布式事务的支持。顾名思义,该协议将一个分布式的事务过程拆分成两个阶段: 投票和事务提交 。为了让整个数据库集群能够正常的运行,该协议指定了一个 协调者 单点,用于协调整个数据库集群各节点的运行。为了简化描述,我们将数据库集群中的各个节点称为 参与者 ,三阶段提交协议中同样包含协调者和参与者这两个角色定义

第一阶段:投票

该阶段的主要目的在于打探数据库集群中的各个参与者是否能够正常的执行事务

  • 协调者向所有的参与者发送事务执行请求,并等待参与者反馈事务执行结果;
  • 事务参与者收到请求之后,执行事务但不提交,并记录事务日志;
  • 参与者将自己事务执行情况反馈给协调者,同时阻塞等待协调者的后续指令。

第二阶段:事务提交

经过第一阶段协调者的询盘之后,各个参与者会回复自己事务的执行情况,这时候存在 3 种可能性

  • 所有的参与者都回复能够正常执行事务。

    处理如下

    • 协调者向各个参与者发送 commit 通知,请求提交事务;
    • 参与者收到事务提交通知之后执行 commit 操作,然后释放占有的资源;
    • 参与者向协调者返回事务 commit 结果信息。
  • 一个或多个参与者回复事务执行失败。

    处理如下

    • 协调者向各个参与者发送事务 rollback 通知,请求回滚事务;
    • 参与者收到事务回滚通知之后执行 rollback 操作,然后释放占有的资源;
    • 参与者向协调者返回事务 rollback 结果信息。
  • 协调者等待超时。

    处理如下

    • 协调者向各个参与者发送事务 rollback 通知,请求回滚事务;
    • 参与者收到事务回滚通知之后执行 rollback 操作,然后释放占有的资源;
    • 参与者向协调者返回事务 rollback 结果信息。

Kafka(消息引擎系统)

传输模型

  • 点对点模型
  • 发布/订阅模型

体系结构

  • Producer

  • Consumer

  • Broker:服务代理节点

  • Topic

  • Partition

  • Offset

    一个topic有多个Partition,一个Partition又会有多个Replica,一个Replica中记录的是Log,Log又可以被分为多个LogSegment,LogSegment包含.log.index.timeindex等文件

Kafka的多副本机制

  • 同一分区的不同副本中保存的是相同的消息,副本之间是“一主多从”的关系

Kafka的主要场景还是在消息引擎而不是以数据存储的方式对外提供读服务,通常设计频繁的生产消息和消费消息,这不属于典型的读多写少场景,因此读写分离不适合kafka。

  • AR(Assigned Replicas):分区中的所有副本
  • ISR(In-Sync Replicas):所有与leader副本保持一定程度同步的副本(包括leader副本)
  • OSR(Out-of-Sync Replicas):与leader副本同步滞后过多的副本
  • HW(High watermark):高水位,标识了一个特定的消息偏移量,消费者只能拉取到这个offset之前的消息
  • LEO(Log End Offset):标识当前日志文件中下一条待写入消息的offset

Kafka的三层消息架构:

  • 第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
  • 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
  • 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。

Kafka的分区策略

  • 轮询策略(Round-robin)
  • 随机策略(Randomness)
  • 按消息键保存策略

Rebalance的触发条件

  • 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
  • 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

位移主题:_consumer_offsets是kafka的内部主题,也被称为位移主题,即Offsets Topic。

老版本Consumer的位移管理依托于ZooKeeper,它会自动或手动的将位移数据提交到ZooKeeper中保存,当Consumer重启后,它能自动从ZooKeeper中读取位移数据,从而在上次消费截止的地方继续消费。ZooKeeper的缺点时不适用于高频的写操作。

新版本Consumer的位移管理很简单,就是将Consumer的位移数据作为一条条普通的Kafka消息,提交到_consumer_offsets中,可以说,_consumer_offsets的主要作用是保存Kafka消费者的位移信息,要求这个提交过程不仅要支持高持久性,还要支持高频的写操作。

kafka consumer多线程方案

  • 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。
  • 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。

副本机制的好处

  • 提供数据冗余。即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
  • 提供高伸缩性。支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
  • 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。

Kafka的Leader-based副本机制

  • 在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
  • 在 Kafka 中,追随者副本是不对外提供服务的。这就是说,追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
  • 领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。

Read-your-writes:这里指当你使用生产者API向Kafka成功写入消息后,马上使用消费者API去读取刚才生产的消息。

Monotonic Reads(单调读):对于一个消费者用户而言,在多次消费消息时,他不会看到某条消息一会存在一会不存在。如果第一次读到的是leader副本的消息,第二次可能读到follow副本的消息。而此时follow副本还没有同步leader副本中的消息。

Unclean领导者选举:通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。这就是我们常说的分布式系统的CAP理论。

Reactor模式:Kafka处理请求的方式。

  • Broker中存在一个类似于Reactor中的dispatcher的组件名叫SocketServer,它有对应的Acceptor线程和一个工作线程池。在kafka中这个线程池叫网络线程池。当网络线程拿到请求后,它不是自己处理,而是将请求放到一个共享请求队列中。Broker端还有个IO线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。IO 线程池处中的线程才是执行请求逻辑的线程。请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。这么设计的原因就在于,Dispatcher 只是用于请求分发而不负责响应回传,因此只能让每个网络线程自己发送 Response 给客户端,所以这些 Response 也就没必要放在一个公共的地方。
  • Purgatory组件:用来缓存延时请求,所谓延时请求,就是那些一时未满足条件不能立刻处理的请求。比如设置了 acks=all 的 PRODUCE 请求,一旦设置了 acks=all,那么该请求就必须等待 ISR 中所有副本都接收了消息后才能返回,此时处理该请求的 IO 线程就必须等待其他 Broker 的写入结果。当请求不能立刻处理时,它就会暂存在 Purgatory 中。稍后一旦满足了完成条件,IO 线程会继续处理该请求,并将 Response 放入对应网络线程的响应队列中。

Coordinator:Kafka的协调者,专门为Consumer Group服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。

Controller:Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调整个Kafka集群。集群中任意一台Broker都能充当控制器的角色,但在运行过程中,只能有一个Broker成为控制器,行使其管理和协调的职责。

控制器的功能

  • 主题管理(创建、删除、增加分区)
  • 分区重分配:Kafka-reassign-partitions脚本提供的对已有主题分区进行细粒度的分配功能。
  • Preferred领导者选举:Kafka为了避免部分Broker负载过重而提供的一种换Leader的方案。
  • 集群成员管理(新增Broker、Broker主动关闭、Broker宕机)
  • 数据服务:控制器上保存了最全的集群元数据信息,其他所有Broker会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

Kafka的定时器:kafka基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。可以将插入和删除操作的时间复杂度降为O(1)。例如当配置了acks=all的生产者发送的请求必须确保ISR中的所有副本都成功响应这次写入,这就是所谓的延时请求。

kafka中使用的请求被延时处理的机制是分层时间轮算法。

kafka的broker如何实现持久化数据:使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写消息的物理文件,不过如果不停的向一个日志写消息,最终也会耗尽所有的磁盘空间,因此Kafka通过Log Segment机制,在Kafka底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满一个日志段后,Kafka会自动切分出一个新的日志段,并将老的日志段封存起来,Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。

kafka控制器的选举机制:每个代理节点都会作为ZK的客户端,向ZK服务端尝试创建/controller临时节点。Zookeeper里采用的是Zab共识算法/协议。

Kafka命令行操作

查看当前服务器中的所有topic

1
kafka-topics --list --zookeeper localhost:2181

创建topic

1
2
3
4
5
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数

删除topic

1
kafka-topics --delete --zookeeper localhost:2181 --topic test

查看某个topic的详情

1
kafka-topics --describe --zookeeper localhost:2181 --topic test

生产消息

1
kafka-console-producer --broker-list localhost:9092 --topic test

消费消息

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

Pulsar

核心思想:计算存储分离

特点:多租户,易于扩展

Pulsar核心组件

Broker:计算层,整个消息层的生产和消费,无存储状态。

  • Dispatcher:调度分发模块,承担协议转换、序列化反序列化等。
  • Load balancer:负载均衡模块,对访问流量进行控制管理。
  • Global replicators:跨集群复制模块,承担异步的跨集群消息同步功能。
  • Service discovery:服务发现模块,为每个 topic 选择无状态的主节点。

BookKeeper:存储层,数据持久化保存的节点,有存储状态。

  • Bookie:存储设备

RocksDB:内嵌在 BookKeeper 中的数据库,存储每个条目的位置索引。

Zookeeper:元数据管理,存储 Pulsar 和 BookKeeper 元数据 和 节点状态,以及服务发现(发现 broker ,发现bookie)。在 Pulsar 里的作用是存储 Pulsar 系统里元数据的存储和集群的管理以及节点的发现等,节点发现是指发现集群里有多少个 broker,有多少 bookie。

  • local zk:负责Pulsar Cluster内部的配置等
  • global zk:则用于Pulsar Cluster之间的数据复制等。
  • Configuration Store:存储的是集群复制信息,让集群之间互相了解各自的地址。同时还包括一些 clients 或 namespace 的相关配置信息。

Pulsar订阅模型

  • Exclusive(独享):一个订阅只能有一个消费者消费消息
  • Fail-Over(灾备):一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。
  • Shared(共享):一个订阅中同时可以有多个消费者,多个消费者共享Topic中的消息。消息通过轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给它,但没有被确认的消息将被重新安排,分发给其他存活的消费者。
  • Key Shared(Key共享):一个订阅中同时可以有多个消费者,消息在各个使用者之间进行分发,其规则是相同Key的消息仅传递给一个使用者。不管消息被重新发送多少次,它都会被发送到同一消费者。

Puslar消息路由策略

  • 单分区:生产者随机挑选一个分区,并将数据写入该分区。
  • 轮询:生产者通过轮询的方式将数据平均地分布到各个分区上。
  • 哈希:每个消息会带上一个键,要写入哪个分区取决于它所带的键。
  • 自定义:生产者使用自定义函数生成分区对应的数值,然后根据这个数值将消息写入对应的分区。

Pulsar 的数据存储节点 Bookkeeper 被称为 Bookie,相当于一个 Kafka Broker。Ledger 是 Topic 的若干日志的集合,是 Pulsar 数据删除的最小单元,即 Pulsar 每次淘汰以 Ledger 为单位进行删除。Fragment 是 Bookkeeper 的概念,对应一个日志文件,每个 Ledger 由若干 Fragment 组成。

Pulsar Broker

Pulsar 的 metadata 存储在 zookeeper 上,而消息数据存储在 Bookkeeper 上。Broker 虽然需要这些 metadata,但是其自身并不持久化存储这些数据,所以可以认为是无状态的。不像 Kafka 是在 Partition 级别拥有一个 leader Broker,Pulsar 是在 Topic 级别拥有一个 leader Broker,称之为拥有 Topic 的所有权,针对该 Topic 所有的 R/W 都经过该 Broker 完成。

Pulsar Broker 可以认为是一种 Proxy,它对 client 屏蔽了服务端读写流程的复杂性,是保证数据一致性与数据负载均衡的重要角色,所以 Pulsar 可以认为是一种基于 Proxy 的分布式系统。与之形成对比的 kafka 可以认为是一种基于 SmartClient 的系统,所以 Kafka 服务端自身的数据一致性流程还需要 Client SDK 与之配合完成。

Kafka 的所有 Broker 会选出一个 Leader,作为 Broker Leader 决定 Broker 宕机判断、集群扩容、创建删除 Topic、Topic Replica分布、Topic Partition 的 Leader 的选举。Pulsar 的所有 Broker 也会借助 zookeeper 加锁的方式选举一个 Leader【或者称为 Master 更合适,以区分于 Topic 的 Leader】,对 Broker 宕机判断(Failover)、根据 Bookie 集群负载Topic Ledger 所有权【即 Ledger 所在的 Bookie】等任务

Pulsar Bookie

Pulsar 的底层数据 以 Ledger(上图中的 Segment 就是 Ledger) 形式存储在多个 BookKeeper 上,当集群扩容添加 Bookies 后,Pulsar 会在新的 Bookie 上创建新的 Segment(即 Bookeeper 的 Ledger),所以不需要再扩容时候像 Kafka 一样进行 Rebalance 操作,其结果就是 Fragments跨多个Bookies以带状分布。但是这样的结果就是同一个 Ledger 的 Fragments 分布在多个 Bookie 上,导致读取和写入会在多个 Bookies 之间跳跃。Topic的 Ledger 和 Fragment 之间映射关系等元数据存储在 Zookeeper 中,Pulsar Broker 需要实时跟踪这些关系进行读写流程。

Ledgers

一个Topic实际上是一个ledgers流。Ledger本身就是一个日志。所以一系列的子日志(Ledgers)组成了一个父日志(Topic)。

Ledgers追加到一个Topic,条目(消息或者一组消息)追加到Ledgers。Ledger一旦关闭是不可变的。Ledger作为最小的删除单元,也就是说我们不能删除单个条目而是去删除整个Ledger。

Ledgers本身也被分解为多个Fragment。Fragment是BookKeeper集群中最小的分布单元,每个Ledger(由一个或多个Fragment组成)可以跨多个BookKeeper节点(Bookies)进行复制,以实现数据容灾和提升读取性能。每个Fragment都在一组不同的Bookies中复制(存在足够的Bookies)。

每个Ledger有三个关键配置

  • Ensemble Size(E):Ensemble表示将要写入的实际的Bookies数量
  • Write Quorum Size(Qw):Write Quorum (Qw) 是Pulsar将要写入的实际的Bookies数量。可以等于或者小于Ensemble。
  • Ack Quorum Size(Qa):Ack Quorum (Qa) 是确认写入Bookies的数量,Pulsar Broker将确认发送给客户端。为了一致性,Qa应该是:(Qw + 1) / 2 或者更大。

Pulsar总结

  • 每个Topic都有一个归属的Broker
  • 每个Topic在逻辑上分解为Ledgers、Fragments和Entries。
  • Fragments分布在Bookie集群中。Topic与Bookie并不耦合。
  • Fragments可以跨多个Bookies带状分布。
  • 当Pulsar Broker不可用时,该Broker持有的Topic所有权将转移至其他的Broker。Fencing机制避免了同一个Topic当前的Ledger同时有两个所有者(Broker)。
  • 当Bookie不可用时,自动恢复(如果启用)将自动进行数据重新复制到其他的Bookies。如果禁用,则可以手动启动此过程。
  • Broker缓存尾部消息日志,可以非常高效的为尾部读取操作提供服务。
  • Bookies使用Journal提供持久化保证。该日志可用于故障恢复时恢复尚未写入Entry Log文件的数据。

  • 所有Topic的的条目都保存在Entry Log文件中。查找索引保存在RocksDB中。

  • 所有Topic的的条目都保存在Entry Log文件中。查找索引保存在RocksDB中。
  • Bookies可以通过单独的磁盘做IO读写分离。
  • Zookeeper存储Pulsar和BookKeeper的所有元数据。如果Zookeeper不可用整个Pulsar将不可用。
  • 存储可以单独扩展。如果存储是瓶颈,那么只需要添加更多的Bookies,他们会自动承担负载,不需要Rebalance。

Kafka和Pulsar

Kafka:单片架构模型,服务与存储相结合

Pulsar:多层架构,可以在单独的层内进行管理

相对于kafka,这是一个非常不同且复杂的模型。对于kafka,每个Partition副本都完整的存储在kafka节点上。Partition以及Partition副本由一系列的Segment和索引文件组成。kafka模型的优点在于简单快捷。所有读写都是顺序的。不好的是,单个节点必须有足够的磁盘空间来处理副本,因此非常大的副本可能会迫使你是用非常大的磁盘。第二个缺点是,在集群扩展时必须做Rebalance。这个过程是比较痛苦的,需要良好的计划和执行来保证没有任何故障的情况下分散节点的存储压力。

在Pulsar+BookKeeper模型中,Topic中的数据分布在多个Bookies上。Topic被分割成Ledgers,Ledgers被分割成Fragments分布在Fragment使用的Bookies上。当需要做集群扩展时,只需添加更多Bookies,它们就会在创建新的Fragment时开始在的Bookies上写入数据,不再需要kafka的Rebalance操作。

事务区别

RocketMQ中的事务,它解决的问题是,确保执行本地事务和发消息这两个操作,要么都成功,要么都失败。并且RocketMQ增加了一个事务反查的机制,来尽量提高事务执行的成功率和数据一致性。

Kafka 中的事务,它解决的问题是,确保在一个事务中发送的多条消息,要么都成功,要么都失败。(这里面的多条消息不一定要在同一个主题和分区中,可以是发往多个主题和分区的消息)当然也可以在kafka事务执行过程中开启本地事务来实现类似RocketMQ事务消息的效果,但是Kafka是没有事务消息反查机制的,它是直接抛出异常的,用户可以根据异常来实现自己的重试等方法保证事务正常运行。

它们的共同点就是:都是通过两阶段提交来实现事务的,事务消息都保存在单独的主题上。不同的地方就是RocketMQ是通过“半消息”来实现的,kafka是直接将消息发送给对应的topic,通过客户端来过滤实现的。而且它们两个使用的场景区别是非常之大的,RockteMQ主要解决的是基于本地事务和消息的数据一致性,而Kafka的事务则是用于实现它的Exactly-once机制,应用于实时流计算的场景中。

Pulsar的事务消息和Kafka应用场景和语义类似,只是由于底层实现机制有差别,在一些细节上有区别。

参考资料:

http://matt33.com/2019/01/28/bk-store-realize/

https://mp.weixin.qq.com/s/CIpCLCxqpLoQVUKz6QeDJQ

https://alexstocks.github.io/html/pulsar.html

https://segmentfault.com/a/1190000023605433

https://segmentfault.com/a/1190000038173886

https://blog.csdn.net/shijinghan1126/article/details/117418520