| 134 | | |
| | 142 | Mapper.Context 就有宣告了getInputSplit();方法,因此使用 context 幾乎可以滿足之前的 ''' OutputCollector 與 Reporter ''' |
| | 143 | |
| | 144 | {{{ |
| | 145 | #!java |
| | 146 | public class WordIndex { |
| | 147 | public static class wordindexM extends |
| | 148 | Mapper<LongWritable, Text, Text, Text> { |
| | 149 | public void map(LongWritable key, Text value, Context context) |
| | 150 | throws IOException, InterruptedException { |
| | 151 | |
| | 152 | FileSplit fileSplit = (FileSplit) context.getInputSplit(); |
| | 153 | |
| | 154 | Text map_key = new Text(); |
| | 155 | Text map_value = new Text(); |
| | 156 | String line = value.toString(); |
| | 157 | StringTokenizer st = new StringTokenizer(line.toLowerCase()); |
| | 158 | while (st.hasMoreTokens()) { |
| | 159 | String word = st.nextToken(); |
| | 160 | map_key.set(word); |
| | 161 | map_value.set(fileSplit.getPath().getName() + ":" + line); |
| | 162 | context.write(map_key, map_value); |
| | 163 | } |
| | 164 | } |
| | 165 | } |
| | 166 | |
| | 167 | |
| | 168 | static public class wordindexR extends Reducer<Text, Text, Text, Text> { |
| | 169 | |
| | 170 | public void reduce(Text key, Iterable<Text> values, |
| | 171 | OutputCollector<Text, Text> output, Reporter reporter) |
| | 172 | throws IOException { |
| | 173 | String v = ""; |
| | 174 | StringBuilder ret = new StringBuilder("\n"); |
| | 175 | for (Text val : values) { |
| | 176 | v += val.toString().trim(); |
| | 177 | if (v.length() > 0) |
| | 178 | ret.append(v + "\n"); |
| | 179 | } |
| | 180 | |
| | 181 | output.collect((Text) key, new Text(ret.toString())); |
| | 182 | } |
| | 183 | } |
| | 184 | |
| | 185 | public static void main(String[] args) throws IOException, |
| | 186 | InterruptedException, ClassNotFoundException { |
| | 187 | // debug using |
| | 188 | String[] argv = { "/user/waue/input", "/user/waue/output-wi" }; |
| | 189 | args = argv; |
| | 190 | |
| | 191 | Configuration conf = new Configuration(); |
| | 192 | String[] otherArgs = new GenericOptionsParser(conf, args) |
| | 193 | .getRemainingArgs(); |
| | 194 | if (otherArgs.length < 2) { |
| | 195 | System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>"); |
| | 196 | return; |
| | 197 | } |
| | 198 | Job job = new Job(conf, "word index"); |
| | 199 | job.setJobName("word inverted index"); |
| | 200 | job.setJarByClass(WordIndex.class); |
| | 201 | |
| | 202 | job.setMapOutputKeyClass(Text.class); |
| | 203 | job.setMapOutputValueClass(Text.class); |
| | 204 | job.setOutputKeyClass(Text.class); |
| | 205 | job.setOutputValueClass(Text.class); |
| | 206 | |
| | 207 | job.setMapperClass(wordindexM.class); |
| | 208 | job.setReducerClass(wordindexR.class); |
| | 209 | job.setCombinerClass(wordindexR.class); |
| | 210 | |
| | 211 | FileInputFormat.setInputPaths(job, args[0]); |
| | 212 | HelloHadoopV2.checkAndDelete(args[1], conf); |
| | 213 | FileOutputFormat.setOutputPath(job, new Path(args[1])); |
| | 214 | |
| | 215 | long start = System.nanoTime(); |
| | 216 | |
| | 217 | job.waitForCompletion(true); |
| | 218 | |
| | 219 | long time = System.nanoTime() - start; |
| | 220 | System.err.println(time * (1E-9) + " secs."); |
| | 221 | } |
| | 222 | } |
| | 223 | |
| | 224 | }}} |