
Hello Hadoop V2

  • HelloHadoopV2 main
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    //  HelloHadoopV2
    //  Description:
    //    Compared with HelloHadoop, this program additionally:
    //    * checks whether the output folder already exists and deletes it
    //    * supports more than one file in the input folder without overwriting data
    //    * splits the map and reduce into separate classes so they can be reused
    //
    //  How to test:
    //    Run this program on a Hadoop 0.20 platform, executing:
    //    ---------------------------
    //    hadoop jar HelloHadoopV2.jar
    //    ---------------------------
    //
    //  Notes:
    //  1.  The source path on HDFS is "/user/$YOUR_NAME/input".
    //    You must upload data to this HDFS folder first; it may contain only files, not sub-folders.
    //  2.  When the job finishes, the results are written to the HDFS output path "/user/$YOUR_NAME/output-hh2".
    //
    
    public class HelloHadoopV2 {
    
    
      public static void main(String[] args) throws IOException,
          InterruptedException, ClassNotFoundException {
    
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Hadoop Hello World 2");
        job.setJarByClass(HelloHadoopV2.class);
        // set the Mapper, Reducer, and Combiner classes
        job.setMapperClass(HelloMapperV2.class);
        job.setCombinerClass(HelloReducerV2.class);
        job.setReducerClass(HelloReducerV2.class);
    
        // set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // set the reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    
        FileInputFormat.addInputPath(job, new Path("input"));
    
        FileOutputFormat.setOutputPath(job, new Path("output-hh2"));
    
        // call the checkAndDelete function to check whether the output folder exists and delete it if it does
        // (a sketch of this helper appears after the reducer listing below)
        CheckAndDelete.checkAndDelete("output-hh2", conf);
    
        boolean status = job.waitForCompletion(true);
    
        if (status) {
          System.err.println("Hello Hadoop V2 Job Finished !");
    
        } else {
          System.err.println("Hello Hadoop V2 Job Failed !");
          System.exit(1);
        }
      }
    }
    
    
  • mapper
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HelloMapperV2 extends Mapper<LongWritable, Text, Text, Text> {

  // emit each line's byte offset (as text) as the key and the line itself as the value
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(new Text(key.toString()), value);
  }

}

  • reducer
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HelloReducerV2 extends Reducer<Text, Text, Text, Text> {

  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {

    // join all values that share the same key, separated by " &&"
    StringBuilder str = new StringBuilder();
    for (Text tmp : values) {
      str.append(tmp.toString()).append(" &&");
    }

    Text final_key = new Text();
    Text final_value = new Text();
    final_key.set(key);
    final_value.set(str.toString());

    context.write(final_key, final_value);
  }
}
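
  • CheckAndDelete helper (sketch)

The driver above calls CheckAndDelete.checkAndDelete("output-hh2", conf), but that class is not listed on this page. The following is only a minimal sketch of what such a helper could look like, assuming its job is to remove the given HDFS folder recursively when it already exists (so a re-run does not abort because the output directory is already there); the actual class may differ.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckAndDelete {

  // Sketch only: delete the given folder from HDFS (recursively) if it exists.
  public static boolean checkAndDelete(String folder, Configuration conf) {
    Path dstPath = new Path(folder);
    try {
      FileSystem hdfs = dstPath.getFileSystem(conf);
      if (hdfs.exists(dstPath)) {
        // the second argument enables recursive deletion
        hdfs.delete(dstPath, true);
      }
    } catch (IOException e) {
      e.printStackTrace();
      return false;
    }
    return true;
  }
}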