您當(dāng)前的位置:首頁科技正文

RocketMQ Streams:將輕量級實(shí)時(shí)計(jì)算引擎融合進(jìn)消息系統(tǒng)

2021-12-15 15:31:13

隨著各行各業(yè)移動(dòng)互聯(lián)和云計(jì)算技術(shù)的普及發(fā)展,大數(shù)據(jù)計(jì)算已深入人心,最常見的比如 flink、spark 等。這些大數(shù)據(jù)框架,采用中心化的 Master-Slave 架構(gòu),依賴和部署比較重,每個(gè)任務(wù)也有較大開銷,有較大的使用成本。RocketMQ Streams 著重打造輕量計(jì)算引擎,除了消息隊(duì)列,無額外依賴,對過濾場景做了大量優(yōu)化,性能提升 3-5 倍,資源節(jié)省 50%-80%。

RocketMQ Streams 適合大數(shù)據(jù)量 ->高過濾 ->輕窗口計(jì)算的場景,核心打造輕資源,高性能優(yōu)勢,在資源敏感場景中有很大優(yōu)勢,最低 1core,1g 可部署,建議的應(yīng)用場景(安全,風(fēng)控,邊緣計(jì)算,消息隊(duì)列流計(jì)算)。

RocketMQ Streams 兼容 Blink(Flink 的阿里內(nèi)部版本) 的 SQL,UDF/UDTF/UDAF,多數(shù) Blink 任務(wù)可以直接遷移成 RocketMQ Streams 任務(wù)。將來還會(huì)發(fā)布和 Flink 的融合版本,RocketMQ Streams 可以直接發(fā)布成 Flink 任務(wù),既可以享有 RocketMQ Streams 帶來的高性能,輕資源,還可以和現(xiàn)有的 Flink 任務(wù)統(tǒng)一運(yùn)維和管理。

本篇文章主要從五個(gè)方面來介紹 RocketMQ Streams 實(shí)時(shí)計(jì)算平臺:

首先簡單先介紹一下什么是 RocketMQ Streams;

第二部分,基于 RocketMQ Streams 的 SDK,來了解下它是怎么去使用的;

第三部分,RocketMQ Streams 整體的架構(gòu)以及它的原理實(shí)現(xiàn);

第四部分,在云安全的場景下該怎么使用 RocketMQ Streams;

第五部分,RocketMQ Streams 的未來規(guī)劃。

1

什么是 RocketMQ Streams?

本章節(jié)從基礎(chǔ)簡介、設(shè)計(jì)思路和特點(diǎn)三方面對 RocketMQ streams 進(jìn)行整體介紹。

RocketMQ Streams 簡介

1)首先,它是一個(gè) Lib 包,啟動(dòng)即運(yùn)行,和業(yè)務(wù)直接集成;

2)然后,它具備 SQL 引擎能力,兼容 Blink SQL 語法,兼容 Blink UDF/UDTF/UDAF;

3)其次,它包含 ETL 引擎,可以無編碼實(shí)現(xiàn)數(shù)據(jù)的 ETL,過濾和轉(zhuǎn)存;

4)最后,它基于數(shù)據(jù)開發(fā) SDK,大量實(shí)用組件可直接使用,如:Source、sink、script、filter、lease、scheduler、configurable 不局限流的場景。

RocketMQ Streams 的特點(diǎn)

RocketMQ streams 基于上述的實(shí)現(xiàn)思路,可以看到它有以下幾個(gè)特點(diǎn):

輕量

1 核 1g 就可以部署,依賴較輕,在測試場景下用 Jar 包直接寫個(gè) main 方法就可以運(yùn)行,在正式環(huán)境下最多依賴消息隊(duì)列和存儲(chǔ)(其中存儲(chǔ)是可選的,主要是為了分片切換時(shí)的容錯(cuò))。

高性能

實(shí)現(xiàn)高過濾優(yōu)化器,包括前置指紋過濾,同源規(guī)則自動(dòng)歸并,hyperscan 加速,表達(dá)式指紋等,比優(yōu)化前性能提升 3-5 倍,資源節(jié)省 50% 以上。

維表 JOIN(千萬數(shù)據(jù)量維表支持)

設(shè)計(jì)高壓縮內(nèi)存存儲(chǔ)數(shù)據(jù),無 java 頭部和對齊的開銷,存儲(chǔ)接近原始數(shù)據(jù)大小,純內(nèi)存操作,性能最大化,同時(shí)對于 Mysql 提供了多線程并發(fā)加載,提高加載維表的速度。

高擴(kuò)展的能力

Source 可按需擴(kuò)展,已實(shí)現(xiàn):RocketMQ,F(xiàn)ile,Kafka;

Sink 可按需擴(kuò)展,已實(shí)現(xiàn):RocketMQ,F(xiàn)ile,Kafka,Mysql,ES;

可按 Blink 規(guī)范擴(kuò)展 UDF/UDTF/UDAF;

提供了更輕的 UDF/UDTF 擴(kuò)展能力,不需要任何依賴就可以完成函數(shù)的擴(kuò)展。

提供了豐富的大數(shù)據(jù)的能力

包括精確計(jì)算一次靈活的窗口,雙流 join,統(tǒng)計(jì),開窗,各種轉(zhuǎn)換過濾,滿足大數(shù)據(jù)開發(fā)的各種場景,支持彈性容錯(cuò)的能力。

2

RocketMQ Streams 的使用

RocketMQ Streams 對外提供兩種 SDK,一種是 DSL SDK,一種是 SQL SDK,用戶可以按需選擇;DSL SDK 支持實(shí)時(shí)場景 DSL 語義;SQL SDK 兼容 Blink(Flink 的阿里內(nèi)部版本) SQL 的語法,多數(shù) Blink SQL 可以通過 RocketMQ Streams 運(yùn)行;

接下來,我們詳細(xì)的介紹一下這兩種 SDK。

環(huán)境要求

JDK1.8 版本以上;

Maven 3.2 版本以上。

DSL SDK

利用 DSL SDK 開發(fā)實(shí)時(shí)任務(wù)時(shí),需要做如下的一些準(zhǔn)備工作:

依賴準(zhǔn)備

準(zhǔn)備工作完成后,就可以直接開發(fā)自己的實(shí)時(shí)程序。

代碼開發(fā)

其中:

1)Namespace 是業(yè)務(wù)隔離的,相同的業(yè)務(wù)可以寫成相同的 Namespace。相同的 Namespace 在任務(wù)調(diào)度里可以跑在進(jìn)程里,也可以共享一些配置;

2)pipelineName 可以理解成就是 job name ,唯一區(qū)分 job;

3)DataStreamSource 主要是創(chuàng)建 Source,然后這個(gè)程序運(yùn)行起來,最終的結(jié)果就是在原始的消息里面會(huì)加"--",然后把它打印出來。

豐富的算子

RocketMQ streams 提供了豐富的算子, 包括:

source 算子:包括 fromFile, fromRocketMQ, fromKafka 以及可以自定義 source 來源的 from 算子;

sink 算子:包括 toFile, toRocketMQ, toKafka,toDB,toPrint, toES 以及可以自定義 sink 的 to 算子;

action 算子:包括 Filter,Expression,Script,selectFields,Union,forEach,Split,Select,Join,Window 等多個(gè)算子。

部署執(zhí)行

基于 DSL SDK 完成開發(fā),通過下面命令打成 jar 包,執(zhí)行 jar,或直接執(zhí)行任務(wù)的 main 方法。

SQL SDK

依賴準(zhǔn)備

代碼開發(fā)

首先開發(fā)業(yè)務(wù)邏輯代碼, 可以保存為文件也可以直接使用文本;

其中

CREATE FUNCTION:引入外部的函數(shù)來支持業(yè)務(wù)邏輯, 包括 flink 以及系統(tǒng)函數(shù);

CREATE Table:創(chuàng)建 source/sink;

CREATE VIEW:執(zhí)行字段轉(zhuǎn)化,拆分,過濾;

INSERT INTO:數(shù)據(jù)寫入 sink;

函數(shù):內(nèi)置函數(shù),udf 函數(shù)。

SQL 擴(kuò)展

RocketMQ streams 支持三種 SQL 擴(kuò)展能力,具體實(shí)現(xiàn)細(xì)節(jié)請看:https://github.com/alibaba/rsqldb

1)通過 Blink UDF/UDTF/UDAF 擴(kuò)展 SQL 能力;

2)通過 RocketMQ streams 擴(kuò)展 SQL 能力,只要實(shí)現(xiàn)函數(shù)名是 eval 的 java bean 即可;

3)通過現(xiàn)有 java 代碼擴(kuò)展 SQL 能力,create function 函數(shù)名就是 java 類的方法名。

SQL 執(zhí)行

你可以從這里下載最新的 Rocketmq Streams 代碼并構(gòu)建。

解壓 tar.gz 包, 進(jìn)入目錄結(jié)構(gòu)

其目錄結(jié)構(gòu)如下

bin 指令目錄,包括啟動(dòng)和停止指令

conf 配置目錄,包括日志配置以及應(yīng)用的相關(guān)配置文件

jobs 存放 sql,可以兩級目錄存儲(chǔ)

ext 存放擴(kuò)展的 UDF/UDTF/UDAF/Source/Sink

lib 依賴包目錄

log 日志目錄

執(zhí)行 SQL

執(zhí)行多個(gè) SQL

如果想批量執(zhí)行一批 SQL,可以把 SQL 放到 jobs 目錄,最多可以有兩層,把 sql 放到對應(yīng)目錄中,通過 start 指定子目錄或 sql 執(zhí)行任務(wù)。

任務(wù)停止

日志查看

目前所有的運(yùn)行日志都會(huì)存儲(chǔ)在 log/catalina.out 文件中。

3

架構(gòu)設(shè)計(jì)及原理分析

RocketMQ Streams 設(shè)計(jì)思路

在了解完 RocketMQ streams 的基本簡介,接下來,我們看下 RocketMQ streams 的設(shè)計(jì)思路,設(shè)計(jì)思路主要從設(shè)計(jì)目標(biāo)和策略兩個(gè)方面來介紹:

設(shè)計(jì)目標(biāo)

依賴少,部署簡單,1 核 1g 單實(shí)例可部署,可隨意擴(kuò)展規(guī)模;

打造場景優(yōu)勢,重點(diǎn)打造大數(shù)據(jù)量 ->高過濾 ->輕窗口計(jì)算的場景,功能覆蓋度要全,實(shí)現(xiàn)需要的大數(shù)據(jù)特性:Exactly-ONCE、靈活的窗口(滾動(dòng)、滑動(dòng)、會(huì)話窗口);

要在保持低資源的前提下,對高過濾有性能突破,打造性能優(yōu)勢;

兼容 Blink SQL,UDF/UDTF/UDAF,讓非技術(shù)人員更容易上手。

策略(適配場景:大數(shù)據(jù)量>高過濾 /ETL>低窗口計(jì)算)

采用 shared-nothing 的分布式架構(gòu)設(shè)計(jì),依賴消息隊(duì)列做負(fù)載均衡和容錯(cuò)機(jī)制,單實(shí)例可啟動(dòng),增加實(shí)例實(shí)現(xiàn)能力擴(kuò)展,并發(fā)能力取決于分片數(shù);

利用消息隊(duì)列的分片做 shuffle,利用消息隊(duì)列負(fù)載均衡實(shí)現(xiàn)容錯(cuò);

利用存儲(chǔ)實(shí)現(xiàn)狀態(tài)備份,實(shí)現(xiàn) Exactly-ONCE 的語義。用結(jié)構(gòu)化遠(yuǎn)程存儲(chǔ)實(shí)現(xiàn)快速啟動(dòng),不等本地存儲(chǔ)恢復(fù)。

重力打造過濾優(yōu)化器,通過前置指紋過濾,同源規(guī)則自動(dòng)歸并,hyperscan 加速,表達(dá)式指紋提高過濾性能

RocketMQ Streams Source 的實(shí)現(xiàn)

1)Source 要求實(shí)現(xiàn)最少消費(fèi)一次的語義,系統(tǒng)通過 checkpoint 系統(tǒng)消息實(shí)現(xiàn),在提交 offset 前發(fā)送 checkpoint 消息,通知所有算子刷新內(nèi)存。

2)Source 支持分片的自動(dòng)負(fù)載和容錯(cuò)

數(shù)據(jù)源在分片移除時(shí),發(fā)送移除系統(tǒng)消息,讓算子完成分片清理工作;

當(dāng)有新分片時(shí),發(fā)送新增分片消息,讓算子完成分片初始化。

3)數(shù)據(jù)源通過 start 方法,啟動(dòng) consuemr 獲取消息;

4)原始消息經(jīng)過編碼,附加頭部信息包裝成 Message 投遞給后續(xù)算子。

RocketMQ Streams Sink 的實(shí)現(xiàn)

1)Sink 是實(shí)時(shí)性和吞吐的一個(gè)結(jié)合;

2)實(shí)現(xiàn)一個(gè) sink 只要繼承 AbstractSink 類實(shí)現(xiàn) batchInsert 方法即可。batchInsert 的含義是一批數(shù)據(jù)寫入存儲(chǔ),需要子類調(diào)用存儲(chǔ)接口實(shí)現(xiàn),盡量應(yīng)用存儲(chǔ)的批處理接口,提高吞吐;

3)常規(guī)的使用方式是寫 message->cache->flush->存儲(chǔ)的方式,系統(tǒng)會(huì)嚴(yán)格保證每次批次寫入存儲(chǔ)的量不超過 batchsize 的量,如果超過了,會(huì)拆分成多批寫入;

4)Sink 有一個(gè) cache,數(shù)據(jù)默認(rèn)寫 cache,批次寫入存儲(chǔ),提高吞吐(一個(gè)分片一個(gè) cache);

5)可以開啟自動(dòng)刷新,每個(gè)分片會(huì)有一個(gè)線程,定時(shí)刷新 cache 數(shù)據(jù)到存儲(chǔ),提高實(shí)時(shí)性。實(shí)現(xiàn)類:DataSourceAutoFlushTask;

6)通過調(diào)用 flush 方法刷新 cache 到存儲(chǔ);

7)Sink 的 cache 會(huì)有內(nèi)存保護(hù),當(dāng) cache 的消息條數(shù)>batchSize,會(huì)強(qiáng)制刷新,釋放內(nèi)存。

RocketMQ Streams Exactly-ONCE 實(shí)現(xiàn)

1)Source 確保在 commit offset 時(shí),會(huì)發(fā)送 checkpoint 系統(tǒng)消息,收到消息的組件會(huì)完成存盤操作,消息至少消費(fèi)一次;

2)每條消息會(huì)有消息頭部,里面封裝了 queueld 和 offset;

2)組件在存儲(chǔ)數(shù)據(jù)時(shí),會(huì)把 queueld 和處理的最大 offset 存儲(chǔ)下來,當(dāng)有消息重復(fù)時(shí),根據(jù) maxoffset 去重;

3)內(nèi)存保護(hù),一個(gè) checkpoint 周期可能有多次 flush(條數(shù)觸發(fā)),保障內(nèi)存占用可控。

RocketMQ Streams Window

實(shí)現(xiàn)方式:

1)支持滾動(dòng)、滑動(dòng)和會(huì)話窗口,支持事件時(shí)間和自然時(shí)間(消息進(jìn)入算子的時(shí)間);

2)支持 Emit 語法,可以在觸發(fā)前或觸發(fā)后,每隔 n 段時(shí)間,更新一次數(shù)據(jù);比如 1 小時(shí)窗口,窗口觸發(fā)前希望每分鐘看到最新結(jié)果,窗口觸發(fā)后希望不丟失遲到一天內(nèi)的數(shù)據(jù),且每 10 分鐘更新數(shù)據(jù)。

3)支持高性能模式和高可靠模式,高性能模式不依賴遠(yuǎn)程存儲(chǔ),但在分片切換時(shí),有丟失窗數(shù)據(jù)的風(fēng)險(xiǎn);

4)快速啟動(dòng),無需等待本地存儲(chǔ)恢復(fù),在發(fā)生錯(cuò)誤或分片切換時(shí),異步從遠(yuǎn)程存儲(chǔ)恢復(fù)數(shù)據(jù),同時(shí)直接訪問遠(yuǎn)程存儲(chǔ)計(jì)算;

5)利用消息隊(duì)列負(fù)載均衡,實(shí)現(xiàn)擴(kuò)容縮容容,每個(gè) queue 是一份組,一個(gè)分組同一刻只被一臺機(jī)器消費(fèi);

6)正常計(jì)算依賴本地存儲(chǔ),具備 flink 相似的計(jì)算性能。

4

RocketMQ Streams 在安全場景的最佳實(shí)踐

背景

從公共云轉(zhuǎn)戰(zhàn)專有云,遇到了新的問題。因?yàn)閷S性葡翊髷?shù)據(jù)這種 saas 服務(wù)是非必須輸出的,且最小輸出規(guī)模也比較大,用戶成本會(huì)增加很多,難落地,導(dǎo)致安全能力無法快速同步到專有云。

解決辦法

RocketMQ Streams 在云安全的應(yīng)用 - 流計(jì)算

基于安全場景打造輕量級計(jì)算引擎,基于安全高過濾的場景特點(diǎn),可以針對高過濾場景優(yōu)化,然后再做較重的統(tǒng)計(jì)、窗口、join 操作,因?yàn)檫^濾率比較高,可以用更輕的方案實(shí)現(xiàn)統(tǒng)計(jì)和 join 操作;

SQL 和引擎都可熱升級

業(yè)務(wù)結(jié)果

1)規(guī)則覆蓋:自建引擎,覆蓋 100% 規(guī)則(正則,join,統(tǒng)計(jì));

2)輕資源,內(nèi)存是公共云引擎的 1/24,cpu 是 1/6,依賴過濾優(yōu)化器,資源不隨規(guī)則線性增加,新增規(guī)則無資源壓力,通過高壓縮表,支持千萬情報(bào);

3)SQL 發(fā)布,通過 c/s 部署模式,SQL 引擎熱發(fā)布,尤其護(hù)網(wǎng)場景,可快速上線規(guī)則;

4)性能優(yōu)化,對核心組件進(jìn)行專題性能優(yōu)化,保持高性能,每實(shí)例(2g,4 核,41 規(guī)則)5000qps 以上。

5

RocketMQ Streams 的未來規(guī)劃

打造 RocketMQ 一體化計(jì)算能力

1)和 RocketMQ 整合,去除 DB 依賴,融合 RocketMQ KV;

2)和 RocketMQ 混部,支持本地計(jì)算,利用本地特點(diǎn),打造高性能;

3)打造邊緣計(jì)算最佳實(shí)踐

Connector 增強(qiáng)

1)支持 pull 消費(fèi)方式,checkpoint 異步刷新;

2)兼容 blink/flink connector。

ETL 能力建設(shè)

1)增加文件,syslog 的數(shù)據(jù)接入能力

2)兼容 Grok 解析,增加常用日志的解析能力;

3)打造日志 ETL 的最佳實(shí)踐

穩(wěn)定性和易用性打造

1)Window 多場景測試,提升穩(wěn)定性,性能優(yōu)化;

2)補(bǔ)充測試用例,文檔,應(yīng)用場景。

 

推薦搜索: RocketMQ Streams

“如果發(fā)現(xiàn)本網(wǎng)站發(fā)布的資訊影響到您的版權(quán),可以聯(lián)系本站!同時(shí)歡迎來本站投稿!