1. 背景
在當前全行業、全公司降本增效的背景下,嗶哩嗶哩也在積極推進實時、一流的業務資源整合,向雲原生架構遷移,統一資源池和排程,提高資源利用效率。 但真正的問題是,不同業務場景下資源的規格和需求是不同的。
由於業務性質的原因,業務資源池一般只有很強的算力,基本不具備儲存和IO能力。 Flink 雖然是乙個計算引擎,但由於其有狀態特性,在很多計算場景中對儲存和 IO 都有很強的要求,因此實時資源池也具有很強的儲存和計算能力。 考慮到大資料存算分離的整體發展趨勢,我們嘗試用存算分離來改造Flink,核心工作是statebackend的遠端化。
二、痛點
flink 中的 statebackend 用於儲存任務狀態。 在使用上,分為operatorstatebackend和keyedstatebackend,operatorstatebackend一般儲存一些與計算邏輯本身無關的資料,比如Kafka的偏移量,比較小,不會受到資料規模和計算邏輯的影響,而keyedstatebackend則相反,儲存的是與計算邏輯強繫結的狀態資料, 如agg雙流連線的中間結果大小受資料規模和計算邏輯的影響。
嗶哩嗶哩上有4000+個流式計算任務,其中95%是SQL任務,50%有狀態,數百個任務狀態大小超過500GB。 目前預設的statebackend使用rocksdbstatebackend,在實現中支援增量快照,可以減少大量重複的狀態資料上傳到檔案系統,而歷史的flink集群機器配置了相對高效能的磁碟,可以支援將TaskManager中的大狀態本地儲存到rocksdb,當任務是checkpoint時, 增量狀態檔案上傳到檔案系統,無論 keyedstate 大小是 0 還是 TB,檔案系統都可以支援所有流式計算任務。現有環境將存在以下兩個痛點:
1.整體磁碟利用率較低
所有 Flink 機器都配備了高效能的大盤,可以支援超大狀態任務的健康執行,並且我們預設配置了狀態的 retentiontime,80% 的任務在 100 GB 狀態內是小的或無鍵的,TaskManager 執行的機器整體利用率明顯偏低, 而如果機器沒有處於大型狀態,那麼當前機器的高效能大盤顯然是浪費的。
2.大任務重新縮放很慢
在重新擴縮容超大狀態時,任務會先從檔案系統中移除狀態資料** 當狀態處於TB級別時,狀態資料的重新分發成本比較大,恢復時間大約需要半小時,使用者體驗會很差,容易增加引擎端的duty成本。
3. 遠端狀態後端
解決上述痛點,一是狀態資料需要實時儲存在遠端服務中,減少Flink集群對磁碟的強依賴,實現儲存和計算分離,這也符合雲原生架構的演進目標另一種是狀態資料可以儲存在金鑰組單元中,以避免資料重新分發操作。
在與分布式儲存團隊溝通後,其自研的泰山(B站分布式KV儲存[1])儲存基本可以滿足我們的需求,泰山儲存基於RockSDB和SparrowDB的改造,採用Raft一致性協議來保證多副本資料的一致性,構建乙個高可靠、高可用、高效能、高擴充套件的儲存系統。 泰山儲存提供了 j**a 的 Put Get del Scan 等 API,可以具備快照功能,在 API 層面與 RocksDB 的功能基本一致,並且可以支援橫向擴充套件和公升級,完全可以滿足 Flink 的需求,經過多次溝通和功能支援,具備了構建 TaishanStatebackend 的條件。
1.狀態切換保證
在 KeyedStateBackend 中,每條要計算的資料都進入子任務,並根據資料的鍵進行傳遞"mathutils.murmurhash(key's hashcode) %maxparallelism"計算乙個id作為keygroupid,可以保證同一key的資料在同乙個子任務中計算,不會造成計算引擎中亂序的現象,maxparallelism為最大並行度,任務啟動後其值不會改變。 根據 keygroupid 將每條資料歸屬於乙個鍵組分片,鍵組分片總數為 maxparallelism,每個分片是不能修改的,因為重啟前後不允許修改 maxparallelism 的值,當任務重新擴容時,鍵組下的資料會為狀態移動, 如下圖所示。
泰山儲存有分片的概念,類似於 Flink 的 keygroup,泰山的分片有 ** 和合併的能力,我們只需要同意 Flink 的泰山表禁用了這個功能。 Flink 的另乙個關鍵點是 CheckPoint,RocksDBsDBSuptendEnd 的增量 checkpoint 是在同步過程中對 RocksDB 做乙個快照,非同步程序在任務重啟時將修改後的 SST 檔案上傳到檔案系統進行恢復,Taishan Storage 提供了在每個分片上建立和恢復快照的能力,而 Flink 在做 checkpoint 時可以做 checkpoint 在每個子任務中, 在 keygroupRange 中從頭到尾進行快照,snapshotId 與 Flink 的 checkpointid 相同。
2.Statebackend 拓撲設計
Keyed StateBackend 負責核心狀態資料管理,託管狀態分為 keyed state 和 priority queue state。 鍵控狀態有基本的 internalvaluestate 和 internalmapstate,以及複雜的 internalliststate、internalaggregatingstate、internalReducingState 和 internalfoldingState,不同型別的狀態會應用於不同 SQL 場景下的運算元優先順序佇列狀態需要由 InternalPriorityQueue 實現。 Flink Flow Computation 的 checkpoint 機制是任務在過程中執行時可靠性的基石** 如果發生故障,可以將 checkpoint 的資訊恢復到故障前的某個狀態,然後從該狀態恢復任務的執行,checkpoint 的執行策略根據 snapshotstrategy 的實現確定, 並且根據 restoreoperation 的實現確定 checkpoint 的恢復策略 statebackend 的整體拓撲如下圖所示。
3.台山State後端架構設計
TaishanStateBackend是根據上述規則和泰山儲存做出以下約定的:
taishan 表的 keygroupid 和 shardid 是一對一對映的。
taishan 表中的分片數量與 Flink 的 maxparallelism 相同,並且總是不可避免地被合併。
當任務首次啟動時,每個具有 keyedstate 的運算元只會建立乙個 taishan 表。
乙個運算子下可能有多個狀態,傳送的 KV 資料的 K 以 columnfamily 字首為字首。
當 flink 作為 checkpoint 時,會為 Taishan 中的每個分片建立乙個快照,並且每個分片的 snapshotid 是相同的。
Flink 在做任務恢復時,本質是根據 snapshotid 恢復泰山的每個分片。
修改遠端儲存架構後,主要優點如下:
1)檢查點更輕
RocksDBsNationalBackend 的狀態儲存在子任務的本地 RocksDB 中,只有在做 checkpoint 的時候,才會根據 snapshotstrategy 決定是以增量還是全量模式上傳到檔案系統,上傳的內容分為元資料和 keyedstate 內容,修改為 taishanstatebackend 時, 只需要將元資料元資料檔案上傳到檔案系統中,單個分片的快照過程就可以在毫秒級完成。
2)儲存和計算分離
使用TaishanStateBackend後,有狀態運算元不需要節點機擁有高效能磁碟,狀態資料儲存在遠端泰山系統中,減少了容器機對磁碟的強依賴性,從而達到儲存和計算分離的效果。
3) 加速任務重擴
在 RocksDB StateBackend 中對任務進行擴容或擴縮容時,由於子任務下對應的 KeyGroupRange 的起始和結束發生變化,需要根據子任務之間的 keygroupid 字首對本地 RocksDB 例項進行狀態尋道和 batchput 操作,從而根據金鑰組重新分配狀態資料。使用 TaishanStateBackend 後,該任務無需遷移狀態資料進行伸縮操作,因為每個金鑰組都與分片一一對應,只需要修改金鑰組範圍的開始和結束。
四、優化
在 rocksdbstatebackend 的前期,我們已經對 read、write、delete、readnull、seek 等請求的狀態的耗時、請求量、資料包長度等做了指標統計。 通過對線上執行的任務指標的分析可以看出,狀態每秒讀寫請求數最高可達100萬,每秒10萬個讀寫請求任務有上百個,在group agg和window agg場景下,每個請求包的大小一般為幾十位元組, 而即使在 ZSTD 壓縮之後,聯接場景中單個狀態的值仍將存在於數百 kb 中。
在功能測試中發現,使用TaishanStateBackend的任務在使用相同資源時CPU負載較高,並且發現當狀態下的RPC請求數量較大時,網路消耗的CPU佔比較大。 我們狀態下的資料從本地儲存轉換為遠端儲存後,每個請求都是由網路 RPC 請求發出的,雖然 RocksDB 中沒有堆外記憶體的消耗,但是對網路的依賴性會很大,當 RPC 請求達到一定閾值時,網路必然會成為當前場景下的瓶頸。 為了解決這個問題,我們選擇在狀態和遠端KV儲存之間增加一層快取層,以減少對網路的請求讀取。 整體結構如下,下面將逐步說明。
1.寫入優化
1) 優化寫入
而不是將快取中的結果寫入遠端,或者在put remove時直接與遠端互動,將put遠端請求放入當前子任務的blockingqueue中,並使用put執行緒來消費put請求量達到一定閾值時,將請求量重新整理到遠端KV儲存中, 或者當 Flink 作為檢查點時觸發重新整理操作。新增指標和壓測後發現,當最大批量大小設定為 800 時,對遠端 KV 儲存的 PUT 請求只會增加 2-4 倍,整體網路寫入請求量會減少 100 倍,提高了狀態的寫入速率。
2.讀取優化
1) 快取加速讀取
而不是將快取中的結果寫入遠端,或者在put remove時直接與遠端互動,將put遠端請求放入當前子任務的blockingqueue中,並使用put執行緒來消費put請求量達到一定閾值時,將請求量重新整理到遠端KV儲存中, 或者當 Flink 作為檢查點時觸發重新整理操作。新增指標和壓測後發現,當最大批量大小設定為 800 時,對遠端 KV 儲存的 PUT 請求只會增加 2-4 倍,整體網路寫入請求量會減少 100 倍,提高了狀態的寫入速率。
2) Readnull 優化
在實踐過程中,會發現以下兩種場景都存在較大的 readnull 請求,即快取獲取的結果為 null,從遠端獲取的結果也是 null。
當任務的 key 稀疏時,通過指標會發現大量的 readnull 請求,尤其是在去重和視窗 agg hop 場景下。
當任務的 key 按天或按小時週期性變化時,會產生大量的 readnull 請求,尤其是在 group agg 場景下。
由於 readnull 請求較多,經過調查,我們最初嘗試使用 Hadoop 的 bloomfilter 來過濾 readnull 請求,當 Flink 進行 checkpoint 時,將 bloomfilter 結果寫入遠端儲存系統,在任務恢復時可以恢復任務。 但是,無論如何調整 BloomFilter 的係數和容量,隨著時間的推移,BloomFilter 的過濾效果越來越低,誤報率越來越高,無法解決 ReadNull 的根本問題。
我們內部的 flink bsql 任務目前對狀態有預設的 TTL 設定為 24 小時,當超過 24 小時未訪問狀態的資料時,狀態會過期,然後在 rocksdb 做 compaction 時刪除,基本可以滿足大部分使用者的實際使用。
經過團隊和使用者的排查總結,發現在視窗 5 10 分鐘和雙流加入 1 2 小時的使用者業務行為中存在很多場景,所以 24 小時 TTL 對於這類情況非常豐富,在這種情況下,我們可以假設大多數場景下資料都不會有延遲, 所以任務在5分鐘或1小時內的實際有效金鑰會比24小時內少很多。在這種情況下,可以選擇為快取分配一定比例的記憶體,以建立乙個獨立的 kv 快取使用逐出策略刪除資料時,可以自定義刪除邏輯,刪除邏輯是判斷當前時間與失效的絕對時間的對比, 如果逐出資料不滿足當前時間,小於過期時間,則會丟擲OOM異常,並新增相關記憶體或調整KV快取比例。
由於 GC 對堆記憶體的限制,以及研究使用堆外記憶體作為快取的預設選擇,只有在使用堆外記憶體時才允許使用這種獨立的 KV 快取,我們稱之為 offheapbloomfilter,下面將解釋堆內外的選擇。 OffHeapBloomFilter 會將所有有效鍵快取在記憶體中,充當 bloomfilter,從而過濾大量 readnull 請求。
當任務重啟時,OffHeapBloomFilter 會在第乙個 TTL 時間內過期,但它會記錄狀態鍵前後的資料,OffHeapBloomFilter 會在第乙個 TTL 時間後開始正常工作,可以保證 Bloomfilter 的準確性和有效性。
由於視窗 agg 的鍵控狀態使用優先順序佇列狀態按照視窗的順序進行清理,因此其鍵控狀態預設沒有 TTL。CTL 是 Assigner 的視窗時間加上允許的延遲時間,一般用於做任務重啟、失敗或累加的緩衝時間,避免因任務異常時間過長導致資料結果不滿意的問題,導致狀態為 null。同樣,我們也分別對間隔加入和兩種延遲雙流加入的鍵控狀態提供了相應的支援,TTL設定為左右流的最大時間加上允許的延遲時間。
3.記憶體模型優化
在快取讀優化中,快取使用的記憶體是任務堆的堆記憶體,與使用者JVM的堆記憶體共享,快取使用高效能快取庫Caffeine。 當乙個運算元至少有乙個狀態時,建立乙個泰山表,子任務會根據它有多少個鍵控狀態來決定建立多少個咖啡因快取,如下圖所示:
這樣的結構會遇到兩個問題:
建立的咖啡因快取數等於 n*m,其中 n 是當前 TaskManager 中的插槽數,m 是鍵控狀態數。 由於流式計算前後可能存在AGG或filter,快取前後快取處理的狀態資料量不同,導致快取難以根據時間或數量配置逐出策略,且快取物件之間不共享,頻繁更新的快取記憶體無法充分利用。
研究發現,經過G1引數[2]的多次調整後,GC時間仍然很高,並且不時出現GC高耗時抖動的現象,這使得任務不那麼流暢,影響了使用者體驗。
在研究了 RocksDBSdBStateBackend [3] 的堆外記憶體機制和市面上的堆外記憶體框架後,我們選擇使用 Off Heap Cache(OHC)框架 [4] 來製作快取,並生成了如下結構。 Caffeine Cache 被 OHC Cache 取代,記憶體從 Task HEAP 轉換為 Managed OffHeap,多個子任務共享當前插槽分配的 OHC Cache,並且分配的快取物件唯一共享,以上兩個主要問題在這樣的替換後可以完全解決,執行效率基本符合預期。
由於早期泰山儲存中沒有使用 ColumnFamily 的概念,而 KeyGroupID 作為 API 級別的引數放在 API 中,所以原 RocksDB 的 K 中的 KeyGroupID 被 ColumnFamily 替換,並且由於 OHC 快取在堆外共享,並且可能存在多個狀態,具有相同的 columnfamily 和 key, 所以為了防止運算元影響彼此的狀態資料,將運算元的前八位作為字首新增到泰山狀態的K中。
此外,對於 OHC 快取,我們預設選擇 OffHeapLinkedLrumap,其記憶體模型結構如下圖所示,並進行了一些適配:
更改了 hashtablesize 並禁用了 map 的自動重新雜湊,以防止鍵迭代器呼叫導致資料不準確。
修改了 LRU 的逐出邏輯,ohc 在 cacheserializer 中新增了 elementcouldremove 方法,在 OHC 逐出資料之前會呼叫該方法判斷是否可以移除資料,而 flink 端 Serializer 上的值只需要實現判斷當前時間是否大於過期時間, 即可以與非同步延遲時間重新整理對齊,確保無效資料已寫入遠端儲存,另乙個用於 offheaplumfilter 中資料失效的逐出策略邏輯。
雖然OHC的超時觸發邏輯可以通過指標準確檢視快取中堆外記憶體的實際使用情況,但對任務的吞吐量有負面影響,因此預設關閉超時功能。
5. 現在和未來
目前,嗶哩嗶哩自 2022 年 11 月初開始,已逐步切換了 100+ 個具有 keystate 的線上任務,由於 RPC 網路開銷的增加,整體資源使用量略有增加,但儲存計算分離、加速任務擴容的目的已基本實現,整體符合預期。
當然也有一些特殊情況,需要優化的主要專案如下:
高QPS場景:在一些業務場景中,金鑰非常稀疏,快取命中率較低,當狀態資料量足夠大時,offheapbloomfilter 的記憶體要求過高,在降本增效的環境下難以實現,關閉 offheapbloomfilter 會導致大量 readnull 現象, 一般表現為單個子任務的grPC請求QPS過高,網路壓力大。
大鍵值場景:如果雙流加入場景下存在較大值,在多字段去重場景下存在較大金鑰,執行一段時間後會出現寫卡頓現象,影響任務的健康執行,這也是儲存團隊下一階段需要做進一步聯合除錯和優化的地方。
狀態分層儲存:目前快取的實現是使用堆外記憶體作為儲存介質,在上述高QPS和大鍵值場景下,由於記憶體空間限制,快取命中率會下降。 未來,我們計畫參考 Flink Forward Asia 2022 中提到的分層狀態後端的思想,同時使用機器上的磁碟和記憶體作為快取加速資源,同時保持遠端儲存上的狀態資料完整,形成一套分層狀態儲存架構,不僅可以解決單個快取介質的容量限制, 而且在混合零件的情況下,也更有效地提高了機器的資源利用率。接下來,Flink 的實時任務將通過 K8S 平台和 ** 業務進行統一的混合和管理,並且隨著嗶哩嗶哩的 ** 業務正在積極推進“無盤”的轉型,本地閒置的 SSD 磁碟資源可以更好地被 Flink 任務用作分層狀態儲存的快取資源, 進一步提高混合部件技術下機器資源的利用率。
在進行上述優化項的同時,將進一步提公升taishanstatebackend的覆蓋率,並選擇合適的任務部署在混合部分集群中,最終達到預設啟用的效果。
引用
作者丨Zhang**&Cao Jie**丨***嗶哩嗶哩科技(ID:Bilibili-TC) DBAPLUS社群歡迎技術人員投稿,投稿郵箱:editor@dbapluscn