作者:莊宇。
在工作流編排過程中,為了加快大任務處理效率,可以使用扇出扇入任務編排,將大任務分解為小任務,然後並行執行小任務,最後聚合結果。
從上圖可以看出,可以使用DAG(Directed Acyclic Graph)來編排扇出扇入任務,子任務分為靜態和動態,分別對應靜態DAG和動態DAG。 動態 dag 扇出扇入也可以理解為 mapreduce。 每個子任務都是乙個對映,最終的聚合結果是遞減的。
靜態匕首:拆分子任務分類是固定的,例如,在資料採集場景中,同時採集資料庫 1 和資料庫 2 的資料,最後對結果進行聚合。
動態匕首:拆分子任務分類是動態的,取決於上乙個任務的輸出,例如,在乙個資料處理場景中,任務A可以掃瞄待處理的資料集,對每個子資料集(例如乙個子目錄)開始子任務bn處理,當所有子任務bns完成後,將結果聚合到子任務C中,啟動多少個子任務b取決於任務A的輸出。 您可以根據實際業務場景,自定義任務A中子任務的拆分規則。
在實際業務場景中,為了加快大任務的執行速度,提高效率,往往需要將乙個大任務分解成上千個子任務,為了保證上千個子任務的同時執行,需要排程數以萬計的CPU資源。 例如,修改後的演算法的回歸測試需要由乙個子任務針對所有驅動場景執行,開發團隊要求所有子場景並行測試,以加快迭代速度。
如果您需要使用動態 DAG 來編排資料處理、計算和科學計算場景中的任務,或者需要排程數萬個 CPU 資源來加速任務執行,您可以使用阿里雲 Ack One 分布式工作流 Argo 集群
ACK ONE分布式工作流Argo集群,產品化託管Argo Workflow提供售後支援,支援動態DAG扇出扇入任務編排,支援雲算力按需排程,利用雲彈性排程數萬CPU資源並行執行大規模子任務,操作完成後及時資源,減少執行時間,節約成本。 支援資料處理、機器學習、計算、科學計算、CICD等業務場景。
Argo Workflow 是乙個開源的 CNCF 畢業專案,專注於雲原生領域的工作流編排,使用 Kubernetes CRD 編排離線任務和 DAG 工作流,使用 Kubernetes Pod 在集群中排程執行。
本文介紹如何使用 Argo Workflow 編排動態 DAG 扇出扇入任務。
我們將構建乙個動態的 DAG 扇出扇入工作流,在 OSS 中讀取乙個大日誌檔案,將其拆分為多個小檔案,啟動多個子任務來計算每個小檔案中的關鍵字數量(count),最後聚合結果(merge)。
1.建立分布式工作流 ARGO 集群
2.通過附加阿里雲 OSS 儲存卷,您可以像處理本地檔案一樣操作阿里雲 OSS 上的檔案。 參考:工作流使用儲存卷
3.使用以下工作流 yaml 建立工作流,請參閱: 建立工作流。有關詳細資訊,請參閱注釋。
apiversion: argoproj.io/v1alpha1kind: workflowmetadata: generatename: dynamic-dag-map-reduce-spec: entrypoint: main # claim a oss pvc, workflow can read/write file in oss through pvc. volumes: -name: workdir persistentvolumeclaim: claimname: pvc-oss # how many tasks to split, default is 5. arguments: parameters: -name: numparts value: "5" templates: -name: main # dag definition. dag: tasks: # split log files to several small files, based on numparts. -name: split template: split arguments: parameters: -name: numparts value: "}" # multiple map task to count words in each small file. -name: map template: map arguments: parameters: -name: partid value: '}' depends: "split" # run as a loop, partid from split task json outputs. withparam: '}' - name: reduce template: reduce arguments: parameters: -name: numparts value: "}" depends: "map" # the `split` task split the big log file to several small files. each file has a unique id (partid). # finally, it dumps a list of partid to stdout as output parameters - name: split inputs: parameters: -name: numparts container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["split.py"] env: -name: num_parts value: "}" volumemounts: -name: workdir mountpath: /mnt/vol # one `map` per partid is started. finds its own "part file" and processes it. -name: map inputs: parameters: -name: partid container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["count.py"] env: -name: part_id value: "}" volumemounts: -name: workdir mountpath: /mnt/vol # the `reduce` task takes the "results directory" and returns a single result. -name: reduce inputs: parameters: -name: numparts container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count command: [python] args: ["merge.py"] env: -name: num_parts value: "}" volumemounts: -name: workdir mountpath: /mnt/vol outputs: artifacts: -name: result path: /mnt/vol/result.json
4.動態 DAG 實現。
1)拆分大檔案後,拆分任務會在標準輸出中輸出乙個json字串,包括:子任務要處理的partid,例如:
2)map任務使用withparam引用拆分任務的輸出,解析json字串得到所有的},並以每個}作為輸入引數啟動多個map任務。
- name: map template: map arguments: parameters: -name: partid value: '}' depends: "split" withparam: '}'
有關如何定義它的更多資訊,請參閱開源 Argo 工作流文件5.工作流執行後,通過分布式工作流 Argo 集群控制台執行檢視任務 DAG 程序和執行結果。
6.阿里雲OSS檔案列表,log-count-datatxt 是輸入日誌檔案,split-output,cout-output 中間結果目錄,resultjson 是最終結果檔案。
7.示例中源資訊詳見aliyuncontainerservice github argo-workflow-examples
Argo Workflow 是乙個開源的 CNCF 畢業專案,專注於雲原生領域的工作流編排,使用 Kubernetes CRD 編排離線任務和 DAG 工作流,使用 Kubernetes Pod 在集群中排程執行。
阿里雲 ACK One 分布式工作流 Argo Cluster,產品化託管 Argo Workflow,提供售後支援,加強控制面,實現數萬個子任務(Pod)的穩定高效排程和運營,資料面支援雲上大規模算力的無伺服器排程,無需運維集群或節點,支援雲算力按需排程, 利用雲彈性,排程數萬CPU資源並行執行大規模子任務,減少執行時間,支援資料處理、機器學習、**計算、科學計算、CICD等業務場景。
歡迎加入ACK ONE客戶溝通釘釘群與我們交流。 (釘釘群號:35688562)。
1] 阿里雲 Ack One 分布式工作流 Argo 集群。
2] argo workflow
3] 建立分布式工作流 argo 集群。
4] 工作流使用儲存卷。
5] 建立工作流。
6] 開源 Argo 工作流文件。
walk-through/loops/
7] 分布式工作流 Argo 集群控制台。
8] aliyuncontainerservice github argo-workflow-examples