Program Declaration
- Program: HBaseRecordPro.java
- Editor: Waue Chen
- From: NCHC, Taiwan
- Last Update Date: 07/01/2008
Program Function
- This program parses your record file and creates an HBase table.
- It uses the first line of the file as the column qualifiers.
- Finally, it stores the remaining records in HBase automatically.
How to Use
- Make sure of two things:
- 1. source_file must follow this format (see the sketch after this list):
-    first line:  qualifier1:qualifier2:...:qualifierN
-    other lines: record1:record2:...:recordN
- 2. The source_file path must be correct.
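As a quick illustration of the format, each line simply splits on the separator character. Below is a minimal, stand-alone sketch (the class name FormatCheck is hypothetical, not part of this project):

public class FormatCheck {
    public static void main(String[] args) {
        String firstLine = "name:locate:years"; // the column qualifiers
        String record = "waue:taiwan:1981";     // one data record
        String[] qualifiers = firstLine.split(":");
        String[] fields = record.split(":");
        // every record must have exactly as many fields as the first line
        System.out.println(qualifiers.length == fields.length); // prints "true"
    }
}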
Source File Content
name:locate:years
waue:taiwan:1981
rock:taiwan:1981
aso:taiwan:1981
jazz:taiwan:1982
Result
Go to the HBase console and type:

hql > select * from t1_table;
08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:name             | waue                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:name             | rock                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:name             | aso                     |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:name             | jazz                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:years            | 1982                    |
+-------------------------+-------------------------+-------------------------+
4 row(s) in set. (0.31 sec)

Note that the row ids (0, 17, 34, 50) are the byte offsets of each record line in the temporary data file: TextInputFormat emits each line's LongWritable byte offset as its key, IdentityMapper passes it straight through, and the reducer uses it as the row id.
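To spot-check a single cell programmatically rather than through the shell, a minimal sketch is shown below. It assumes the HBase 0.1-era client API that the listing uses; the class name ReadBackTest and the exact get() signature are assumptions:

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.io.Text;

// Hypothetical read-back check; not part of the original program.
public class ReadBackTest {
    public static void main(String[] args) throws IOException {
        HTable table = new HTable(new HBaseConfiguration(), new Text("t1_table"));
        // fetch row "0", column "member:name" -- expected value is "waue"
        byte[] value = table.get(new Text("0"), new Text("member:name"));
        System.out.println(new String(value));
    }
}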
Code
package tw.org.nchc.code;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
public class HBaseRecordPro {

    /* Major parameter */
    // this is a local filesystem path, not a Hadoop file system path
    final static String source_file = "/home/waue/test.txt";

    /* Minor parameters */
    // column family name
    final static String column_family = "member:";
    // table name
    final static String table_name = "HBaseRecord";
    // separator character between fields
    final static String sp = ":";
    // temporary file holding the column qualifiers (the first line)
    final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";
    // temporary file holding the data records
    final static String text_tmp = "/tmp/HBaseRecord.text.tmp";

    // In this sample the map phase is a pass-through (IdentityMapper);
    // the reduce phase does all the work.
    private static class ReduceClass extends TableReduce<LongWritable, Text> {
        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<Text, MapWritable> output, Reporter reporter)
                throws IOException {
            // read the configuration file that holds the first line
            BufferedReader fconf = new BufferedReader(new FileReader(new File(
                    conf_tmp)));
            String first_line = fconf.readLine();
            fconf.close();
            // extract the column qualifiers
            String[] cf = first_line.split(sp);
            int length = cf.length;
            // each key (a byte offset) has exactly one value: the record line.
            // Text.getBytes() may return a padded buffer, so use toString()
            String stro = values.next().toString();
            String[] str = stro.split(sp);
            // column names are created dynamically from the qualifiers
            Text[] col_n = new Text[length];
            byte[][] b_l = new byte[length][];
            // cell contents must be ImmutableBytesWritable
            ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
            // this map holds the columns of the current row and is written
            // to the HBase table
            MapWritable map = new MapWritable();
            map.clear();
            // prepare to write the data into the map
            for (int i = 0; i < length; i++) {
                col_n[i] = new Text(column_family + cf[i]);
                b_l[i] = str[i].getBytes();
                w_l[i] = new ImmutableBytesWritable(b_l[i]);
                // populate the current row
                map.put(col_n[i], w_l[i]);
            }
            // emit the row, with the key (byte offset) as the row id
            output.collect(new Text(key.toString()), map);
        }
    }
    public HBaseRecordPro() {
    }

    // This method splits the source text into two files:
    // conf_tmp keeps the first line, which supplies the column qualifiers;
    // text_tmp (the "ou" argument) keeps the remaining records for the table.
    public String parseFirstLine(String in, String ou) throws IOException {
        BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
        BufferedWriter ff = new BufferedWriter(new FileWriter(new File(conf_tmp)));
        BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
        String first_line, data;
        first_line = fi.readLine();
        ff.write(first_line);
        ff.flush();
        // copy every remaining line into the data file
        while ((data = fi.readLine()) != null) {
            fw.write(data + "\n");
            fw.flush();
        }
        fi.close();
        fw.close();
        ff.close();
        return first_line;
    }
    // delete a temporary file
    boolean deleteFile(String str) throws IOException {
        File df = new File(str);
        if (df.exists()) {
            if (!df.delete()) {
                System.err.print("delete file error !");
            }
        } else {
            System.out.println("file does not exist!");
        }
        return true;
    }
    /**
     * Runs the demo.
     */
    public static void main(String[] args) throws IOException {
        HBaseRecordPro setup = new HBaseRecordPro();
        String[] col_family = { column_family };
        Path text_path = new Path(text_tmp);

        String first_line = setup.parseFirstLine(source_file, text_tmp);
        // System.out.println(first_line);

        BuildHTable build_table = new BuildHTable(table_name, col_family);
        if (!build_table.checkTableExist(table_name)) {
            if (!build_table.createTable()) {
                System.out.println("create table error !");
            }
        } else {
            System.out.println("Table \"" + table_name
                    + "\" already exists!");
        }

        JobConf conf = new JobConf(HBaseRecordPro.class);
        FileSystem fileconf = FileSystem.get(conf);
        // move the local data file into HDFS (the boolean argument deletes
        // the local source after copying)
        fileconf.copyFromLocalFile(true, text_path, text_path);

        // job name; modify it to anything you like
        conf.setJobName("PersonDataBase");
        final int mapTasks = 1;
        final int reduceTasks = 1;

        // bind the reduce output to the HBase table named by table_name
        TableReduce.initJob(table_name, ReduceClass.class, conf);

        // map-reduce profile
        conf.setNumMapTasks(mapTasks);
        conf.setNumReduceTasks(reduceTasks);
        conf.setInputPath(text_path);
        conf.setMapperClass(IdentityMapper.class);
        conf.setCombinerClass(IdentityReducer.class);
        conf.setReducerClass(ReduceClass.class);
        JobClient.runJob(conf);

        // delete the temporary files
        FileSystem.get(conf).delete(text_path);
        setup.deleteFile(conf_tmp);
    }
}
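The BuildHTable helper used in main() is defined elsewhere in this project. For reference, here is a minimal sketch of what it presumably does, assuming the HBase 0.1-era admin API; this is a reconstruction, not the project's actual code:

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;

// Hypothetical sketch of BuildHTable; the real class ships with this project
// and may differ.
public class BuildHTable {
    private final String tableName;
    private final String[] families;
    private final HBaseAdmin admin;

    public BuildHTable(String tableName, String[] families) throws IOException {
        this.tableName = tableName;
        this.families = families;
        this.admin = new HBaseAdmin(new HBaseConfiguration());
    }

    public boolean checkTableExist(String name) throws IOException {
        return admin.tableExists(new Text(name));
    }

    public boolean createTable() {
        try {
            HTableDescriptor desc = new HTableDescriptor(tableName);
            for (String family : families) {
                // family names end with ':' in this HBase version
                desc.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(desc);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}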