Spark 和 Flink 的 JDBC 不可靠

Mondo 社會 更新 2024-01-31

不要誤會我的意思,這是不可靠的,這意味著 Spark 和 Flink 都無法通過 JDBC 支援真正意義上的單詞流式讀取,並不是說它不可用。

至少,從目前兩者的官方檔案來看,或者通過我自己的實踐經驗來判斷。

那麼我們來談談JDBC的缺點,雖然它是一種通用的資料庫連線方式,但在流式讀取(或計算)方面。

資料來源的讀取分類

我們知道,隨著企業對資料處理的要求越來越高,這直接導致了我們的資料處理系統,對資料來源的讀取方式有了更多的可變要求。

從業務端的使用需求來看,資料來源的讀取方式和頻率大致可以分為兩類:

型別1:一次性,即一次性讀取目標系統(如資料庫)的所有資料,我們稱之為批處理

型別 2:連續,在型別 1 的基礎上,它仍然監視資料來源的變化,並繼續讀取後續的新資料和更改的資料,我們稱之為

首先是我們對讀取資料源的傳統需求,主流計算引擎 Spark 和 Flink 可以滿足。

但是,對於第二種型別,雖然 Spark 和 Flink 都支援流式計算的特性,但它們其實這種支援有乙個重要的前提,就是資料來源端以及與資料來源端對應的對接方式可以配合

適用於 Spark 的 JDBC

我之前吹噓過,任何儲存系統或資料庫,你可以按名稱命名,Spark都有乙個介面來讀取其中的資料,或者將計算結果儲存到其中。

誠然,Spark 是這樣做的,但是當我們想從特定資料庫流式傳輸資料源時,這有點讓人不知所措。

例如,我希望它從 MySQL 流式傳輸資料

我能想到的就是用它的結構化流式框架去嘗試讀取mysql,但是當我開啟官網,看到它支援的資料來源(最新)時,我不禁感到有些失望:

也就是說,資料來源中沒有官方支援以流式方式讀取的MySQL,也沒有提到JDBC。

但是,我以前的實踐經驗告訴我,有時我們不能完全相信官方的話,所以最好自己嘗試一下,以防萬一,對吧?

根據以往的經驗,我寫了如下核心**(記得提前在pom檔案中介紹對應的mysql-connector包):

它可能看起來像這樣,但一旦你執行它,你就會發現它:

果然不行,官網不會騙我的。

但我知道Spark絕對可以通過JDBC讀取MySQL資料來源。

所以,把核心**改成這個,就可以執行了:

只是這樣一來,違背了我的初衷,這個邏輯已經從我原來想要的流式計算變成了批處理,也就是經過這次修改,就不再是Spark結構化的流式處理,而是普通的Spark。

因此,Spark官員無法(至少到目前為止)以流式方式直接使用JDBC讀取資料源。

我在 GitHub 上看到乙個開源專案,官方原生支援支援的 JDBC 方法改造後,說可以支援使用 Spark Structured Streaming 增量讀取 MySQL 資料來源,我暫時還沒有驗證過,感興趣的同學可以看看(github.)。com/sutugin/spark-streaming-jdbc-source)。

用於 Flink 的 JDBC

開啟 Flink 官網,在 Flink Connector 列,出現 JDBC(沒有 MySQL):

在這種情況下,讓我們嘗試一下。

首先,你需要配置開發環境,與Spark不同,如果Flink想要讀取MySQL資料來源,就需要引入Flink獨有的JDBC Connector(非傳統的MySQL-Connector)。

注意這個版本的選擇,可能與官網上的描述有所不同,最新官方文件的版本由兩部分組成:聯結器版本+Flink版本,我的版本稍舊一些。

然後是**部分,如下(像上面的Spark,這裡只演示了讀取mysql資料,然後列印出來):

package com.anryg.mysql.jdbc

import j**a.time.duration

import org.apache.flink.contrib.streaming.state.embeddedrocksdbstatebackend

import org.apache.flink.streaming.api.checkpointingmode

import org.apache.flink.streaming.api.environment.checkpointconfig.externalizedcheckpointcleanup

import org.apache.flink.streaming.api.scala.streamexecutionenvironment

import org.apache.flink.table.api.bridge.scala.streamtableenvironment

desc:以JDBC模式讀取MySQL資料來源。

auther: anryg

date: 2023/11/8 10:49

object frommysql2print {

def main(args: array[string]):unit = {

val env = streamexecutionenvironment.getexecutionenvironment

env.enablecheckpointing(10000l)

env.setStateBackend(New EmbeddedRocksDbStateBackend(True)) 一種設定狀態後端的新方法。

env.getcheckpointconfig.setcheckpointstorage("hdfs:")

env.getcheckpointconfig.setexternalizedcheckpointcleanup(externalizedcheckpointcleanup.取消時保留)來設定檢查點記錄的保留策略。

env.getcheckpointconfig.setalignedcheckpointtimeout(duration.ofminutes(1l))

env.getcheckpointconfig.setcheckpointingmode(checkpointingmode.exactly_once)

val tableenv = streamtableenvironment.create(env)

第 1 步:讀取 MySQL 資料來源*

tableenv.executesql(

create table data_from_mysql(

client_ip` string,`domain` string,`time` string,`target_ip` string,`rcode` int,`query_type` int,`authority_record` string,`add_msg` string,`dns_ip` string,primary key(`client_ip`,`domain`,`time`,`target_ip`,`rcode`,`query_type`) not enforced

with('connector' = 'jdbc','url' = 'jdbc:mysql:', 'username' = '***', 'password' = '***', 'table-name' = 'test02'確定文字資料來源的分隔符。

.stripmargin)

結果直接列印*

tableenv.executesql(

select * from data_from_mysql limit 100

.stripmargin).print()

整個**內容與上一篇文章中寫的CDC閱讀mysql的方式非常相似(有興趣的可以去我之前的文章對比一下)。

但是,執行後,當程式完成對當前表中資料的讀取後,它會突然停止:

也就是說,雖然我們使用了 Flink 流計算的上下文(streamexecutionenvironment),但由於程式使用 JDBC 來讀取資料源,所以,它仍然只批量執行

在這方面,Flink 的行為與 Spark 完全相同。

最後

通過以上驗證可以確定,無論是 Spark 還是 Flink,都無法以 JDBC 的方式流式讀取 MySQL 資料來源(或其他資料庫),至少不可能直接使用官方正規軍的方式。

那麼,如果想直接通過計算引擎從某些資料庫(比如MySQL)中讀取增量資料,似乎最好的解決方案就是Flink CDC。

當然,JDBC也不是一無是處,對於一些較低版本的資料庫(CDC暫時不支援),比如MySQL 55及以下版本的歷史資料匯入,依然可以派上用場。

作者丨anryg

*丨*** Anruige 是一名程式設計師(ID:GH C12DC29AE2E7)。

DBAPLUS 社群歡迎技術人員的貢獻editor@dbapluscn

相關問題答案

    傳音Spark 20:一款新的廉價智慧型手機已經發布

    Tecno 最近推出了 Spark ,標誌著其 Spark 系列智慧型手機的開始。雖然尚未正式公布,但這款智慧型手機已在官方TECNO 傳音 展示會上首次亮相,展示了其全面的規格和設計。讓我們深入了解 Spark 的規格。因此,該裝置擁有寬敞的 英吋 LCD 面板提供 x 畫素的 HD 解像度,並擁...

    龍眼和龍眼的區別

    龍眼和龍眼其實指的是同一種水果。在不同的地區,這種水果可能有不同的名稱。在某些文化或地區,它被稱為 龍眼 而在另一些文化或地區,它被稱為 龍眼 它們都是指龍眼樹 Dimocarpus longan 的果實。同一種植物 不管叫龍眼還是龍眼,它們都來自同一棵熱帶果樹,屬於無患子科。外觀和味道 這種水果有...

    純金和黃金的區別

    純金和 是兩種不同的金屬材料,它們在成分 純度 顏色 質地等方面都有一定的差異。下面將詳細描述這兩種金屬。.成分。純金是指純度達到超過 而 是指元素週期表中化學符號為Au的金屬元素。因此,從成分上看,純金和 是同一種金屬,但純度不同。.純度。純金的純度非常高,通常達到 以上,故又稱千金。的純度相對較...

    與女人有染的技巧

    沒有戀愛經驗的小白,跟女人談戀愛,問題主要體現在 .時機不明確。推進關係太快,目的性太強,讓女人不舒服 關係推進太慢,錯過了視窗,關係停滯不前。 模稜兩可的尺度太大,容易說出來 規模太小,影響不大。針對第一點,時間不明確,我想給弟兄們三點建議。.當乙個女人快樂,心情好的時候。主要表現是,例如,女性對...

    大閘蟹和螃蟹的區別

    首先,形狀不同。大閘蟹體型較大,身體扁平,頭胸部寬度超過身體寬度。它們的體色通常是深綠色或深綠色,而腹部是乳白色。而螃蟹的體型相對較小,身體呈矩形或圓形,頭胸部和腹部的寬度相等。它們的體色通常是深紅色或棕色,而腹部是淡黃色。其次,習慣不同。大閘蟹是主要生活在河流和湖泊中的淡水生物。它們喜歡在水中游泳...