跳到主要內容

Alink漫談(五) : 迭代計算和Superstep_台中搬家公司


※推薦台中搬家公司優質服務,可到府估價



台中搬鋼琴,台中金庫搬運,中部廢棄物處理,南投縣搬家公司,好幫手搬家,西屯區搬家


Alink漫談(五) : 迭代計算和Superstep


目錄

  • Alink漫談(五) : 迭代計算和Superstep

    • 0x00 摘要

    • 0x01 緣由

    • 0x02 背景概念

      • 2.1 四層執行圖

      • 2.2 Task和SubTask

      • 2.3 如何劃分 Task 的依據

      • 2.4 JobGraph

      • 2.5 BSP模型和Superstep

        • BSP模型

        • BSP模型的實現

        • Flink-Gelly



    • 0x03 Flink的迭代算法(superstep-based)

      • 3.1 Bulk Iterate

      • 3.2 迭代機制


    • 0x04 Alink如何使用迭代

    • 0x05 深入Flink源碼和runtime來驗證

      • 5.1 向Flink提交Job

      • 5.2 生成JobGraph

      • 5.3 迭代對應的Task

        • 5.3.1 IterationHeadTask

        • 5.3.2 IterationIntermediateTask

        • 5.3.3 IterationTailTask

          • 如何和Head建立聯繫

          • 如何把用戶返回的數值傳給Head


        • 5.3.4 IterationSynchronizationSinkTask


      • 5.4 superstep


    • 0x06 結合KMeans代碼看superset

      • 6.1 K-means算法概要

      • 6.2 KMeansPreallocateCentroid

      • 6.3 KMeansAssignCluster 和 KMeansUpdateCentroids

      • 6.4 KMeansOutputModel


    • 0x07 參考




0x00 摘要


Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平台,是業界首個同時支持批式算法、流式算法的機器學習平台。迭代算法在很多數據分析領域會用到,比如機器學習或者圖計算。本文將通過Superstep入手看看Alink是如何利用Flink迭代API來實現具體算法。


因為Alink的公開資料太少,所以以下均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會隨時更新。


0x01 緣由


為什麼提到 Superstep 這個概念,是因為在擼KMeans代碼的時候,發現幾個很奇怪的地方,比如以下三個步驟中,都用到了context.getStepNo(),而且會根據其數值的不同進行不同業務操作:


public class KMeansPreallocateCentroid extends ComputeFunction {
public void calc(ComContext context) {
LOG.info("liuhao KMeansPreallocateCentroid ");
if (context.getStepNo() == 1) {
/** 具體業務邏輯代碼
* Allocate memory for pre-round centers and current centers.
*/
}
}
}

public class KMeansAssignCluster extends ComputeFunction {
public void calc(ComContext context) {
......
if (context.getStepNo() % 2 == 0) {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
} else {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
}
/** 具體業務邏輯代碼
* Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
*/
}
}

public class KMeansUpdateCentroids extends ComputeFunction {
public void calc(ComContext context) {
if (context.getStepNo() % 2 == 0) {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
} else {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
}
/** 具體業務邏輯代碼
* Update the centroids based on the sum of points and point number belonging to the same cluster.
*/
}

查看ComContext的源碼,發現stepNo的來源居然是runtimeContext.getSuperstepNumber()


public class ComContext {
private final int taskId;
private final int numTask;
private final int stepNo; // 對,就是這裏
private final int sessionId;
public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
this.sessionId = sessionId;
this.numTask = runtimeContext.getNumberOfParallelSubtasks();
this.taskId = runtimeContext.getIndexOfThisSubtask();
this.stepNo = runtimeContext.getSuperstepNumber(); // 這裏進行了變量初始化
}
/**
* Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}.
* @return iteration step number.
*/
public int getStepNo() {
return stepNo; // 這裡是使用
}
}

看到這裡有的兄弟可能會虎軀一震,這不是BSP模型的概念嘛。我就是想寫個KMeans算法,怎麼除了MPI模型,還要考慮BSP模型。下面就讓我們一步一步挖掘究竟Alink都做了什麼工作。


0x02 背景概念


2.1 四層執行圖


在 Flink 中的執行圖可以分為四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖



  • StreamGraph:Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。

  • JobGraph:StreamGraph 經過優化後生成了 JobGraph, JobGraph是提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。JobGraph是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。

  • ExecutionGraph:JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是調度層最核心的數據結構。

  • 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task 后形成的"圖",並不是一個具體的數據結構。


2.2 Task和SubTask


因為某種原因,Flink內部對這兩個概念的使用本身就有些混亂:在Task Manager里這個subtask的概念由一個叫Task的類來實現。Task Manager里談論的Task對象實際上對應的是ExecutionGraph里的一個subtask。


所以這兩個概念需要理清楚。



  • Task(任務) :Task對應JobGraph的一個節點,是一個算子Operator。Task 是一個階段多個功能相同 subTask 的集合,類似於 Spark 中的 TaskSet。

  • subTask(子任務) :subTask 是 Flink 中任務最小執行單元,是一個 Java 類的實例,這個 Java 類中有屬性和方法,完成具體的計算邏輯。在ExecutionGraph里Task被分解為多個并行執行的subtask 。每個subtask作為一個excution分配到Task Manager里執行。

  • Operator Chains(算子鏈) :沒有 shuffle 的多個算子合併在一個 subTask 中,就形成了 Operator Chains,類似於 Spark 中的 Pipeline。Operator subTask 的數量指的就是算子的并行度。同一程序的不同算子也可能具有不同的并行度(因為可以通過 setParallelism() 方法來修改并行度)。


Flink 中的程序本質上是并行的。在執行期間,每一個算子 Operator (Transformation)都有一個或多個算子subTask(Operator SubTask),每個算子的 subTask 之間都是彼此獨立,並在不同的線程中執行,並且可能在不同的機器或容器上執行。


Task( SubTask) 是一個Runnable 對象, Task Manager接受到TDD 後會用它實例化成一個Task對象, 並啟動一個線程執行Task的Run方法。


TaskDeploymentDescriptor(TDD) : 是Task Manager在submitTask是提交給TM的數據結構。 他包含了關於Task的所有描述信息。比如:



  • TaskInfo : 包含該Task 執行的java 類,該類是某個 AbstractInvokable的實現類 , 當然也是某個operator的實現類 (比如DataSourceTask, DataSinkTask, BatchTask,StreamTask 等)。

  • IG描述 :通常包含一個或兩個InputGateDeploymentDescriptor(IGD)。

  • 目標RP的描述: ParitionId, PartitionType, RS個數等等。


2.3 如何劃分 Task 的依據


在以下情況下會重新劃分task



  • 并行度發生變化時

  • keyBy() /window()/apply() 等發生 Rebalance 重新分配;

  • 調用 startNewChain() 方法,開啟一個新的算子鏈;

  • 調用 diableChaining()方法,即:告訴當前算子操作不使用 算子鏈 操作。


比如有如下操作


DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
.filter(new FilterClass())
.map(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(2)

那麼StreamGraph的轉換流是:


 Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink

其task是四個:



  • Source --> Filter --> Map

  • keyBy

  • timeWindow

  • Sink


其中每個task又會被分成分若干subtask。在執行時,一個Task會被并行化成若干個subTask實例進行執行,一個subTask對應一個執行線程。


2.4 JobGraph


以上說了這麼多,就是要說jobGraph和subtask,因為本文中我們在分析源碼和調試時候,主要是從jobGraph這裏開始入手來看subtask


JobGraph是在StreamGraph的基礎之上,對StreamNode進行了關聯合併的操作,比如對於source -> flatMap -> reduce -> sink 這樣一個數據處理鏈,當source和flatMap滿足鏈接的條件時,可以可以將兩個操作符的操作放到一個線程并行執行,這樣可以減少網絡中的數據傳輸,由於在source和flatMap之間的傳輸的數據也不用序列化和反序列化,所以也提高了程序的執行效率。


相比流圖(StreamGraph)以及批處理優化計劃(OptimizedPlan),JobGraph發生了一些變化,已經不完全是"靜態"的數據結構了,因為它加入了中間結果集(IntermediateDataSet)這一"動態"概念。


作業頂點(JobVertex)、中間數據集(IntermediateDataSet)、作業邊(JobEdge)是組成JobGraph的基本元素。這三個對象彼此之間互為依賴:



  • 一個JobVertex關聯着若干個JobEdge作為輸入端以及若干個IntermediateDataSet作為其生產的結果集;每個JobVertex都有諸如并行度和執行代碼等屬性。

  • 一個IntermediateDataSet關聯着一個JobVertex作為生產者以及若干個JobEdge作為消費者;

  • 一個JobEdge關聯着一個IntermediateDataSet可認為是源以及一個JobVertex可認為是目標消費者;


那麼JobGraph是怎麼組織並存儲這些元素的呢?其實JobGraph只以Map的形式存儲了所有的JobVertex,鍵是JobVertexID:


private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();


至於其它的元素,通過JobVertex都可以根據關係找尋到。需要注意的是,用於迭代的反饋邊(feedback edge)當前並不體現在JobGraph中,而是被內嵌在特殊的JobVertex中通過反饋信道(feedback channel)在它們之間建立關係。


2.5 BSP模型和Superstep


BSP模型


BSP模型是并行計算模型的一種。并行計算模型通常指從并行算法的設計和分析出發,將各種并行計算機(至少某一類并行計算機)的基本特徵抽象出來,形成一個抽象的計算模型。


BSP模型是一種異步MIMD-DM模型(DM: distributed memory,SM: shared memory),BSP模型支持消息傳遞系統,塊內異步并行,塊間顯式同步,該模型基於一個master協調,所有的worker同步(lock-step)執行, 數據從輸入的隊列中讀取。


BSP計算模型不僅是一種體繫結構模型,也是設計并行程序的一種方法。BSP程序設計準則是整體同步(bulk synchrony),其獨特之處在於超步(superstep)概念的引入。一個BSP程序同時具有水平和垂直兩個方面的結構。從垂直上看,一個BSP程序由一系列串行的超步(superstep)組成。


BSP模型的實現


BSP模型的實現大概舉例如下:



  • Pregel :Google的大規模圖計算框架,首次提出了將BSP模型應用於圖計算,具體請看Pregel——大規模圖處理系統,不過至今未開源。

  • Apache Giraph :ASF社區的Incubator項目,由Yahoo!貢獻,是BSP的java實現,專註於迭代圖計算(如pagerank,最短連接等),每一個job就是一個沒有reducer過程的hadoop job。

  • Apache Hama :也是ASF社區的Incubator項目,與Giraph不同的是它是一個純粹的BSP模型的java實現,並且不單單是用於圖計算,意在提供一個通用的BSP模型的應用框架。


Flink-Gelly


Flink-Gelly利用Flink的高效迭代算子來支持海量數據的迭代式圖處理。目前,Flink Gelly提供了"Vertex-Centric","Scatter-Gather"以及"Gather-Sum-Apply"等計算模型的實現。


"Vertex-Centric"迭代模型也就是我們經常聽到的"Pregel",是一種從Vertex角度出發的圖計算方式。其中,同步地迭代計算的步驟稱之為"superstep"。在每個"superstep"中,每個頂點都執行一個用戶自定義的函數,且頂點之間通過消息進行通信,當一個頂點知道圖中其他任意頂點的唯一ID時,該頂點就可以向其發送一條消息。


但是實際上,KMeans不是圖處理,Alink也沒有基於Flink-Gelly來構建。也許只是借鑒了其概念。所以我們還需要再探尋。


0x03 Flink的迭代算法(superstep-based)


迭代算法在很多數據分析領域會用到,比如機器學習或者圖計算。為了從大數據中抽取有用信息,這個時候往往會需要在處理的過程中用到迭代計算。


所謂迭代運算,就是給定一個初值,用所給的算法公式計算初值得到一个中間結果,然後將中間結果作為輸入參數進行反覆計算,在滿足一定條件的時候得到計算結果。


大數據處理框架很多,比如spark,mr。實際上這些實現迭代計算都是很困難的。


Flink直接支持迭代計算。Flink實現迭代的思路也是很簡單,就是實現一個step函數,然後將其嵌入到迭代算子中去。有兩種迭代操作算子: Iterate和Delta Iterate。兩個操作算子都是在未收到終止迭代信號之前一直調用step函數。


3.1 Bulk Iterate


這種迭代方式稱為全量迭代,它會將整個數據輸入,經過一定的迭代次數,最終得到你想要的結果。


迭代操作算子包括了簡單的迭代形式:每次迭代,step函數會消費全量數據(本次輸入和上次迭代的結果),然後計算得到下輪迭代的輸出(例如,map,reduce,join等)


迭代過程主要分為以下幾步:



  • Iteration Input(迭代輸入):是初始輸入值或者上一次迭代計算的結果。

  • Step Function(step函數):每次迭代都會執行step函數。它迭代計算DataSet,由一系列的operator組成,比如map,flatMap,join等,取決於具體的業務邏輯。

  • Next Partial Solution(中間結果):每一次迭代計算的結果,被發送到下一次迭代計算中。

  • Iteration Result(迭代結果):最後一次迭代輸出的結果,被輸出到datasink或者發送到下游處理。


它迭代的結束條件是:



  • 達到最大迭代次數

  • 自定義收斂聚合函數


編程的時候,需要調用iterate(int),該函數返回的是一個IterativeDataSet,當然我們可以對它進行一些操作,比如map等。Iterate函數唯一的參數是代表最大迭代次數。


迭代是一個環。我們需要進行閉環操作,那麼這時候就要用到closeWith(Dataset)操作了,參數就是需要循環迭代的dataset。也可以可選的指定一個終止標準,操作closeWith(DataSet, DataSet),可以通過判斷第二個dataset是否為空,來終止迭代。如果不指定終止迭代條件,迭代就會在迭代了最大迭代次數后終止。


3.2 迭代機制


DataSet API引進了獨特的同步迭代機制(superstep-based),僅限於用在有界的流。


我們將迭代操作算子的每個步驟函數的執行稱為單個迭代。在并行設置中,在迭代狀態的不同分區上并行計算step函數的多個實例。在許多設置中,對所有并行實例上的step函數的一次評估形成了所謂的superstep,這也是同步的粒度。因此,迭代的所有并行任務都需要在初始化下一個superstep之前完成superstep。終止準則也將被評估為superstep同步屏障。


下面是Apache原文



We referred to each execution of the step function of an iteration operator as a single iteration. In parallel setups, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep, before a next superstep will be initialized. Termination criteria will also be evaluated at superstep barriers.



下面是apache原圖



概括如下:


每次迭代都是一個superstep
每次迭代中有若干subtask在不同的partition上分別執行step
每個step有一個HeadTask,若干IntermediateTask,一個TailTask
每個superstep有一個SynchronizationSinkTask 同步,因為迭代的所有并行任務需要在下一個迭代前完成

由此我們可以知道,superstep這是Flink DataSet API的概念,但是你從這裡能夠看到BSP模型的影子,比如:



  • 在傳統的BSP模型中,一個superstep被分為3步: 本地的計算, 消息的傳遞, 同步的barrier.

  • Barrier Synchronization又叫障礙同步或柵欄同步。每一次同步也是一個超步的完成和下一個超步的開始;

  • Superstep超步 是一次計算迭代,從起始每往前步進一層對應一個超步。

  • 程序該什麼時候結束是程序自己控制


0x04 Alink如何使用迭代


KMeansTrainBatchOp.iterateICQ函數中,生成了一個IterativeComQueue,而IterativeComQueue之中就用到了superstep-based迭代。


return new IterativeComQueue()
.initWithPartitionedData(TRAIN_DATA, data)
.initWithBroadcastData(INIT_CENTROID, initCentroid)
.initWithBroadcastData(KMEANS_STATISTICS, statistics)
.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))
.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) // 終止條件
.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
.setMaxIter(maxIter) // 迭代最大次數
.exec();

而BaseComQueue.exec函數中則有:


public DataSet<Row> exec() {
IterativeDataSet<byte[]> loop // Flink 迭代API
= loopStartDataSet(executionEnvironment)
.iterate(maxIter);
// 後續操作能看出來,之前添加在queue上的比如KMeansPreallocateCentroid,都是在loop之上運行的。
if (null == compareCriterion) {
loopEnd = loop.closeWith...
} else {
// compare Criterion.
DataSet<Boolean> criterion = input ... compareCriterion
loopEnd = loop.closeWith( ... criterion ... )
}
}

再仔細研究代碼,我們可以看出:


superstep包括:


.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))


終止標準就是


※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益



擁有後台管理系統的網站,將擁有強大的資料管理與更新功能,幫助您隨時新增網站的內容並節省網站開發的成本。




利用KMeansIterTermination構建了一個RichMapPartitionFunction作為終止標準。最後結束時候調用 KMeansOutputModel完成業務操作。


最大循環就是


.setMaxIter(maxIter)


於是我們可以得出結論,superstep-based Bulk Iterate 迭代算子是用來實現整體KMeans算法,KMeans算法就是一個superstep進行迭代。但是在superstep內容如果需要通訊或者柵欄同步,則採用了MPI的allReduce。


0x05 深入Flink源碼和runtime來驗證


我們需要深入到Flink內部去挖掘驗證,如果大家有興趣,可以參見下面調用棧,自己添加斷點來研究。


execute:56, LocalExecutor (org.apache.flink.client.deployment.executors)
executeAsync:944, ExecutionEnvironment (org.apache.flink.api.java)
execute:860, ExecutionEnvironment (org.apache.flink.api.java)
execute:844, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:44, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:20, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:35, KMeansExample (com.alibaba.alink)

5.1 向Flink提交Job


Alink和Flink構建聯繫,是在print調用中完成的。因為是本地調試,Flink會啟動一個miniCluster,然後會做如下操作。



  • 首先生成執行計劃Plan。Plan以數據流形式來表示批處理程序,但它只是批處理程序最初的表示,然後計劃會被優化以生成更高效的方案OptimizedPlan。

  • 然後,計劃被編譯生成JobGraph。這個圖是要交給flink去生成task的圖。

  • 生成一系列配置。

  • 將JobGraph和配置交給flink集群去運行。如果不是本地運行的話,還會把jar文件通過網絡發給其他節點。

  • 以本地模式運行的話,可以看到啟動過程,如啟動性能度量、web模塊、JobManager、ResourceManager、taskManager等等。


當我們看到了submitJob調用,就知道KMeans代碼已經和Flink構建了聯繫


@Internal
public class LocalExecutor implements PipelineExecutor {

public static final String NAME = "local";

@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {

// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

final JobGraph jobGraph = getJobGraph(pipeline, configuration);
final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);

jobIdFuture
.thenCompose(clusterClient::requestJobResult)
.thenAccept((jobResult) -> clusterClient.shutDownCluster());

return jobIdFuture.thenApply(jobID ->
new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
}

5.2 生成JobGraph


生成jobGraph的具體流程是:



  • IterativeDataSet.closeWith會生成一個BulkIterationResultSet。

  • PrintBatchOp.sinkFrom中會調用到ExecutionEnvironment.executeAsync

  • 調用createProgramPlan構建一個Plan

  • OperatorTranslation.translate函數發現if (dataSet instanceof BulkIterationResultSet),則調用translateBulkIteration(bulkIterationResultSet);

  • 這時候生成了執行計劃Plan

  • ExecutionEnvironment.executeAsync調用LocalExecutor.execute

  • 然後調用FlinkPipelineTranslationUtil.getJobGraph來生成jobGraph

  • GraphCreatingVisitor.preVisit中會判斷 if (c instanceof BulkIterationBase),以生成BulkIterationNode

  • PlanTranslator.translateToJobGraph會調用到JobGraphGenerator.compileJobGraph,最終調用到createBulkIterationHead就生成了迭代處理的Head。

  • 最後將jobGraph提交給Cluster ,jobGraph 變形為 ExceutionGraph在JM和TM上執行。


5.3 迭代對應的Task


前面代碼中,getJobGraph函數作用是生成了job graph。


然後 JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是調度層最核心的數據結構。


最後 JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task。


所以我們需要看看最終運行時候,迭代API對應着哪些Task。


針對IterativeDataSet,即superstep-based Bulk Iterate,Flink生成了如下的task。



  • IterationHeadTask

  • IterationIntermediateTask

  • IterationTailTask

  • IterationSynchronizationSinkTask


5.3.1 IterationHeadTask


IterationHeadTask主要作用是協調一次迭代。


它會讀取初始輸入,和迭代Tail建立一個BlockingBackChannel。在成功處理輸入之後,它會發送EndOfSuperstep事件給自己的輸出。它在每次superstep之後會聯繫 synchronization task,等到自己收到一個用來同步的AllWorkersDoneEvent。AllWorkersDoneEvent表示所有其他的heads已經完成了自己的迭代。


下一次迭代時候,上一次迭代中tail的輸出就經由backchannel傳輸,形成了head的輸入。何時進入到下一個迭代,是由HeadTask完成的。一旦迭代完成,head將發送TerminationEvent給所有和它關聯的task,告訴他們shutdown。


				barrier.waitForOtherWorkers();

if (barrier.terminationSignaled()) {
requestTermination();
nextStepKickoff.signalTermination();
} else {
incrementIterationCounter();
String[] globalAggregateNames = barrier.getAggregatorNames();
Value[] globalAggregates = barrier.getAggregates();
aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
// 在這裏發起下一次Superstep。
nextStepKickoff.triggerNextSuperstep();
}
}

IterationHeadTask是在JobGraphGenerator.createBulkIterationHead中構建的。其例子如下:


"PartialSolution (Bulk Iteration) (org.apache.flink.runtime.iterative.task.IterationHeadTask)"

5.3.2 IterationIntermediateTask


IterationIntermediateTask是superstep中間段的task,其將傳輸EndOfSuperstepEvent和TerminationEvent給所有和它關聯的tasks。此外,IterationIntermediateTask能更新the workset或者the solution set的迭代狀態。


如果迭代狀態被更新,本task的輸出將傳送回IterationHeadTask,在這種情況下,本task將作為head再次被安排。


IterationIntermediateTask的例子如下:


 "MapPartition (computation@KMeansUpdateCentroids) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"

"Combine (SUM(0), at kMeansPlusPlusInit(KMeansInitCentroids.java:135) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"

"MapPartition (AllReduceSend) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"

"Filter (Filter at kMeansPlusPlusInit(KMeansInitCentroids.java:130)) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"


5.3.3 IterationTailTask


IterationTailTask是迭代的最末尾。如果迭代狀態被更新,本task的輸出將通過BlockingBackChannel傳送回IterationHeadTask,反饋給迭代頭就意味着一個迭代完整邏輯的完成,那麼就可以關閉這個迭代閉合環了。這種情況下,本task將在head所在的實例上重新被調度。


這裡有幾個關鍵點需要注意:


如何和Head建立聯繫

Flink有一個BlockingQueueBroker類,這是一個阻塞式的隊列代理,它的作用是對迭代併發進行控制。Broker是單例的,迭代頭任務和尾任務會生成同樣的broker ID,所以頭尾在同一個JVM中會基於相同的dataChannel進行通信。dataChannel由迭代頭創建。


IterationHeadTask中會生成BlockingBackChannel,這是一個容量為1的阻塞隊列。


// 生成channel
BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, this.getIOManager()));

// 然後block在這裏,等待Tail
superstepResult = backChannel.getReadEndAfterSuperstepEnded();

IterationTailTask則是如下:


// 在基類得到channel,因為是單例,所以會得到同一個
worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());

// notify iteration head if responsible for workset update 在這裏通知Head
worksetBackChannel.notifyOfEndOfSuperstep();

而兩者都是利用如下辦法來建立聯繫,在同一個subtask中會使用同一個brokerKey,這樣首尾就聯繫起來了。


public String brokerKey() {
if (this.brokerKey == null) {
int iterationId = this.config.getIterationId();
this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
}

return this.brokerKey;
}

如何把用戶返回的數值傳給Head

這是通過output.collect來完成的。


首先,在Tail初始化時候,會生成一個outputCollector,這個outputCollector會被設置為本task的輸出outputCollector。這樣就保證了用戶函數的輸出都會轉流到outputCollector。


而outputCollector的輸出就是worksetBackChannel的輸出,這裏設置為同一個instance。這樣用戶輸出就輸出到backChannel中。


	@Override
protected void initialize() throws Exception {
super.initialize();

// set the last output collector of this task to reflect the iteration tail state update:
// a) workset update,
// b) solution set update, or
// c) merged workset and solution set update

Collector<OT> outputCollector = null;
if (isWorksetUpdate) {
// 生成一個outputCollector
outputCollector = createWorksetUpdateOutputCollector();

// we need the WorksetUpdateOutputCollector separately to count the collected elements
if (isWorksetIteration) {
worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
}
}

......
// 把outputCollector設置為本task的輸出
setLastOutputCollector(outputCollector);
}

outputCollector的輸出就是worksetBackChannel的輸出buffer,這裏設置為同一個instance。


	protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
DataOutputView outputView = worksetBackChannel.getWriteEnd();
TypeSerializer<OT> serializer = getOutputSerializer();
return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
}

運行時候如下:


	@Override
public void run() throws Exception {

SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());

while (this.running && !terminationRequested()) {

// 用戶在這裏輸出,最後會輸出到output.collect,也就是worksetBackChannel的輸出buffer。
super.run();

// 這時候以及輸出到channel完畢,只是通知head進行讀取。
if (isWorksetUpdate) {
// notify iteration head if responsible for workset update
worksetBackChannel.notifyOfEndOfSuperstep();
} else if (isSolutionSetUpdate) {
// notify iteration head if responsible for solution set update
solutionSetUpdateBarrier.notifySolutionSetUpdate();
}

...
}

IterationTailTask例子如下:


"Pipe (org.apache.flink.runtime.iterative.task.IterationTailTask)"

5.3.4 IterationSynchronizationSinkTask


IterationSynchronizationSinkTask作用是同步所有的iteration heads,IterationSynchronizationSinkTask被是實現成一個 output task。其只是用來協調,不處理任何數據。


在每一次superstep,IterationSynchronizationSinkTask只是等待直到它從每一個head都收到一個WorkerDoneEvent。這表示下一次superstep可以開始了。


這裏需要注意的是 SynchronizationSinkTask 如何等待各個并行度的headTask。比如Flink的并行度是5,那麼SynchronizationSinkTask怎麼做到等待這5個headTask。


在IterationSynchronizationSinkTask中,註冊了SyncEventHandler來等待head的WorkerDoneEvent。


this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.getEnvironment().getUserClassLoader());
this.headEventReader.registerTaskEventListener(this.eventHandler, WorkerDoneEvent.class);

在SyncEventHandler中,我們可以看到,在構建時候,numberOfEventsUntilEndOfSuperstep就被設置為并行度,每次收到一個WorkerDoneEvent,workerDoneEventCounter就遞增,當等於numberOfEventsUntilEndOfSuperstep,即并行度時候,就說明本次superstep中,所有headtask都成功了。


    private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
if (this.endOfSuperstep) {
throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
} else {
// 每次遞增
++this.workerDoneEventCounter;
String[] aggNames = workerDoneEvent.getAggregatorNames();
Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader);
if (aggNames.length != aggregates.length) {
throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
} else {
for(int i = 0; i < aggNames.length; ++i) {
Aggregator<Value> aggregator = (Aggregator)this.aggregators.get(aggNames[i]);
aggregator.aggregate(aggregates[i]);
}

// numberOfEventsUntilEndOfSuperstep就是并行度,等於并行度時候就說明所有head都成功了。
if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) {
this.endOfSuperstep = true;
Thread.currentThread().interrupt();
}
}
}
}

IterationSynchronizationSinkTask的例子如下:


"Sync (BulkIteration (Bulk Iteration)) (org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask)"

5.4 superstep


綜上所述,我們最終得到superstep如下:


***** 文字描述如下 *****

每次迭代都是一個superstep
每次迭代中有若干subtask在不同的partition上分別執行step
每個step有一個HeadTask,若干IntermediateTask,一個TailTask
每個superstep有一個SynchronizationSinkTask

***** 偽代碼大致如下 *****

for maxIter :
begin superstep
for maxSubTask :
begin step
IterationHeadTask
IterationIntermediateTask
IterationIntermediateTask
...
IterationIntermediateTask
IterationIntermediateTask
IterationTailTask
end step
IterationSynchronizationSinkTask
end superstep

0x06 結合KMeans代碼看superset


6.1 K-means算法概要


K-means算法的過程,為了盡量不用數學符號,所以描述的不是很嚴謹,大概就是這個意思,"物以類聚、人以群分":



  1. 首先輸入k的值,即我們希望將數據集經過聚類得到k個分組。

  2. 從數據集中隨機選擇k個數據點作為初始大哥(質心,Centroid)

  3. 對集合中每一個小弟,計算與每一個大哥的距離(距離的含義後面會講),離哪個大哥距離近,就跟定哪個大哥。

  4. 這時每一個大哥手下都聚集了一票小弟,這時候召開人民代表大會,每一群選出新的大哥(其實是通過算法選出新的質心)。

  5. 如果新大哥和老大哥之間的距離小於某一個設置的閾值(表示重新計算的質心的位置變化不大,趨於穩定,或者說收斂),可以認為我們進行的聚類已經達到期望的結果,算法終止。

  6. 如果新大哥和老大哥距離變化很大,需要迭代3~5步驟。


6.2 KMeansPreallocateCentroid


KMeansPreallocateCentroid也是superstep一員,但是只有context.getStepNo() == 1的時候,才會進入實際業務邏輯,預分配Centroid。當superstep為大於1的時候,本task會執行,但不會進入具體業務代碼。


public class KMeansPreallocateCentroid extends ComputeFunction {
private static final Logger LOG = LoggerFactory.getLogger(KMeansPreallocateCentroid.class);

@Override
public void calc(ComContext context) {
// 每次superstep都會進到這裏
LOG.info(" KMeansPreallocateCentroid 我每次都會進的呀 ");
if (context.getStepNo() == 1) {
// 實際預分配業務只進入一次
}
}
}

6.3 KMeansAssignCluster 和 KMeansUpdateCentroids


KMeansAssignCluster 作用是為每個點(point)計算最近的聚類中心,為每個聚類中心的點坐標的計數和求和。


KMeansUpdateCentroids 作用是基於計算出來的點計數和坐標,計算新的聚類中心。


Alink在整個計算過程中維護一個特殊節點來記住待求中心點當前的結果。


這就是為啥迭代時候需要區分奇數次和偶數次的原因了。奇數次就表示老大哥,偶數次就表示新大哥。每次superstep只會計算一批大哥,留下另外一批大哥做距離比對。


另外要注意的一點是:普通的迭代計算,是通過Tail給Head回傳用戶數據,但是KMeans這裏的實現並沒有採用這個辦法,而是把計算出來的中心點都存在共享變量中,在各個intermediate之間互相交互。


public class KMeansAssignCluster extends ComputeFunction {
public void calc(ComContext context) {
......
if (context.getStepNo() % 2 == 0) {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
} else {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
}
/** 具體業務邏輯代碼
* Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
*/
}
}

public class KMeansUpdateCentroids extends ComputeFunction {
public void calc(ComContext context) {
if (context.getStepNo() % 2 == 0) {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
} else {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
}
/** 具體業務邏輯代碼
* Update the centroids based on the sum of points and point number belonging to the same cluster.
*/
}

6.4 KMeansOutputModel


這裏要特殊說明,因為KMeansOutputModel是最終輸出模型,而KMeans算法的實現是:所有subtask都擁有所有中心點,就是說所有subtask都會有相同的模型,就沒有必要全部輸出,所以這裏限定了第一個subtask才能輸出,其他的都不輸出。


	@Override
public List <Row> calc(ComContext context) {
// 只有第一個subtask才輸出模型數據。
if (context.getTaskId() != 0) {
return null;
}

....

modelData.params = new KMeansTrainModelData.ParamSummary();
modelData.params.k = k;
modelData.params.vectorColName = vectorColName;
modelData.params.distanceType = distanceType;
modelData.params.vectorSize = vectorSize;
modelData.params.latitudeColName = latitudeColName;
modelData.params.longtitudeColName = longtitudeColName;

RowCollector collector = new RowCollector();
new KMeansModelDataConverter().save(modelData, collector);
return collector.getRows();
}

0x07 參考


幾種并行計算模型的區別(BSP LogP PRAM)


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/iterations.html
聚類、K-Means、例子、細節


Flink-Gelly:Iterative Graph Processing


從BSP模型到Apache Hama


Flink DataSet迭代運算


幾種并行計算模型的區別(BSP LogP PRAM)


Flink架構,源碼及debug


Flink 之 Dataflow、Task、subTask、Operator Chains、Slot 介紹


Flink 任務和調度


Flink運行時之生成作業圖

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!



還在煩惱搬家費用要多少哪?台中大展搬家線上試算搬家費用,從此不再擔心「物品怎麼計費」、「多少車才能裝完」




Orignal From: Alink漫談(五) : 迭代計算和Superstep_台中搬家公司

留言

這個網誌中的熱門文章

Python 併發總結,多線程,多進程,異步IO

1 測量函數運行時間 import time def profile(func): def wrapper(*args, ** kwargs): import time start = time.time() func( *args, ** kwargs) end = time.time() print ' COST: {} ' .format(end - start) return wrapper @profile def fib(n): if n<= 2 : return 1 return fib(n-1) + fib(n-2 ) fib( 35 )   2 啟動多個線程,並等待完成   2.1 使用threading.enumerate() import threading for i in range(2 ): t = threading.Thread(target=fib, args=(35 ,)) t.start() main_thread = threading.currentThread() for t in threading.enumerate(): if t is main_thread: continue t.join()   2.2 先保存啟動的線程 threads = [] for i in range(5 ): t = Thread(target=foo, args= (i,)) threads.append(t) t.start() for t in threads: t.join()   3 使用信號量,限制同時能有幾個線程訪問臨界區 from threading import Semaphore import time sema = Semaphor...

高雄十大包子名店出爐

, 圖文:吳恩文 高雄包子大賽落幕了,我只能就我個人意見, 介紹一下前十名這些包子,但是不能代表其他四位評審的意見,雖然身為評審長,我通常不會第一個表示意見,以免影響其他評審, 我主要工作是負責發問。   這次參賽的素包子很少,而且都不夠細致,又偏油,我不愛, 但是第一名的甜芝麻包-熔岩黑金包,竟然是素食得名- 漢來蔬食巨蛋店。   這包子賣相太好,竹炭粉的黑色外皮刷上金粉,一上桌,眾人驚呼, 搶拍照,內餡是芝麻餡,混一點花生醬增稠,加入白糖芝麻油, 熔岩爆漿的程度剛剛好,我一直以為芝麻要配豬油才行、 但是選到好的黑芝麻油一樣不減香醇, 當下有二位評審就想宅配回家。   尤其特別的是,黑芝麻餡室溫易化,師傅必須要輪班躲在冷藏室內, 穿著大外套才能包,一天包不了多少,我笑說,漢來美食,集團餐廳那麼多,實力雄厚,根本是「 奧運選手報名參加村裡運動會」嘛,其他都是小包子店啊, 但是沒辦法,顯然大家都覺得它好看又好吃, 目前限定漢來蔬食高雄巨蛋店,二顆88元,可以冷凍宅配, 但是要排一陣子,因為供不應求,聽說,四月份, 台北sogo店開始會賣。   第二名的包子,左營寬來順早餐店,顯然平易近人的多,一顆肉包, 十塊錢,是所有參賽者中最便宜的,當然,個頭也小, 它的包子皮明顯和其他不同,灰灰的老麵,薄但紮實有嚼勁, 肉餡新鮮帶汁,因為打了些水,味道極其簡單,就是蔥薑,塩, 香油,薑味尤其明顯,是老眷村的味道, 而特別的是老闆娘是台灣本省人, 當年完全是依據眷村老兵的口味一步一步調整而來,沒有加什麼糖、 五香粉,胡椒粉,油蔥酥。就是蔥薑豬肉和老麵香,能得名, 應該是它的平實無華,鮮美簡單,打動人心。   這是標準的心靈美食,可以撫慰人心,得名之前,寛來順已經天天排隊,現在,恐怕要排更久了, 建議大家六七點早點上門。   第三名,「專十一」很神奇,我記得比賽最後, 大家連吃了幾家不能引起共鳴的包子,有些累,到了專十一, 就坐著等包子,其他評審一吃,就催我趕快試,我一吃, 也醒了大半。   它的包子皮厚薄適中,但是高筋麵粉高些,老麵加一點點酵母, 我心中,它的皮屬一屬二,至於餡又多又好吃,蛋黃還是切丁拌入, 不是整顆放,吃起來「美味、均衡、飽滿」。一顆二十元。   老闆是陸軍專科十一期畢業取名專十一,...

韋伯連續劇終於更新 期待第一季順利完結

  地球天文學界的跳票大王詹姆斯·韋伯空間望遠鏡 (James Webb Space Telescope,縮寫為 JWST)自 1996 年以來斷斷續續不按劇本演出的連續劇終於讓焦慮的觀眾們又等到了一次更新:五層遮陽罩測試順利完成。 裝配完成的韋伯望遠鏡與好夥伴遮陽罩同框啦。Credit: NASA   嚴格的測試是任何空間任務順利成功的重中之重。遮陽罩,這個韋伯望遠鏡異常重要的親密夥伴,要是無法正常運轉的話,韋伯的這一季天文界連續劇說不準就要一直拖更了。   詹姆斯·韋伯空間望遠鏡是歷史上造出的最先進的空間望遠鏡。它不僅是一架紅外望遠鏡,還具有特別高的靈敏度。但想要達到辣么高的靈敏度來研究系外行星和遙遠的宇宙童年,韋伯童鞋必須非常"冷靜",體溫升高的話,靈敏度會大大折損。這個時候,遮陽罩就要大顯身手啦。   遮陽罩在韋伯的設計中至關重要。韋伯望遠鏡會被發射到拉格朗日 L2 點,運行軌道很高,遠離太陽、地球與月球。太陽是韋伯的主要熱量干擾的來源,其次是地球與月球。遮陽罩會有效阻斷來自這三大熱源的能量並保護韋伯維持在工作溫度正常運轉。這個工作溫度指的是零下 220 攝氏度(-370 華氏度;50 開爾文)。 上圖中我們可以看出,韋伯望遠鏡的配置大致可分為兩部分:紅色較熱的一面溫度為 85 攝氏度,藍色較冷的一面溫度達到零下 233 攝氏度。紅色的這部分中,儀器包括太陽能板、通信設備、計算機、以及轉向裝置。藍色部分的主要裝置包括鏡面、探測器、濾光片等。Credit: STSci.   遮陽罩的那一部分和望遠鏡的鏡面這部分可以產生非常極端的溫差。遮陽的這面溫度可以達到 110 攝氏度,足以煮熟雞蛋,而背陰處的部分溫度極低,足以凍結氧氣。   工程師們剛剛完成了五層遮陽罩的測試,按照韋伯在 L2 時的運行狀態安裝了遮陽罩。L2 距離地球約 160 萬公里。NASA 表示這些測試使用了航天器的自帶系統來展開遮陽罩,測試目前都已成功完成。韋伯望遠鏡遮陽罩負責人 James Cooper 介紹說這是遮陽罩"第一次在望遠鏡系統的电子設備的控制下展開。儘管這個任務非常艱巨,難度高,但測試順利完成,遮陽罩展開時的狀態非常驚艷"。   遮陽罩由五層 Kapton 製成。Kapton 是一種聚酰亞胺薄膜材料, 耐高溫絕...