wiki:waue/2010/0118

Version 6 (modified by waue, 14 years ago) (diff)

--

Hadoop Debug

  • 原本
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordIndex {
   public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
      Text map_key = new Text();
      Text map_value = new Text();
      String line = value.toString();
      StringTokenizer st = new StringTokenizer(line.toLowerCase());
      while (st.hasMoreTokens()) {
        String word = st.nextToken();
        map_key.set(word);
        map_value.set(fileSplit.getPath().getName() + ":" + line);
        output.collect(map_key, map_value);
      }
    }
  }


  static public class wordindexR extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      String v = "";
      StringBuilder ret = new StringBuilder("\n");
      for (Text val : values) {
        v += val.toString().trim();
        if (v.length() > 0)
          ret.append(v + "\n");
      }

      output.collect((Text) key, new Text(ret.toString()));
    }
  }

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    // debug using
    String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
    args = argv;

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length < 2) {
      System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
      return;
    }
    Job job = new Job(conf, "word index");
    job.setJobName("word inverted index");
    job.setJarByClass(WordIndex.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(wordindexM.class);
    job.setReducerClass(wordindexR.class);
    job.setCombinerClass(wordindexR.class);

    FileInputFormat.setInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    long start = System.nanoTime();

    job.waitForCompletion(true);

    long time = System.nanoTime() - start;
    System.err.println(time * (1E-9) + " secs.");
  }
}

遇到問題:

10/01/18 20:52:39 INFO input.FileInputFormat: Total input paths to process : 2
10/01/18 20:52:39 INFO mapred.JobClient: Running job: job_201001181452_0038
10/01/18 20:52:40 INFO mapred.JobClient:  map 0% reduce 0%
10/01/18 20:52:50 INFO mapred.JobClient: Task Id : attempt_201001181452_0038_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable

  • 目前認為是
   public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

用了新的 org.apache.hadoop.mapreduce.Mapper,可以省略舊 API 的 "extends MapReduceBase implements Mapper<...>" 宣告,

不過它預設搭配的 map 實作方法是 "public void map(LongWritable key, Text value, Context context)" 而非 "public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)",簽名不符就不會覆寫到 Mapper.map(),因此跑的是預設的 identity mapper。

  • 解決辦法