ACK One Argo Workflows 在任務編排中實現動態扇出風扇

Mondo 科技 更新 2024-02-07

作者:莊宇。

在工作流編排過程中,為了加快大任務處理效率,可以使用扇出扇入任務編排,將大任務分解為小任務,然後並行執行小任務,最後聚合結果。

從上圖可以看出,可以使用DAG(Directed Acyclic Graph)來編排扇出扇入任務,子任務分為靜態和動態,分別對應靜態DAG和動態DAG。 動態 dag 扇出扇入也可以理解為 mapreduce。 每個子任務都是乙個對映,最終的聚合結果是遞減的。

靜態匕首:拆分子任務分類是固定的,例如,在資料採集場景中,同時採集資料庫 1 和資料庫 2 的資料,最後對結果進行聚合。

動態匕首:拆分子任務分類是動態的,取決於上乙個任務的輸出,例如,在乙個資料處理場景中,任務A可以掃瞄待處理的資料集,對每個子資料集(例如乙個子目錄)開始子任務bn處理,當所有子任務bns完成後,將結果聚合到子任務C中,啟動多少個子任務b取決於任務A的輸出。 您可以根據實際業務場景,自定義任務A中子任務的拆分規則。

在實際業務場景中,為了加快大任務的執行速度,提高效率,往往需要將乙個大任務分解成上千個子任務,為了保證上千個子任務的同時執行,需要排程數以萬計的CPU資源。 例如,修改後的演算法的回歸測試需要由乙個子任務針對所有驅動場景執行,開發團隊要求所有子場景並行測試,以加快迭代速度。

如果您需要使用動態 DAG 來編排資料處理、計算和科學計算場景中的任務,或者需要排程數萬個 CPU 資源來加速任務執行,您可以使用阿里雲 Ack One 分布式工作流 Argo 集群

ACK ONE分布式工作流Argo集群,產品化託管Argo Workflow提供售後支援,支援動態DAG扇出扇入任務編排,支援雲算力按需排程,利用雲彈性排程數萬CPU資源並行執行大規模子任務,操作完成後及時資源,減少執行時間,節約成本。 支援資料處理、機器學習、計算、科學計算、CICD等業務場景。

Argo Workflow 是乙個開源的 CNCF 畢業專案,專注於雲原生領域的工作流編排,使用 Kubernetes CRD 編排離線任務和 DAG 工作流,使用 Kubernetes Pod 在集群中排程執行。

本文介紹如何使用 Argo Workflow 編排動態 DAG 扇出扇入任務。

我們將構建乙個動態的 DAG 扇出扇入工作流,在 OSS 中讀取乙個大日誌檔案,將其拆分為多個小檔案,啟動多個子任務來計算每個小檔案中的關鍵字數量(count),最後聚合結果(merge)。

1.建立分布式工作流 ARGO 集群

2.通過附加阿里雲 OSS 儲存卷,您可以像處理本地檔案一樣操作阿里雲 OSS 上的檔案。 參考:工作流使用儲存卷

3.使用以下工作流 yaml 建立工作流,請參閱: 建立工作流。有關詳細資訊,請參閱注釋。

apiversion: argoproj.io/v1alpha1kind: workflowmetadata: generatename: dynamic-dag-map-reduce-spec: entrypoint: main # claim a oss pvc, workflow can read/write file in oss through pvc. volumes: -name: workdir persistentvolumeclaim: claimname: pvc-oss # how many tasks to split, default is 5. arguments: parameters: -name: numparts value: "5" templates: -name: main # dag definition. dag: tasks: # split log files to several small files, based on numparts. -name: split template: split arguments: parameters: -name: numparts value: "}" # multiple map task to count words in each small file. -name: map template: map arguments: parameters: -name: partid value: '}' depends: "split" # run as a loop, partid from split task json outputs. withparam: '}' - name: reduce template: reduce arguments: parameters: -name: numparts value: "}" depends: "map" # the `split` task split the big log file to several small files. each file has a unique id (partid). # finally, it dumps a list of partid to stdout as output parameters - name: split inputs: parameters: -name: numparts container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["split.py"] env: -name: num_parts value: "}" volumemounts: -name: workdir mountpath: /mnt/vol # one `map` per partid is started. finds its own "part file" and processes it. -name: map inputs: parameters: -name: partid container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["count.py"] env: -name: part_id value: "}" volumemounts: -name: workdir mountpath: /mnt/vol # the `reduce` task takes the "results directory" and returns a single result. -name: reduce inputs: parameters: -name: numparts container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["merge.py"] env: -name: num_parts value: "}" volumemounts: -name: workdir mountpath: /mnt/vol outputs: artifacts: -name: result path: /mnt/vol/result.json
4.動態 DAG 實現。

1)拆分大檔案後,拆分任務會在標準輸出中輸出乙個json字串,包括:子任務要處理的partid,例如:

2)map任務使用withparam引用拆分任務的輸出,解析json字串得到所有的},並以每個}作為輸入引數啟動多個map任務。

- name: map template: map arguments: parameters: -name: partid value: '}' depends: "split" withparam: '}'
有關如何定義它的更多資訊,請參閱開源 Argo 工作流文件5.工作流執行後,通過分布式工作流 Argo 集群控制台執行檢視任務 DAG 程序和執行結果。

6.阿里雲OSS檔案列表,log-count-datatxt 是輸入日誌檔案,split-output,cout-output 中間結果目錄,resultjson 是最終結果檔案。

7.示例中源資訊詳見aliyuncontainerservice github argo-workflow-examples

Argo Workflow 是乙個開源的 CNCF 畢業專案,專注於雲原生領域的工作流編排,使用 Kubernetes CRD 編排離線任務和 DAG 工作流,使用 Kubernetes Pod 在集群中排程執行。

阿里雲 ACK One 分布式工作流 Argo Cluster,產品化託管 Argo Workflow,提供售後支援,加強控制面,實現數萬個子任務(Pod)的穩定高效排程和運營,資料面支援雲上大規模算力的無伺服器排程,無需運維集群或節點,支援雲算力按需排程, 利用雲彈性,排程數萬CPU資源並行執行大規模子任務,減少執行時間,支援資料處理、機器學習、**計算、科學計算、CICD等業務場景。

歡迎加入ACK ONE客戶溝通釘釘群與我們交流。 (釘釘群號:35688562)。

1] 阿里雲 Ack One 分布式工作流 Argo 集群。

2] argo workflow

3] 建立分布式工作流 argo 集群。

4] 工作流使用儲存卷。

5] 建立工作流。

6] 開源 Argo 工作流文件。

walk-through/loops/

7] 分布式工作流 Argo 集群控制台。

8] aliyuncontainerservice github argo-workflow-examples

相關問題答案

    企業服務 HA 系統審批流程 工作流的產品設計

    為了滿足企業業務管控的需要,審批流程存在於各種各樣的業務系統中。審批一般分為一般審批和業務審批。一般的審批一般可以通過釘釘 飛書等辦公OA自帶的審批功能來完成,非常成熟,不用多說業務審批往往與業務文件耦合,如採購訂單的審批等,毫無疑問,相當一部分業務審批的可用性相對較差,今天我們就來談談如何設計乙個...

    智慧型臥式迴轉櫃產品優勢及工作流程

    智慧型水平轉盤是一種現代儲存裝置,具有許多優點和高效的工作流程。科明智庫將為您介紹智慧型臥式迴轉櫃的產品優勢和工作流程。智慧型臥式旋轉櫃的產品優勢之一是空間利用率高。智慧型水平轉盤採用垂直旋轉設計,通過最大限度地利用垂直空間來存放物品。每個櫃子都配有多層貨架,可以靈活地調節和分配貨架的高度,以容納不...

    日本遊戲製造商公布了 AI 工作流程示例

    雖然我們還在期待AI將如何改變生活和工作,但刻板印象中 自給自足 的日本遊戲開發者已經 開放 並且已經使用了AI工作流程。無論你是遊戲玩家還是業內人士,這篇最新發布的資料分析一定讓你有所收穫,一起來看看吧來自知名廠商Level 的AI應用在遊戲開發和推廣中的應用 關於級 首先,我們來介紹一下本文的主...

    5 種 AI 工具,讓您的工作流程更智慧型

    生成式人工智慧的熱潮始於去年ChatGPT的出現,在短短一年的時間裡,該技術已經整合到各種生產力平台中,大大降低了進入門檻和操作我們日常工作流程的難度。我知道,很多朋友在聽到 AI在工作場景中 時,首先擔心的是會不會被新技術取代。別擔心,本文中討論的工具並不是要取代人類,而是旨在提高工作效率。這些人...

    使用乙個易於銷售的工作流程,高效處理 N 個複雜的業務場景

    隨著企業業務的不斷擴充套件,企業現有的業務流程已無法支撐複雜的業務需求現有流程無法支援日益複雜的業務需求,定製化新業務流程的開發費時費力,業務流程效率低下為了幫助企業更好地適應不斷變化的業務需求,SalesEasy推出了工作流功能,支援個性化流程的構建,支援業務的快速發展,為提高企業業務運營效率提供...