Changes between Initial Version and Version 1 of waue/2011/0426_7


Ignore:
Timestamp:
Apr 25, 2011, 4:26:56 PM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2011/0426_7

    v1 v1  
     1{{{
     2#!html
     3<div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big>
     4Hadoop 與 RDBMS 的支援
     5</big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big>
     6Hadoop 0.20 + JDBC + MySQL 5
     7</big></big></div>
     8}}}
     9[[PageOutline]]
     10
     11= 說明 =
     12
     13 * 需先安裝 hadoop 0.20 , apache2 , MySQL 5 server & client , phpmyadmin
     14 * 需將 [http://dev.mysql.com/downloads/connector/j/ mysql-connector-java-*.jar] 放到 lib 目錄下
     15 * MySQL 內,先建立一個school 的資料庫,內含 teacher 的table ,並新增一些資料,如下:
     16{{{
     17#!sql
     18DROP TABLE IF EXISTS `school`.`teacher`;
     19CREATE TABLE  `school`.`teacher` (
     20  `id` int(11) default NULL,
     21  `name` char(20) default NULL,
     22  `age` int(11) default NULL,
     23  `departmentID` int(11) default NULL
     24) ENGINE=InnoDB DEFAULT CHARSET=latin1;
     25}}}
     26
     27[[Image(wiki:NCHCCloudCourse100928_MYSQL:pd1.png)]]
     28
     29= 程式碼 =
     30
     31 = DBAccess.java =
     32
     33{{{
     34#!java
     35package db;
     36import java.io.DataInput;
     37import java.io.DataOutput;
     38import java.io.IOException;
     39import java.sql.PreparedStatement;
     40import java.sql.ResultSet;
     41import java.sql.SQLException;
     42import org.apache.hadoop.fs.FileSystem;
     43import org.apache.hadoop.fs.Path;
     44import org.apache.hadoop.io.LongWritable;
     45import org.apache.hadoop.io.Text;
     46import org.apache.hadoop.io.Writable;
     47import org.apache.hadoop.mapred.FileOutputFormat;
     48import org.apache.hadoop.mapred.JobClient;
     49import org.apache.hadoop.mapred.JobConf;
     50import org.apache.hadoop.mapred.MapReduceBase;
     51import org.apache.hadoop.mapred.Mapper;
     52import org.apache.hadoop.mapred.OutputCollector;
     53import org.apache.hadoop.mapred.Reporter;
     54import org.apache.hadoop.mapred.lib.IdentityReducer;
     55import org.apache.hadoop.mapred.lib.db.DBConfiguration;
     56import org.apache.hadoop.mapred.lib.db.DBInputFormat;
     57import org.apache.hadoop.mapred.lib.db.DBWritable;
     58public class DBAccess {
     59        @SuppressWarnings("deprecation")
     60        public static void main(String[] args) throws IOException {
     61    String[] argc={"jdbc:mysql://localhost/school","root", "itri"}; argv=argc;
     62       
     63    try {
     64     
     65      JobConf conf = new JobConf(DBAccess.class);
     66        Class.forName("com.mysql.jdbc.Driver");
     67      DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
     68          argv[0], argv[1], argv[2]);
     69                        conf.setOutputKeyClass(LongWritable.class);
     70                        conf.setOutputValueClass(Text.class);
     71                        conf.setInputFormat(DBInputFormat.class);
     72                        Path dstPath = new Path("dboutput3");
     73                        FileOutputFormat.setOutputPath(conf, dstPath);
     74                        String[] fields = { "id", "name", "age", "departmentID" };
     75                        DBInputFormat.setInput(conf, TeacherRecord.class, "teacher", null,
     76                                        "id", fields);
     77                        conf.setMapperClass(DBAccessMapper.class);
     78                        conf.setReducerClass(IdentityReducer.class);
     79                       
     80                        FileSystem hdfs = dstPath.getFileSystem(conf);
     81                        if (hdfs.exists(dstPath)) {
     82                                hdfs.delete(dstPath, true);
     83                        }
     84                        JobClient.runJob(conf);
     85                }
     86                catch(ClassNotFoundException e) {
     87                    System.err.println("mysql.jdbc.Driver not found"); 
     88                }
     89        }
     90}
     91}}}
     92
     93 = DBAccessMapper.java =
     94
     95{{{
     96#!java
     97package db;
     98import java.io.IOException;
     99import org.apache.hadoop.io.LongWritable;
     100import org.apache.hadoop.io.Text;
     101import org.apache.hadoop.mapred.MapReduceBase;
     102import org.apache.hadoop.mapred.Mapper;
     103import org.apache.hadoop.mapred.OutputCollector;
     104import org.apache.hadoop.mapred.Reporter;
     105public class DBAccessMapper extends MapReduceBase implements
     106                Mapper<LongWritable, TeacherRecord, LongWritable, Text> {
     107        public void map(LongWritable key, TeacherRecord value,
     108                        OutputCollector<LongWritable, Text> collector, Reporter reporter)
     109                        throws IOException {
     110                collector.collect(new LongWritable(value.id),
     111                new Text(value.toString()));
     112        }
     113}
     114}}}
     115
     116 = !TeacherRecord.java =
     117
     118{{{
     119#!java
     120package db;
     121import java.io.DataInput;
     122import java.io.DataOutput;
     123import java.io.IOException;
     124import java.sql.PreparedStatement;
     125import java.sql.ResultSet;
     126import java.sql.SQLException;
     127import org.apache.hadoop.io.Text;
     128import org.apache.hadoop.io.Writable;
     129import org.apache.hadoop.mapred.lib.db.DBWritable;
     130public class TeacherRecord implements Writable, DBWritable {
     131        int id;
     132        String name;
     133        int age;
     134        int departmentID;
     135        @Override
     136        public void readFields(DataInput in) throws IOException {
     137                // TODO Auto-generated method stub
     138                this.id = in.readInt();
     139                this.name = Text.readString(in);
     140                this.age = in.readInt();
     141                this.departmentID = in.readInt();
     142        }
     143        @Override
     144        public void write(DataOutput out) throws IOException {
     145                // TODO Auto-generated method stub
     146                out.writeInt(this.id);
     147                Text.writeString(out, this.name);
     148                out.writeInt(this.age);
     149                out.writeInt(this.departmentID);
     150        }
     151        @Override
     152        public void readFields(ResultSet result) throws SQLException {
     153                // TODO Auto-generated method stub
     154                this.id = result.getInt(1);
     155                this.name = result.getString(2);
     156                this.age = result.getInt(3);
     157                this.departmentID = result.getInt(4);
     158        }
     159        @Override
     160        public void write(PreparedStatement stmt) throws SQLException {
     161                // TODO Auto-generated method stub
     162                stmt.setInt(1, this.id);
     163                stmt.setString(2, this.name);
     164                stmt.setInt(3, this.age);
     165                stmt.setInt(4, this.departmentID);
     166        }
     167        @Override
     168        public String toString() {
     169                // TODO Auto-generated method stub
     170                return new String(this.name + " " + this.age + " " + this.departmentID);
     171        }
     172}
     173}}}
     174
     175
     176
     177 = 執行結果 =
     178
     179{{{
     180$ /opt/hadoop/bin/hadoop dfs -cat dboutput/part-00000
     1810       waue 29 920
     1821       rock 30 1231
     1831       2 3 4
     184}}}
     185
     186 * 引用 [http://jaguar13.javaeye.com/blog/683392]