位置:首頁 > 大數據教學 > Hadoop教學 > MapReduce計數器和連接

MapReduce計數器和連接


在MapReduce的計數器是用於收集關於 MapReduce 工作的統計信息的機製。這個信息在MapReduce的作業處理的問題的診斷是很有用的。 計數器類似於將在 map 或 reduce 在代碼日誌信息中。

通常情況下,這些計數器在一個程序(map 或 reduce)中定義,當一個特定事件或條件(特定於該計數器)發生執行期間遞增。計數器是一個很好的應用來從輸入數據集跟蹤有效和無效的記錄。

有兩種類型的計數器:

1. Hadoop 內置計數器: 有一些內置計數器存在每個作業中。下麵是內置計數器組:

  • MapReduce任務計數器 - 收集任務的具體信息(例如,輸入記錄的數量)在它的執行期間。
  • 文件係統計數器 - 收集信息像由一個任務讀取或寫入的字節數
  • FileInputFormat計數器 - 收集通過FileInputFormat讀取的字節數的信息
  • FileOutputFormat計數器 - 收集的字節數量的信息通過 FileOutputFormat 寫入
  • Job 計數器- 這些計數器使用 JobTracker。它們收集統計數據包括如,任務發起了作業的數量。

2. 用戶定義的計數器

除了內置的計數器,用戶可以定義自己的計數器,通過使用編程語言提供了類似的功能。 例如,在 Java 的枚舉用於定義用戶定義的計數器。

一個MapClass例子使用計數器計算缺失和無效值的數量:

 
publicstaticclassMapClass
            extendsMapReduceBase
            implementsMapper<LongWritable, Text, Text, Text>
{
    staticenumSalesCounters { MISSING, INVALID };
    publicvoidmap ( LongWritable key, Text value,
                 OutputCollector<Text, Text> output,
                 Reporter reporter) throwsIOException
    {
        
        //Input string is split using ',' and stored in 'fields' array
        String fields[] = value.toString().split(",", -20);
        //Value at 4th index is country. It is stored in 'country' variable
        String country = fields[4];
        
        //Value at 8th index is sales data. It is stored in 'sales' variable
        String sales = fields[8];
      
        if(country.length() == 0) {
            reporter.incrCounter(SalesCounters.MISSING, 1);
        } elseif(sales.startsWith("\"")) {
            reporter.incrCounter(SalesCounters.INVALID, 1);
        } else{
            output.collect(newText(country), newText(sales + ",1"));
        }
    }
}

上麵的代碼片段顯示在 Map Reduce 實現計數器的示例。

在這裡,SalesCounters是用“枚舉”定義的計數器。它被用來計算 MISSING 和 INVALID 的輸入記錄。

在代碼段中,如果 “country” 字段的長度為零那麼它的值丟失,因此相應的計數器 SalesCounters.MISSING 遞增。

接下來,如果 “sales” 字段開頭是符號 '' ,則記錄被視為無效。這通過遞增計數器 SalesCounters.INVALID 來表示。

MapReduce 連接

連接兩個大的數據集可以使用 MapReduce Join 來實現。然而,這個過程需要編寫大量的代碼來執行實際的連接操作。

連接兩個數據集開始是通過比較每個數據集的大小。如果因為相比其他數據集一個數據集小,那麼小數據集被分布到集群中的每個數據節點。一旦分散,無論是 Mapper 或 Reducer 使用更小的數據集進行查找匹配的大型數據集的記錄,然後結合這些記錄,形成輸出記錄。

這取決於在實際連接進行的地方,這個連接分為:

1. 映射端連接 - 當該聯接是由映射器執行的,它稱為映射端鏈接。在這種類型中,聯結前的數據由映射函數實際來消耗的處理。它是強製性的,輸入到每個映射是在分區中的形式,並且是按排序順序。另外,必須有一個相等數目的分區,它必須由連接鍵進行排序。

2. Reduce端連接- 當連接是通過減速器進行的,稱為reduce端連接。冇有必要在此連接有數據集中在以結構化形式(或分區)。

在這裡,映射端的處理發出連接這兩個表的關鍵字和對應的元組。作為該處理的效果,所有的元組相同連接鍵都落在相同的 reducer,然後使用相同的連接鍵連接記錄。

整體處理流程示於下圖。