不要誤會我的意思,這是不可靠的,這意味著 Spark 和 Flink 都無法通過 JDBC 支援真正意義上的單詞流式讀取,並不是說它不可用。
至少,從目前兩者的官方檔案來看,或者通過我自己的實踐經驗來判斷。
那麼我們來談談JDBC的缺點,雖然它是一種通用的資料庫連線方式,但在流式讀取(或計算)方面。
資料來源的讀取分類
我們知道,隨著企業對資料處理的要求越來越高,這直接導致了我們的資料處理系統,對資料來源的讀取方式有了更多的可變要求。
從業務端的使用需求來看,資料來源的讀取方式和頻率大致可以分為兩類:
型別1:一次性,即一次性讀取目標系統(如資料庫)的所有資料,我們稱之為批處理
型別 2:連續,在型別 1 的基礎上,它仍然監視資料來源的變化,並繼續讀取後續的新資料和更改的資料,我們稱之為流
首先是我們對讀取資料源的傳統需求,主流計算引擎 Spark 和 Flink 可以滿足。
但是,對於第二種型別,雖然 Spark 和 Flink 都支援流式計算的特性,但它們其實這種支援有乙個重要的前提,就是資料來源端以及與資料來源端對應的對接方式可以配合
適用於 Spark 的 JDBC
我之前吹噓過,任何儲存系統或資料庫,你可以按名稱命名,Spark都有乙個介面來讀取其中的資料,或者將計算結果儲存到其中。
誠然,Spark 是這樣做的,但是當我們想從特定資料庫流式傳輸資料源時,這有點讓人不知所措。
例如,我希望它從 MySQL 流式傳輸資料
我能想到的就是用它的結構化流式框架去嘗試讀取mysql,但是當我開啟官網,看到它支援的資料來源(最新)時,我不禁感到有些失望:
也就是說,資料來源中沒有官方支援以流式方式讀取的MySQL,也沒有提到JDBC。
但是,我以前的實踐經驗告訴我,有時我們不能完全相信官方的話,所以最好自己嘗試一下,以防萬一,對吧?
根據以往的經驗,我寫了如下核心**(記得提前在pom檔案中介紹對應的mysql-connector包):
它可能看起來像這樣,但一旦你執行它,你就會發現它:
果然不行,官網不會騙我的。
但我知道Spark絕對可以通過JDBC讀取MySQL資料來源。
所以,把核心**改成這個,就可以執行了:
只是這樣一來,違背了我的初衷,這個邏輯已經從我原來想要的流式計算變成了批處理,也就是經過這次修改,就不再是Spark結構化的流式處理,而是普通的Spark。
因此,Spark官員無法(至少到目前為止)以流式方式直接使用JDBC讀取資料源。
我在 GitHub 上看到乙個開源專案,官方原生支援支援的 JDBC 方法改造後,說可以支援使用 Spark Structured Streaming 增量讀取 MySQL 資料來源,我暫時還沒有驗證過,感興趣的同學可以看看(github.)。com/sutugin/spark-streaming-jdbc-source)。
用於 Flink 的 JDBC
開啟 Flink 官網,在 Flink Connector 列,出現 JDBC(沒有 MySQL):
在這種情況下,讓我們嘗試一下。
首先,你需要配置開發環境,與Spark不同,如果Flink想要讀取MySQL資料來源,就需要引入Flink獨有的JDBC Connector(非傳統的MySQL-Connector)。
注意這個版本的選擇,可能與官網上的描述有所不同,最新官方文件的版本由兩部分組成:聯結器版本+Flink版本,我的版本稍舊一些。
然後是**部分,如下(像上面的Spark,這裡只演示了讀取mysql資料,然後列印出來):
package com.anryg.mysql.jdbc
import j**a.time.duration
import org.apache.flink.contrib.streaming.state.embeddedrocksdbstatebackend
import org.apache.flink.streaming.api.checkpointingmode
import org.apache.flink.streaming.api.environment.checkpointconfig.externalizedcheckpointcleanup
import org.apache.flink.streaming.api.scala.streamexecutionenvironment
import org.apache.flink.table.api.bridge.scala.streamtableenvironment
desc:以JDBC模式讀取MySQL資料來源。
auther: anryg
date: 2023/11/8 10:49
object frommysql2print {
def main(args: array[string]):unit = {
val env = streamexecutionenvironment.getexecutionenvironment
env.enablecheckpointing(10000l)
env.setStateBackend(New EmbeddedRocksDbStateBackend(True)) 一種設定狀態後端的新方法。
env.getcheckpointconfig.setcheckpointstorage("hdfs:")
env.getcheckpointconfig.setexternalizedcheckpointcleanup(externalizedcheckpointcleanup.取消時保留)來設定檢查點記錄的保留策略。
env.getcheckpointconfig.setalignedcheckpointtimeout(duration.ofminutes(1l))
env.getcheckpointconfig.setcheckpointingmode(checkpointingmode.exactly_once)
val tableenv = streamtableenvironment.create(env)
第 1 步:讀取 MySQL 資料來源*
tableenv.executesql(
create table data_from_mysql(
client_ip` string,`domain` string,`time` string,`target_ip` string,`rcode` int,`query_type` int,`authority_record` string,`add_msg` string,`dns_ip` string,primary key(`client_ip`,`domain`,`time`,`target_ip`,`rcode`,`query_type`) not enforced
with('connector' = 'jdbc','url' = 'jdbc:mysql:', 'username' = '***', 'password' = '***', 'table-name' = 'test02'確定文字資料來源的分隔符。
.stripmargin)
結果直接列印*
tableenv.executesql(
select * from data_from_mysql limit 100
.stripmargin).print()
整個**內容與上一篇文章中寫的CDC閱讀mysql的方式非常相似(有興趣的可以去我之前的文章對比一下)。
但是,執行後,當程式完成對當前表中資料的讀取後,它會突然停止:
也就是說,雖然我們使用了 Flink 流計算的上下文(streamexecutionenvironment),但由於程式使用 JDBC 來讀取資料源,所以,它仍然只批量執行
在這方面,Flink 的行為與 Spark 完全相同。
最後
通過以上驗證可以確定,無論是 Spark 還是 Flink,都無法以 JDBC 的方式流式讀取 MySQL 資料來源(或其他資料庫),至少不可能直接使用官方正規軍的方式。
那麼,如果想直接通過計算引擎從某些資料庫(比如MySQL)中讀取增量資料,似乎最好的解決方案就是Flink CDC。
當然,JDBC也不是一無是處,對於一些較低版本的資料庫(CDC暫時不支援),比如MySQL 55及以下版本的歷史資料匯入,依然可以派上用場。
作者丨anryg
*丨*** Anruige 是一名程式設計師(ID:GH C12DC29AE2E7)。
DBAPLUS 社群歡迎技術人員的貢獻editor@dbapluscn