Changes between Initial Version and Version 1 of waue/2011/0425_Itri3CalculateMR


Ignore:
Timestamp:
Apr 25, 2011, 4:20:45 PM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2011/0425_Itri3CalculateMR

    v1 v1  
     1{{{
     2#!java
     3
     4package itri;
     5
     6//ITRI serial program number 3
     7//0. after Itri2Count
     8//1. run it by MapReduce
     9
     10
     11import java.io.IOException;
     12
     13import org.apache.hadoop.conf.Configuration;
     14import org.apache.hadoop.hbase.client.Put;
     15import org.apache.hadoop.hbase.client.Result;
     16import org.apache.hadoop.hbase.client.Scan;
     17import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     18import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
     19import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
     20import org.apache.hadoop.hbase.mapreduce.TableMapper;
     21import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
     22import org.apache.hadoop.hbase.mapreduce.TableReducer;
     23import org.apache.hadoop.hbase.util.Bytes;
     24import org.apache.hadoop.io.Text;
     25import org.apache.hadoop.mapreduce.Job;
     26
     27public class Itri3CalculateMR {
     28        public static class HtMap extends TableMapper<Text, Text> {
     29
     30                public void map(ImmutableBytesWritable key, Result value,
     31                                Context context) throws IOException, InterruptedException {
     32
     33                        String row = Bytes.toString(value.getValue(Bytes.toBytes("Detail"),
     34                                        Bytes.toBytes("Locate")));
     35
     36                        int sum = 0;
     37
     38                        for (int i = 0; i < 4; i++) {
     39                                String v = Bytes.toString(value.getValue(Bytes
     40                                                .toBytes("Products"), Bytes.toBytes("P" + (i + 1))));
     41                                String c = Bytes.toString(value.getValue(Bytes
     42                                                .toBytes("Turnover"), Bytes.toBytes("P" + (i + 1))));
     43                                if (v != null ) {
     44                                        if(c == null) c="0";
     45                                        System.err.println("p=" + v);
     46                                        System.err.println("c=" + c);
     47
     48                                        sum += Integer.parseInt(v) * Integer.parseInt(c);
     49                                        System.err.println("T" + row + ":" + "p[" + i + "]*" + "c["
     50                                                        + i + "] => " + v + "*" + c + "+="
     51                                                        + (sum));
     52                                       
     53                                }
     54
     55                        }
     56
     57                        context.write(new Text("T" + row), new Text(String.valueOf(sum)));
     58
     59                }
     60        }
     61
     62
     63        public static class HtReduce extends TableReducer<Text, Text, Text> {
     64
     65                public void reduce(Text key, Iterable<Text> values, Context context)
     66                                throws IOException, InterruptedException {
     67                        String sum = "";
     68                        for (Text i : values) {
     69                                sum += i.toString();
     70                        }
     71
     72                        Put put = new Put(Bytes.toBytes(key.toString()));
     73
     74                        put.add(Bytes.toBytes("Turnover"), Bytes.toBytes("Sum"), Bytes
     75                                        .toBytes(sum));
     76
     77                        context.write(new Text(), put);
     78
     79                }
     80        }
     81
     82        public static void main(String args[]) throws Exception {
     83
     84                String tablename = "itri";
     85
     86                Scan myScan = new Scan();
     87                myScan.addColumn("Detail:Locate".getBytes());
     88                myScan.addColumn("Products:P1".getBytes());
     89                myScan.addColumn("Products:P2".getBytes());
     90                myScan.addColumn("Products:P3".getBytes());
     91                myScan.addColumn("Products:P4".getBytes());
     92                myScan.addColumn("Turnover:P1".getBytes());
     93                myScan.addColumn("Turnover:P2".getBytes());
     94                myScan.addColumn("Turnover:P3".getBytes());
     95                myScan.addColumn("Turnover:P4".getBytes());
     96
     97                Configuration conf = new Configuration();
     98
     99                Job job = new Job(conf, "Calculating ");
     100
     101                job.setJarByClass(Itri3CalculateMR.class);
     102
     103                job.setMapperClass(HtMap.class);
     104                job.setReducerClass(HtReduce.class);
     105
     106                job.setMapOutputKeyClass(Text.class);
     107                job.setMapOutputValueClass(Text.class);
     108
     109                job.setInputFormatClass(TableInputFormat.class);
     110                job.setOutputFormatClass(TableOutputFormat.class);
     111
     112                TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,
     113                                Text.class, Text.class, job);
     114                TableMapReduceUtil.initTableReducerJob(tablename, HtReduce.class, job);
     115
     116                System.exit(job.waitForCompletion(true) ? 0 : 1);
     117
     118        }
     119}
     120
     121}}}