| | 26 | |
| | 27 | |
| | 28 | {{{ |
| | 29 | #!java |
| | 30 | |
| | 31 | import java.io.IOException; |
| | 32 | import java.nio.ByteBuffer; |
| | 33 | import java.util.*; |
| | 34 | |
| | 35 | import org.apache.cassandra.thrift.Column; |
| | 36 | import org.apache.cassandra.thrift.ColumnOrSuperColumn; |
| | 37 | import org.apache.cassandra.thrift.Mutation; |
| | 38 | import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat; |
| | 39 | import org.slf4j.Logger; |
| | 40 | import org.slf4j.LoggerFactory; |
| | 41 | |
| | 42 | import static com.google.common.base.Charsets.UTF_8; |
| | 43 | |
| | 44 | import org.apache.cassandra.db.IColumn; |
| | 45 | import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; |
| | 46 | import org.apache.cassandra.hadoop.ConfigHelper; |
| | 47 | import org.apache.cassandra.thrift.SlicePredicate; |
| | 48 | import org.apache.cassandra.utils.ByteBufferUtil; |
| | 49 | import org.apache.hadoop.conf.Configuration; |
| | 50 | import org.apache.hadoop.conf.Configured; |
| | 51 | import org.apache.hadoop.fs.Path; |
| | 52 | import org.apache.hadoop.io.IntWritable; |
| | 53 | import org.apache.hadoop.io.Text; |
| | 54 | import org.apache.hadoop.mapreduce.Job; |
| | 55 | import org.apache.hadoop.mapreduce.Mapper; |
| | 56 | import org.apache.hadoop.mapreduce.Reducer; |
| | 57 | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| | 58 | import org.apache.hadoop.util.Tool; |
| | 59 | import org.apache.hadoop.util.ToolRunner; |
| | 60 | |
| | 61 | /** |
| | 62 | * This counts the occurrences of words in ColumnFamily Standard1, that has a single column (that we care about) |
| | 63 | * "text" containing a sequence of words. |
| | 64 | * |
| | 65 | * For each word, we output the total number of occurrences across all texts. |
| | 66 | * |
| | 67 | * When outputting to Cassandra, we write the word counts as a {word, count} column/value pair, |
| | 68 | * with a row key equal to the name of the source column we read the words from. |
| | 69 | */ |
| | 70 | public class WordCount extends Configured implements Tool |
| | 71 | { |
| | 72 | private static final Logger logger = LoggerFactory.getLogger(WordCount.class); |
| | 73 | |
| | 74 | static final String KEYSPACE = "wordcount"; |
| | 75 | static final String COLUMN_FAMILY = "input_words"; |
| | 76 | |
| | 77 | static final String OUTPUT_REDUCER_VAR = "output_reducer"; |
| | 78 | static final String OUTPUT_COLUMN_FAMILY = "output_words"; |
| | 79 | private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count"; |
| | 80 | |
| | 81 | private static final String CONF_COLUMN_NAME = "columnname"; |
| | 82 | |
| | 83 | public static void main(String[] args) throws Exception |
| | 84 | { |
| | 85 | // Let ToolRunner handle generic command-line options |
| | 86 | ToolRunner.run(new Configuration(), new WordCount(), args); |
| | 87 | System.exit(0); |
| | 88 | } |
| | 89 | |
| | 90 | public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable> |
| | 91 | { |
| | 92 | private final static IntWritable one = new IntWritable(1); |
| | 93 | private Text word = new Text(); |
| | 94 | private ByteBuffer sourceColumn; |
| | 95 | |
| | 96 | protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) |
| | 97 | throws IOException, InterruptedException |
| | 98 | { |
| | 99 | sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME)); |
| | 100 | } |
| | 101 | |
| | 102 | public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException |
| | 103 | { |
| | 104 | IColumn column = columns.get(sourceColumn); |
| | 105 | if (column == null) |
| | 106 | return; |
| | 107 | String value = ByteBufferUtil.string(column.value()); |
| | 108 | logger.debug("read " + key + ":" + value + " from " + context.getInputSplit()); |
| | 109 | |
| | 110 | StringTokenizer itr = new StringTokenizer(value); |
| | 111 | while (itr.hasMoreTokens()) |
| | 112 | { |
| | 113 | word.set(itr.nextToken()); |
| | 114 | context.write(word, one); |
| | 115 | } |
| | 116 | } |
| | 117 | } |
| | 118 | |
| | 119 | public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable> |
| | 120 | { |
| | 121 | public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException |
| | 122 | { |
| | 123 | int sum = 0; |
| | 124 | for (IntWritable val : values) |
| | 125 | sum += val.get(); |
| | 126 | context.write(key, new IntWritable(sum)); |
| | 127 | } |
| | 128 | } |
| | 129 | |
| | 130 | public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> |
| | 131 | { |
| | 132 | private ByteBuffer outputKey; |
| | 133 | |
| | 134 | protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) |
| | 135 | throws IOException, InterruptedException |
| | 136 | { |
| | 137 | outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME)); |
| | 138 | } |
| | 139 | |
| | 140 | public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException |
| | 141 | { |
| | 142 | int sum = 0; |
| | 143 | for (IntWritable val : values) |
| | 144 | sum += val.get(); |
| | 145 | context.write(outputKey, Collections.singletonList(getMutation(word, sum))); |
| | 146 | } |
| | 147 | |
| | 148 | private static Mutation getMutation(Text word, int sum) |
| | 149 | { |
| | 150 | Column c = new Column(); |
| | 151 | c.setName(Arrays.copyOf(word.getBytes(), word.getLength())); |
| | 152 | c.setValue(ByteBufferUtil.bytes(String.valueOf(sum))); |
| | 153 | c.setTimestamp(System.currentTimeMillis()); |
| | 154 | |
| | 155 | Mutation m = new Mutation(); |
| | 156 | m.setColumn_or_supercolumn(new ColumnOrSuperColumn()); |
| | 157 | m.column_or_supercolumn.setColumn(c); |
| | 158 | return m; |
| | 159 | } |
| | 160 | } |
| | 161 | |
| | 162 | public int run(String[] args) throws Exception |
| | 163 | { |
| | 164 | String outputReducerType = "filesystem"; |
| | 165 | if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR)) |
| | 166 | { |
| | 167 | String[] s = args[0].split("="); |
| | 168 | if (s != null && s.length == 2) |
| | 169 | outputReducerType = s[1]; |
| | 170 | } |
| | 171 | logger.info("output reducer type: " + outputReducerType); |
| | 172 | |
| | 173 | for (int i = 0; i < WordCountSetup.TEST_COUNT; i++) |
| | 174 | { |
| | 175 | String columnName = "text" + i; |
| | 176 | getConf().set(CONF_COLUMN_NAME, columnName); |
| | 177 | |
| | 178 | Job job = new Job(getConf(), "wordcount"); |
| | 179 | job.setJarByClass(WordCount.class); |
| | 180 | job.setMapperClass(TokenizerMapper.class); |
| | 181 | |
| | 182 | if (outputReducerType.equalsIgnoreCase("filesystem")) |
| | 183 | { |
| | 184 | job.setCombinerClass(ReducerToFilesystem.class); |
| | 185 | job.setReducerClass(ReducerToFilesystem.class); |
| | 186 | job.setOutputKeyClass(Text.class); |
| | 187 | job.setOutputValueClass(IntWritable.class); |
| | 188 | FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i)); |
| | 189 | } |
| | 190 | else |
| | 191 | { |
| | 192 | job.setReducerClass(ReducerToCassandra.class); |
| | 193 | |
| | 194 | job.setMapOutputKeyClass(Text.class); |
| | 195 | job.setMapOutputValueClass(IntWritable.class); |
| | 196 | job.setOutputKeyClass(ByteBuffer.class); |
| | 197 | job.setOutputValueClass(List.class); |
| | 198 | |
| | 199 | job.setOutputFormatClass(ColumnFamilyOutputFormat.class); |
| | 200 | |
| | 201 | ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); |
| | 202 | } |
| | 203 | |
| | 204 | job.setInputFormatClass(ColumnFamilyInputFormat.class); |
| | 205 | |
| | 206 | |
| | 207 | ConfigHelper.setRpcPort(job.getConfiguration(), "9160"); |
| | 208 | ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost"); |
| | 209 | ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); |
| | 210 | ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); |
| | 211 | SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName))); |
| | 212 | ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate); |
| | 213 | |
| | 214 | job.waitForCompletion(true); |
| | 215 | } |
| | 216 | return 0; |
| | 217 | } |
| | 218 | } |
| | 219 | }}} |