wiki:waue/2008/0701

程式宣告

  • Program: HBaseRecordPro.java
  • Editor: Waue Chen
  • From : NCHC, Taiwan
  • Last Update Date: 07/01/2008

程式功能

  • The program parses your record file and creates an HBase table.
  • It then uses the first line as the column qualifiers.
  • Finally, it stores the remaining records in HBase automatically.

如何使用

  • Make sure of two things:
  • 1. source_file must be formatted as follows:
  • first line: qualify1:qualify2:...:qualifyN
  • other lines: records1:records2:...:recordsN
  • 2. The source_file path must be correct.

原始檔內容

name:locate:years
waue:taiwan:1981
rock:taiwan:1981
aso:taiwan:1981
jazz:taiwan:1982

結果

 * 	Go to hbase console, type : 
 * 		hql > select * from t1_table; 
 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 

+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:name             | waue                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:name             | rock                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:name             | aso                     |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:name             | jazz                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:years            | 1982                    |
+-------------------------+-------------------------+-------------------------+
4 row(s) in set. (0.31 sec)

 */

程式碼

package tw.org.nchc.code;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

/**
 * Loads a separator-delimited local text file into an HBase table through a
 * MapReduce job.
 *
 * The first line of {@link #source_file} holds the column qualifiers and every
 * following line holds one record. {@link #parseFirstLine(String, String)}
 * splits the source into a header file ({@link #conf_tmp}) and a data file
 * ({@link #text_tmp}); the data file is copied to the Hadoop file system and
 * fed through an identity mapper into {@link ReduceClass}, which emits one
 * table row per input line.
 */
public class HBaseRecordPro {
	/* Major parameter */
	// it indicates local path, not hadoop file system path
	final static String source_file = "/home/waue/test.txt";

	/* Minor parameter */
	// column family name
	final static String column_family = "member:";

	// table name
	final static String table_name = "HBaseRecord";

	// separate char
	final static String sp = ":";
	
	// conf tmp with column qualify
	final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";
	
	// data source tmp
	final static String text_tmp = "/tmp/HBaseRecord.text.tmp";

	/**
	 * Reducer that turns one input line into one table row. The map phase is
	 * an identity mapper, so all parsing happens here.
	 *
	 * NOTE: the column qualifiers are read from {@link #conf_tmp} on the local
	 * file system, so the reduce task must run on a machine where that file
	 * exists (presumably the machine that ran {@code main} — confirm for
	 * distributed setups).
	 */
	private static class ReduceClass extends TableReduce<LongWritable, Text> {

		// column qualifiers from the header file, loaded on first use so the
		// file is not re-read for every input record
		private String[] qualifiers;

		/**
		 * Emits the row for one input line.
		 *
		 * @param key      key of the input line; its string form becomes the row id
		 * @param values   the line content; only the first value is used
		 * @param output   collector receiving (row id, column map) pairs
		 * @param reporter unused
		 * @throws IOException if the header file cannot be read or is empty, or
		 *                     if the record has fewer fields than the header
		 */
		public void reduce(LongWritable key, Iterator<Text> values,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {

			if (qualifiers == null) {
				qualifiers = loadQualifiers();
			}
			if (!values.hasNext()) {
				return;
			}

			// Text.toString() decodes only the valid part of the buffer;
			// Text.getBytes() would expose the whole backing array, which may
			// carry stale bytes past getLength().
			String record = values.next().toString();
			if (record.trim().length() == 0) {
				// blank line in the data file: nothing to store
				return;
			}

			// limit -1 keeps trailing empty fields, so "a:b:" yields 3 fields
			String[] fields = record.split(sp, -1);
			if (fields.length < qualifiers.length) {
				throw new IOException("record at key " + key + " has "
						+ fields.length + " field(s) but " + qualifiers.length
						+ " column qualifier(s) are defined: " + record);
			}

			// This map connect to hbase table and holds the columns per row
			MapWritable map = new MapWritable();
			for (int i = 0; i < qualifiers.length; i++) {
				// column id is created dynamically as family + qualifier
				Text column = new Text(column_family + qualifiers[i]);
				// contents must be ImmutableBytesWritable
				ImmutableBytesWritable cell = new ImmutableBytesWritable(
						fields[i].getBytes("UTF-8"));
				map.put(column, cell);
			}
			// add the row with the key as the row id
			output.collect(new Text(key.toString()), map);
		}

		// Reads the first line of conf_tmp and splits it into column qualifiers.
		private String[] loadQualifiers() throws IOException {
			BufferedReader fconf = new BufferedReader(new FileReader(new File(
					conf_tmp)));
			String first_line;
			try {
				first_line = fconf.readLine();
			} finally {
				fconf.close();
			}
			if (first_line == null) {
				throw new IOException("column qualifier file is empty: "
						+ conf_tmp);
			}
			return first_line.split(sp);
		}
	}

	public HBaseRecordPro() {
	}
	
	/**
	 * Splits the source text into two files: the first line is written to
	 * {@link #conf_tmp} (used as column qualifiers) and every remaining line
	 * is written to {@code ou}, each terminated by "\n".
	 *
	 * @param in path of the local source file
	 * @param ou path of the local file receiving the data lines
	 * @return the first line of {@code in}
	 * @throws IOException if a file cannot be read or written, or if
	 *                     {@code in} is empty
	 */
	public String parseFirstLine(String in, String ou)
			throws IOException {

		BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
		try {
			String first_line = fi.readLine();
			if (first_line == null) {
				// fail before creating any temp file
				throw new IOException("source file is empty: " + in);
			}

			BufferedWriter ff = new BufferedWriter(new FileWriter(new File(
					conf_tmp)));
			try {
				ff.write(first_line);
			} finally {
				ff.close();
			}

			BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
			try {
				String data;
				while ((data = fi.readLine()) != null) {
					fw.write(data + "\n");
				}
			} finally {
				fw.close();
			}
			return first_line;
		} finally {
			fi.close();
		}
	}

	/**
	 * Deletes a local temporary file.
	 *
	 * @param str path of the file to delete
	 * @return {@code true} if the file is gone afterwards (deleted, or it did
	 *         not exist), {@code false} if the deletion failed
	 * @throws IOException declared for callers; not thrown by this
	 *                     implementation
	 */
	boolean deleteFile(String str)throws IOException{
		File df = new File(str);

		if (!df.exists()) {
			System.out.println("file not exit!");
			return true;
		}
		if (!df.delete()) {
			System.err.print("delete file error !");
			return false;
		}
		return true;
	}

	/**
	 * Runs the demo: splits the source file, makes sure the table exists,
	 * runs the load job and removes the temporary files.
	 *
	 * @param args unused
	 * @throws IOException if file handling or the job fails
	 */
	public static void main(String[] args) throws IOException {

		HBaseRecordPro setup = new HBaseRecordPro();
		String[] col_family = {column_family};
		Path text_path = new Path(text_tmp);
		
		setup.parseFirstLine(source_file, text_tmp);

		BuildHTable build_table = new BuildHTable(table_name,
				col_family);
		if (!build_table.checkTableExist(table_name)) {
			if (!build_table.createTable()) {
				System.out.println("create table error !");
			}
		} else {
			System.out.println("Table \"" + table_name
					+ "\" has already existed !");
		}
		JobConf conf = new JobConf(HBaseRecordPro.class);
		FileSystem fileconf = FileSystem.get(conf);
		// first argument true: the local copy is removed after the upload
		fileconf.copyFromLocalFile(true, text_path, text_path);
		// Job name; you can modify to any you like
		conf.setJobName("PersonDataBase");
		final int mapTasks = 1;
		final int reduceTasks = 1;
		// the HBase table name must match the table prepared above (table_name)
		TableReduce.initJob(table_name, ReduceClass.class, conf);

		// below are map-reduce profile
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);
		conf.setInputPath(text_path);
		conf.setMapperClass(IdentityMapper.class);
		conf.setCombinerClass(IdentityReducer.class);
		conf.setReducerClass(ReduceClass.class);

		try {
			JobClient.runJob(conf);
		} finally {
			// delete tmp files even when the job fails
			try {
				FileSystem.get(conf).delete(text_path);
			} finally {
				setup.deleteFile(conf_tmp);
			}
		}
	}
}
Last modified 14 years ago Last modified on Jun 29, 2010, 11:27:03 AM