您现在的位置是: 首页 > 能源汽车 能源汽车
Pulsar_pulsar官网
zmhk 2024-06-16 人已围观
简介Pulsar_pulsar官网 大家好,今天我想和大家分享一下我对“Pulsar”的理解。为了让大家更深入地了解这个问题,我将相关资料进行了整理,现在就让我们一起来探讨吧。1.Pulsar 的消息存储机制和
大家好,今天我想和大家分享一下我对“Pulsar”的理解。为了让大家更深入地了解这个问题,我将相关资料进行了整理,现在就让我们一起来探讨吧。
1.Pulsar 的消息存储机制和 Bookie 的 GC 机制原理
2.什么是脉冲星
3.脉冲星是什么
4.Apache Pulsar 跨地域复制详解
5.基于Pulsar实现的数据同步实践
Pulsar 的消息存储机制和 Bookie 的 GC 机制原理
[TOC]
本文是 Pulsar 技术系列中的一篇,主要简单梳理了 Pulsar 消息存储与 BookKeeper 存储文件的清理机制。其中,BookKeeper 可以理解为一个 NoSQL 的存储系统,默认使用 RocksDB 存储索引数据。
Pulsar 的消息存储在 BookKeeper 中,BookKeeper 是一个胖客户的系统,客户端部分称为 BookKeeper,服务器端集群中的每个存储节点称为 bookie。Pulsar 系统的 broker 作为 BookKeeper 存储系统的客户端,通过 BookKeeper 提供的客户端 SDK 将 Pulsar 的消息存储到 bookies 集群中。
Pulsar 中的每个 topic 的每个分区(非分区 topic,可以按照分区 0 理解,分区 topic 的编号是从 0 开始的),会对应一系列的 ledger,而每个 ledger 只会存储对应分区下的消息。对于每个分区同时只会有一个 ledger 处于 open 即可写状态。
Pulsar 在生产消息,存储消息时,会先找到当前分区使用的 ledger,然后生成当前消息对应的 entry ID,entry ID 在同一个 ledger 内是递增的。非批量生产的情况(producer 端可以配置这个参数,默认是批量的),一个 entry 中包含一条消息。批量方式下,一个 entry 可能包含多条消息。而 bookie 中只会按照 entry 维度进行写入、查找、获取。
因此,每个 Pulsar 下的消息的 msgID 需要有四部分组成(老版本由三部分组成),分别为(ledgerID,entryID,partition-index,batch-index),其中,partition-index 在非分区 topic 的时候为 -1,batch-index 在非批量消息的时候为 -1。
每个 ledger,当存在的时长或保存的 entry 个数超过阈值后会进行切换,同一个 partition 下的,新的消息会存储到下一个 ledger 中。Ledger 只是一个逻辑概念,是数据的一种逻辑组装维度,并没有对应的实体。
BookKeeper 集群中的每个 bookie 节点收到消息后,数据会分三部分进行存储处理,分别为:journal 文件、entryLog 文件、索引文件。
其中 journal 文件,entry 数据是按照 wal 方式写入的到 journal 文件中,每个 journal 文件有大小限制,当超过单个文件大小限制的时候会切换到下一个文件继续写,因为 journal 文件是实时刷盘的,所以为了提高性能,避免相互之间的读写 IO 相互影响,建议存储目录与存储 entrylog 的目录区分开,并且给每个 journal 文件的存储目录单独挂载一块硬盘(建议使用 ssd 硬盘)。journal 文件只会保存保存几个,超过配置个数的文件将会被删除。entry 存储到 journal 文件完全是随机的,先到先写入,journal 文件是为了保证消息不丢失而设计的。
如下图所示,每个 bookie 收到增加 entry 的请求后,会根据 ledger id 映射到存储到那个 journal 目录和 entry log 目录,entry 数据会存储在对应的目录下。目前 bookie 不支持在运行过程中变更存储目录(使用过程中,增加或减少目录会导致部分的数据查找不到)。
如下图所示,bookie 收到 entry 写入请求后,写入 journal 文件的同时,也会保存到 write cache 中,write cache 分为两部分,一部分是正在写入的 write cache, 一部分是正在正在刷盘的部分,两部分交替使用。
write cache 中有索引数据结构,可以通过索引查找到对应的 entry,write cache 中的索引是内存级别的,基于 bookie 自己定义的 ConcurrentLongLongPairHashMap 结构实现。
另外,每个 entorylog 的存储目录,会对应一个 SingleDirectoryDbLedgerStorage 类实例对象,而每个 SingleDirectoryDbLedgerStorage 对象里面会有一个基于 RocksDB 实现的索引结构,通过这个索引可以快速的查到每个 entry 存储在哪个 entrylog 文件中。每个 write cache 在增加 entry 的时候会进行排序处理,在同一个 write cache,同一个 ledger 下的数据是相邻有序的,这样在 write cache 中的数据 flush 到 entrylog 文件时,使得写入到 entrylog 文件中的数据是局部有序的,这样的设计能够极大的提高后续的读取效率。
SingleDirectoryDbLedgerStorage 中的索引数据也会随着 entry 的刷盘而刷盘到索引文件中。在 bookie 宕机重启时,可以通过 journal 文件和 entry log 文件还原数据,保证数据不丢失。
Pulsar consumer 在消费数据的时候,做了多层的缓存加速处理,如下图所示:
获取数据的顺序如下:
上面每一步,如果能获取到数据,都会直接返回,跳过后面的步骤。如果是从磁盘文件中获取的数据,会在返回的时候将数据存储到 read cache 中,另外如果是读取磁盘的操作,会多读取一部分磁盘上的时候,因为存储的时候有局部有序的处理,获取相邻数据的概率非常大,这种处理的话会极大的提高后续获取数据的效率。
我们在使用的过程中,应尽量避免或减少出现消费过老数据即触发读取磁盘文件中的消息的场景,以免对整体系统的性能造成影响。
BookKeeper 中的每个 bookie 都会周期的进行数据清理操作,默认 15 分钟检查处理一次,清理的主要流程如下:
通过上面的流程,我们可以了解 bookie 在清理 entrylog 文件时的大体流程。
需要特别说明的是,ledger 是否是可以删除的,完全是客户端的触发的,在 Pulsar 中是 broker 触发的。
broker 端有周期的处理线程(默认 2 分钟),清理已经消费过的消息所在的 ledger 机制,获取 topic 中包含的 cursor 最后确认的消息,将这个 topic 包含的 ledger 列表中,在这个 id 之前的(注意不包含当前的 ledger id)全部删除(包括 zk 中的元数据,同时通知 bookie 删除对应的 ledger)。
在运用的过程中我们多次遇到了 bookie 磁盘空间不足的场景,bookie 中存储了大量的 entry log 文件。比较典型的原因主要有如下两个。
原因一:
生产消息过于分散,例如,举个极端的场景,1w 个 topic,每个 topic 生产一条,1w 个 topic 顺序生产。这样每个 topic 对应的 ledger 短时间内不会因为时长或者存储大小进行切换,active 状态的 ledger id 分散在大量的 entry log 文件中。这些 entry log 文件是不能删除或者及时压缩的。
如果遇到这种场景,可以通过重启,强制 ledger 进行切换进行处理。当然如果这个时候消费进行没有跟上,消费的 last ack 位置所在的 ledger 也是处于 active 状态的,不能进行删除。
原因二:
GC 时间过程,如果现存的 enrylog 文件比较多,且大量符合 minor 或 major gc 阈值,这样,单次的 minor gc 或者 major gc 时间过长,在这段时间内是不能清理过期的 entry log 文件。
这是由于单次清理流程的顺序执行导致的,只有上次一轮执行完,才会执行下一次。目前,这块也在提优化流程,避免子流程执行实现过长,对整体产生影响。
什么是脉冲星
PULSAR 手表是美国的品牌的。
Pulsar 是精工表旗下的一个品牌,中文译名不太清楚,可以到seiko的专柜看看,属于市场中档手表,有多种类型,男款女款,运动表,或者镶钻,价格不一。
Pulsar原来是一家美国表厂,是世上第一枚电子手表制造商,随后被精工集团收购,现为精工旗下一个定位比精工稍高一些的品牌。除了网站分类比精工本身好之外,可供查看的手表定位很实际,没有精工那种充大款的感觉。
脉冲星是什么
脉冲星,就是旋转的中子星。
脉冲星是在1967年首次被发现的。当时,还是一名女研究生的贝尔,发现狐狸星座有一颗星会发出一种周期性的电波。经过仔细分析,科学家认为这是一种未知的天体。因为这种星体不断地发出电磁脉冲信号,就把它命名为脉冲星。
2021年5月20日,国家天文台研究团队利用中国天眼FAST望远镜在观测中取得的重要进展,正式发布了201颗新脉冲星的发现。
Apache Pulsar 跨地域复制详解
脉冲星(Pulsar),是变星的一种,1967年首次被发现的。当时,还是一名女研究生的贝尔,发现狐狸星座有一颗星发出一种周期性的电波。经过仔细分析,科学家认为这是一种未知的天体。因为这种星体不断地发出电磁脉冲信号,人们就把它命名为脉冲星。脉冲星特征:
1.锥形扫射1968年有人提出脉冲星是快速旋转的中子星。中子星具有强磁场,运动的带电粒子发出同步辐射,形成与中子星一起转动的射电波束。由于中子星的自转轴和磁轴一般并不重合,每当射电波束扫过地球时,就接收到一个脉冲。
2.恒星在演化末期,缺乏继续燃烧所需要的核反应原料,内部辐射压降低,由于其自身的引力作用逐渐坍缩。质量不够大(约数倍太阳质量)的恒星坍缩后依靠电子简并压力与引力相抗衡,成为白矮星,而在质量比这还大的恒星里面,电子被压入原子核,形成中子,这时候恒星依靠中子的简并压与引力保持平衡,这就是中子星。典型中子星的半径只有几公里到十几公里,质量却在1-2倍太阳质量之间,因此其密度可以达到每立方厘米上亿吨。由于恒星在坍缩的时候角动量守恒,坍缩成半径很小的中子星后自转速度往往非常快。又因为恒星磁场的磁轴与自转轴通常不平行,有的夹角甚至达到90度,而电磁波只能从磁极的位置发射出来,形成圆锥形的辐射区。
3.此为在持脉冲星便是中子星的证据中,其中一个便是我们在蟹状星云(M1;原天关客星,SN 1054)确实也发现了一个周期约0.033s的波霎。
4.脉冲星靠消耗自转能而弥补辐射出去的能量,因而自转会逐渐放慢。但是这种变慢非常缓慢,以致于信号周期的精确度能够超过原子钟。而从脉冲星的周期就可以推测出其年龄的大小,周期越短的脉冲星越年轻。
5.脉冲星的特征除高速自转外,还具有极强的磁场,电子从磁极射出,辐射具有很强的方向性。由于脉冲星的自转轴和它的磁轴不重合,在自转中,当辐射向着观测者时,观测者就接收到了脉冲。到1999年,已发现1000颗脉冲星。
基于Pulsar实现的数据同步实践
在 Geo-Replication 的设计支撑下,其一,我们可以比较容易的将服务分散到多个机房;其二,可以应对机房级别的故障,即在一个机房不可用的情况下,服务可以转接到其它的机房来继续对外提供服务。Apache Pulsar 内置了多集群跨地域复制的功能,GEO-Repliaaction 是指把分散在不同物理地域的集群通过一定的配置方式让其能在集群之间进行数据的相互复制。
根据消息是否为异步读写的维度,跨地域复制可以分为如下两种方案:
下面我们讨论的是异步模式下,pulsar 的跨地域复制方案。
Pulsar 目前支持以下三种异步跨地域复制的方案:
从是否具有 configurationStoreServers (global zookeeper)的角度可以分为以下两种异步跨地域复制方案:
在整个跨地域复制中的一个核心理念在于,各个集群之间的数据是否能够互通,它们之间的交互主要依靠如下配置信息:
Full-mesh 的形式允许数据在多个集群中共享,如下图:
对于多个集群之间的数据复制,我们均可以简化到两个集群之间的数据复制,基于这个理念,Geo-Replication 的原理如下图所示:
当前拥有两个集群,分别部署在北京和上海,当用户在北京的集群中使用 producer 发送数据时,首先会发送到北京机房的本地集群中(topic1)与此同时会去创建一个 replication cursor,用于专门复制数据的一个游标,通过这个cursor信息,你可以判断当前数据究竟复制到哪一个阶段。同时会去创建 replication producer,它会把数据从北京机房的 topic1 中读取数据,然后将数据写到上海机房的 topic1 中,上海机房的 broker 收到 producer 的请求之后,会写到本地相同的 topic 中来(topic1)。此时如果上海机房的用户开启 consumer 去消费数据的话,会接收到由北京机房 producer 生产的数据信息。反之亦然。
在这里需要说明如下问题:
上面我们提到,在配置了 global zookeeper 的情况下,是没有办法做数据的单向复制的,但是很多场景下,我们并不需要所有的集群之间的数据都是全连通的,这种场景下,我们就可以考虑使用单向复制的功能,需要强调的是,单向复制并不需要用户单独配置或指定 configurationStoreServers,配置时只需要将 configurationStoreServers 的值配置为本地集群的 zookeeper 地址(zookeeperServers)即可。
那么在不配置 global zookeeper 的情况下,如何去做跨集群复制的场景呢?
在上面我们提到,global zookeeper 的作用主要是用来存储多个集群的地址信息以及相应的namespace信息,并没有额外的元数据信息。所以在单向复制的场景下,你需要告诉其它机房的集群,你需要读到不同集群之间的 namespace 信息。
Failover 模式是单向复制的特例。
Failover 模式下,远端机房的集群只是用来做数据的备份,并不会有producer和consumer的存在,只有当当前处于 active 的集群宕机之后,才会把对应的 producer 和 consumer 切换到对应的 standby 集群中来继续消费。因为有 replication sub 的存在,所以会一同将订阅的状态也复制到备份机房。
目前 TDMD 已经基于 Apache Pulsar 应用在多种业务场景下,腾讯云TDMQ、计平、数平等多个团队也在一起共建Pulsar,对Pulsar感兴趣的小伙伴,欢迎加入下面的企业微信群一起交流。
[上传失败...(image-b7c1a1-1639451966467)]
数据同步,就是让数据通过一定的传输介质,从一个地点到达另一个地点,从而实现数据的同步或复制,来满足应用需求。在本人的上一份工作中,随着业务量及数据量的的大幅增长,公司不得不对现有的微服务再度细化(拆分)。公司采用的是基于领域驱动设计的微服务体系,每个领域在需求日益增加的同时,必然会愈来愈大,考虑到业务内聚、系统性能等诸多因素不得不把某些大的领域中心拆分成多个服务。这个过程就是系统重构。
系统拆分如何让用户无感知呢?上线时通过分流策略将部分用户引流到新的服务中,要求新老系统并行运行一段时间来支撑新服务的试运行到完全落地,从而最大化减小生产故障。为了让新服务数据能够与旧系统服务中的数据实时一致,就需要同步数据。随着数据量大幅增长,要加快查询速度,可以将数据复制到 ES(ElasticSearch)中,提高查询速率。
综上总结:如何实现增量数据实时同步?
市场上有相关的开源数据同步产品和商业版数据通道工具,不需要人工任何接入即可实现双边的数据同步复制。但在系统重构时时可能会发生一些表结构的变动以及表对象的一些变动,此时就无法兼容商业的数据同步,当然也可以采纳其中的部分解决方案但还是需要开发人员介入进行相关处理,所以最终,我们采用了 Maxwell + Pulsar 的自研解决方案:使用 Maxwell 读取 binlog(也可以使用 canal ,maxwell实现较简单点),Pulsar 进行数据传输。Maxwell + Pulsar 实现上层的数据读取,下游业务方实现对应的数据同步逻辑。比如,针对系统重构拆分的数据同步业务场景,以及读写分离,将数据复制同步到类似 ElasticSearch 搜索引擎中的业务场景。
该图主要展示核心的数据链路,隐藏日志记录、链路追踪等一些微服务体系中所含有的组件。
Pulsar 有四种消费模式:exclusive 模式(独占模式)、failover 模式 (故障转移模式)、Shared 模式 (共享模式)以及 Key_Shared 模式。
Exclusive 模式只有一个 Consumer,接收一个 Topic 所有的消息。
Key_Shared 模式是 shared 订阅模式拓展,一个分区可以有几个消费者并行消费消息,但具有相同 key 的消息只会路由给一个消费者。其原理则是通过哈希来确定目标的使用者。每个消费端提供固定范围的哈希值,当然散列值的整个范围可以覆盖所有的消费端。
经上所述,对于数据同步的场景,我们将key指定为下面的示例,就可以实现有序的存放至指定的分区以及消息有序的消费啦!
消息的传输保障一般有三种:At least once、At most once 和 Exactly once。
在数据同步场景下,要最大化的保证消息的可达性,可以运用 Maxwell 的 At least once 模式,尽可能保证消息传输。在网络不理想时,消息可能已经投递至目标,但接收到超时响应或者未接收成功,Pulsar 会再次投递,从而产生了我们认为的“重复消息”。
好了,关于“Pulsar”的话题就到这里了。希望大家通过我的介绍对“Pulsar”有更全面、深入的认识,并且能够在今后的实践中更好地运用所学知识。