{{{
package org.apache.hadoop.hbase.mapred; // assumed package; lets TableMap, TableMapReduceUtil and IdentityTableReduce resolve without imports

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
 * Test a map/reduce job over HBase tables. The job being tested is simple:
 * take every row in the table, reverse the value of a particular cell, and
 * write it back to the table.
 */
public class TestTableMapReduce extends MultiRegionTable {
  private static final Log LOG =
    LogFactory.getLog(TestTableMapReduce.class.getName());

  static final String MULTI_REGION_TABLE_NAME = "mrtest";
  static final String INPUT_COLUMN = "contents:";
  static final String OUTPUT_COLUMN = "text:";

  private static final byte [][] columns = new byte [][] {
    Bytes.toBytes(INPUT_COLUMN),
    Bytes.toBytes(OUTPUT_COLUMN)
  };
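  // Old-style column names take the form "family:qualifier"; a trailing
  // colon with no qualifier, as in "contents:", names the whole family.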

  /** Constructor: declares a table with one input and one output family. */
  public TestTableMapReduce() {
    super(INPUT_COLUMN);
    desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
  }

  /**
   * Mapper that passes the given key and the processed (reversed) record on
   * to reduce.
   */
  public static class ProcessContentsMapper
  extends MapReduceBase
  implements TableMap<ImmutableBytesWritable, BatchUpdate> {
    /**
     * Pass the key, and reversed value to reduce.
     * @param key row key
     * @param value the row's cells, restricted to the input column
     * @param output collector for the reversed value
     * @param reporter progress reporter
     * @throws IOException
     */
    public void map(ImmutableBytesWritable key, RowResult value,
        OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
        Reporter reporter)
    throws IOException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
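      // initTableMapJob() in runTestOnTable() restricts the scan to
      // INPUT_COLUMN, so each RowResult should carry exactly one cell.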
      byte [][] keys = value.keySet().toArray(new byte[value.size()][]);
      if (!Bytes.equals(keys[0], Bytes.toBytes(INPUT_COLUMN))) {
        throw new IOException("Wrong input column. Expected: '" + INPUT_COLUMN
          + "' but got: '" + Bytes.toString(keys[0]) + "'");
      }

      // Get the original value and reverse it
      String originalValue =
        new String(value.get(keys[0]).getValue(), HConstants.UTF8_ENCODING);
      StringBuilder newValue = new StringBuilder();
      for (int i = originalValue.length() - 1; i >= 0; i--) {
        newValue.append(originalValue.charAt(i));
      }

      // Now set the value to be collected
      BatchUpdate outval = new BatchUpdate(key.get());
      outval.put(OUTPUT_COLUMN, Bytes.toBytes(newValue.toString()));
      output.collect(key, outval);
    }
  }

  /**
   * Test a map/reduce against a multi-region table.
   * @throws IOException
   */
  public void testMultiRegionTable() throws IOException {
    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
  }

  private void runTestOnTable(HTable table) throws IOException {
    // Two task trackers, one local directory each, backed by the test filesystem.
    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);

    JobConf jobConf = null;
    try {
      LOG.info("Before map/reduce startup");
      jobConf = new JobConf(conf, TestTableMapReduce.class);
      jobConf.setJobName("process column contents");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
        INPUT_COLUMN, ProcessContentsMapper.class,
        ImmutableBytesWritable.class, BatchUpdate.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
        IdentityTableReduce.class, jobConf);
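      // IdentityTableReduce forwards each emitted BatchUpdate unchanged to
      // the output table, so the reversed values land back in the same rows.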

      LOG.info("Started " + Bytes.toString(table.getTableName()));
      JobClient.runJob(jobConf);
      LOG.info("After map/reduce completion");

      // Verify map/reduce results
      verify(Bytes.toString(table.getTableName()));
    } finally {
      mrCluster.shutdown();
      if (jobConf != null) {
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }

  private void verify(String tableName) throws IOException {
    HTable table = new HTable(conf, tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt("hbase.client.retries.number", 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        LOG.info("Verification attempt #" + i);
        verifyAttempt(table);
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
        LOG.debug("Verification attempt failed: " + e.getMessage());
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    assertTrue(verified);
  }

  /**
   * Looks at every value of the map/reduce output and verifies that indeed
   * the values have been reversed.
   * @param table Table to scan.
   * @throws IOException
   * @throws NullPointerException if we failed to find a cell value
   */
  private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    scan.addColumns(columns);
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        // Each row should carry at most the input and the output cell.
        if (r.size() > 2) {
          throw new IOException("Too many results, expected 2 got " +
            r.size());
        }
        byte[] firstValue = null;
        byte[] secondValue = null;
        int count = 0;
        for (Map.Entry<byte[], Cell> e : r.getRowResult().entrySet()) {
          if (count == 0) {
            firstValue = e.getValue().getValue();
          }
          if (count == 1) {
            secondValue = e.getValue().getValue();
          }
          count++;
          if (count == 2) {
            break;
          }
        }

        if (firstValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": first value is null");
        }
        String first = new String(firstValue, HConstants.UTF8_ENCODING);

        if (secondValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": second value is null");
        }
        byte[] secondReversed = new byte[secondValue.length];
        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
          secondReversed[i] = secondValue[j];
        }
        String second = new String(secondReversed, HConstants.UTF8_ENCODING);

        if (!first.equals(second)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("second value is not the reverse of the first. row=" +
              Bytes.toString(r.getRow()) + ", first value=" + first +
              ", second value=" + second);
          }
          fail();
        }
      }
    } finally {
      scanner.close();
    }
  }
}
}}}
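
For reference, the job wiring exercised above boils down to a handful of calls. The sketch below is a minimal, hypothetical standalone driver using the same `TableMapReduceUtil` calls as the test; the driver class name is invented here, and the `mrtest` table with its `contents:` and `text:` families is assumed to already exist:

{{{
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical driver name; ProcessContentsMapper is the mapper defined above.
public class ReverseContentsDriver {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf(new HBaseConfiguration(), ReverseContentsDriver.class);
    jobConf.setJobName("process column contents");
    jobConf.setNumReduceTasks(1);
    // Scan the "contents:" family and hand each row to the mapper ...
    TableMapReduceUtil.initTableMapJob("mrtest", "contents:",
      TestTableMapReduce.ProcessContentsMapper.class,
      ImmutableBytesWritable.class, BatchUpdate.class, jobConf);
    // ... and write the emitted BatchUpdates straight back to the table.
    TableMapReduceUtil.initTableReduceJob("mrtest",
      IdentityTableReduce.class, jobConf);
    JobClient.runJob(jobConf);
  }
}
}}}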