{{{
#!html
HBase 程式設計
TableMapper 使用方式
}}} [[PageOutline]] = example 1 = {{{ #!java public static void main(String[] args) throws Exception { Job myJob = new Job(); myJob.setJobName("myJob"); myJob.setJarByClass(MyClass.class); myJob.setMapOutputKeyClass(Text.class); myJob.setMapOutputValueClass(Text.class); myJob.setOutputKeyClass(Text.class); myJob.setOutputValueClass(Put.class); Scan myScan = new Scan("".getBytes(),"12345".getBytes()); myScan.addColumn("Resume:Text".getBytes()); TableMapReduceUtil.initTableMapperJob("inputTable", myScan, Map.class, Text.class, Text.class, myJob); TableMapReduceUtil.initTableReducerJob("outputTable", Reduce.class, myJob); myJob.setMapperClass(Map.class); myJob.setReducerClass(Reduce.class); myJob.setInputFormatClass(TableInputFormat.class); myJob.setOutputFormatClass(TableOutputFormat.class); myJob.setNumReduceTasks(12); myJob.submit(); while(!myJob.isComplete()) { Thread.currentThread().sleep(10000); System.out.println("Map: " + (myJob.mapProgress() * 100) + "% ... Reduce: " + (myJob.reduceProgress() * 100) + "%"); } if(myJob.isSuccessful()) { System.out.println("Job Successful."); } else { System.out.println("Job Failed."); } } public static class Map extends TableMapper { public void map(ImmutableBytesWritable key, Result value, Mapper.Context context) throws IOException, InterruptedException { } } public static class Reduce extends TableReducer { public void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { } } }}} = Example 2 = {{{ #!java public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException { try { context.write(row, resultToPut(row, value)); } catch (InterruptedException e) { e.printStackTrace(); } } }}} == 完整 == {{{ #!java /** * Copyright 2009 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
* See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Import data written by {@link Export}.
 */
public class Import {
  static final String NAME = "import";

  /**
   * Converts each (row key, Result) record read from the input sequence files into a
   * Put for the target table. The type arguments are required: with a raw TableMapper
   * the map method below would not override Mapper.map and @Override would not compile.
   */
  static class Importer extends TableMapper<ImmutableBytesWritable, Put> {
    /**
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     * @throws InterruptedException When the task is interrupted while writing.
     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
     *   org.apache.hadoop.mapreduce.Mapper.Context)
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      // Propagate interruption instead of printing it and silently continuing.
      context.write(row, resultToPut(row, value));
    }

    /** Copies every KeyValue of {@code result} into a Put keyed by {@code key}. */
    private static Put resultToPut(ImmutableBytesWritable key, Result result)
        throws IOException {
      Put put = new Put(key.get());
      for (KeyValue kv : result.raw()) {
        put.add(kv);
      }
      return put;
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param conf The current configuration.
   * @param args The command line parameters: table name, then input directory.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    String tableName = args[0];
    Path inputDir = new Path(args[1]);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(Importer.class);
    // No reducers. Just write straight to table. Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import <tablename> <inputdir>");
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      usage("Wrong number of arguments: " + otherArgs.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
}}}

= Example 3 =
{{{
#!java
public static class ComputeSimilarity extends TableMapper<ImmutableBytesWritable, Put>
    implements Configurable {
  private Configuration conf = null;
  private DenseMatrix matrix;

  @Override
  public void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
    DenseVector v = new DenseVector(value);
    int rowIndex = BytesUtil.getRowIndex(key.get());
    int rows = matrix.getRows();
    Put put = new Put(key.get());
    for (int i = 0; i < rows; i++) {
      // The diagonal (a row against itself) is stored as 0.
      double dotProduct = (i == rowIndex) ? 0 : matrix.getRow(i).dot(v);
      put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String.valueOf(i)),
          Bytes.toBytes(dotProduct));
    }
    context.write(key, put);
  }

  // getConf()/setConf() omitted here; see the complete listing below.
}
}}}

== 完整 ==
{{{
#!java
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.DenseVector;
import org.apache.hama.matrix.Matrix;
import org.apache.hama.matrix.Vector;
import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;

public class TestCosineSimilarityMatrix extends HamaCluster {
  static final Logger LOG = Logger.getLogger(TestCosineSimilarityMatrix.class);
  private static final int SIZE = 10;
  private Matrix m1;
  private Matrix symmetricMatrix;
  private HamaConfiguration conf;

  /**
   * @throws UnsupportedEncodingException
   */
  public TestCosineSimilarityMatrix() throws UnsupportedEncodingException {
    super();
  }

  public void setUp() throws Exception {
    super.setUp();

    conf = getConf();

    m1 = DenseMatrix.random(conf, SIZE, SIZE);
    symmetricMatrix = new DenseMatrix(conf, SIZE, SIZE);
  }

  /**
   * Runs ComputeSimilarity over m1 (map-only, written into symmetricMatrix) and checks
   * one entry against a locally computed dot product.
   */
  public void testCosineSimilarity() throws Exception {
    Job job = new Job(conf, "set MR job test");
    job.getConfiguration().set("input.matrix", m1.getPath());

    Scan scan = new Scan();
    scan.addFamily(Constants.COLUMNFAMILY);

    TableMapReduceUtil.initTableMapperJob(m1.getPath(), scan,
        ComputeSimilarity.class, ImmutableBytesWritable.class, Put.class, job);
    TableMapReduceUtil.initTableReducerJob(symmetricMatrix.getPath(),
        IdentityTableReducer.class, job);
    job.setNumReduceTasks(0);

    // Fail the test when the job fails or is interrupted, instead of printing the
    // exception and then asserting against an incomplete output matrix.
    assertTrue("similarity job failed", job.waitForCompletion(true));

    Vector v1 = m1.getRow(0);
    Vector v2 = m1.getRow(2);
    // Floating-point result: compare with a tolerance rather than exact equality.
    assertEquals(v1.dot(v2), symmetricMatrix.get(0, 2), 1e-9);
  }

  /**
   * For each input row v, writes v . row(i) of the "input.matrix" matrix into column i
   * of the output row, with the diagonal entry forced to 0.
   */
  public static class ComputeSimilarity extends TableMapper<ImmutableBytesWritable, Put>
      implements Configurable {
    private Configuration conf = null;
    private DenseMatrix matrix;

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      DenseVector v = new DenseVector(value);
      // Loop-invariant: compute once per input row rather than once per column.
      int rowIndex = BytesUtil.getRowIndex(key.get());
      int rows = matrix.getRows();
      Put put = new Put(key.get());
      for (int i = 0; i < rows; i++) {
        // The diagonal (a row against itself) is stored as 0.
        double dotProduct = (i == rowIndex) ? 0 : matrix.getRow(i).dot(v);
        put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String.valueOf(i)),
            Bytes.toBytes(dotProduct));
      }
      context.write(key, put);
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
      try {
        matrix = new DenseMatrix(new HamaConfiguration(conf), conf.get("input.matrix"));
      } catch (IOException e) {
        // setConf cannot throw checked exceptions; fail fast with the cause instead of
        // leaving matrix null and hitting a NullPointerException later in map().
        throw new IllegalStateException(
            "Cannot open input matrix " + conf.get("input.matrix"), e);
      }
    }
  }
}
}}}