HBASE-1850 src/examples/mapred do not compile after HBASE-1822
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@816323 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4f294977d2
commit
dfd1091da5
|
@ -33,6 +33,7 @@ Release 0.21.0 - Unreleased
|
|||
for when Writable is not Configurable (Stack via jgray)
|
||||
HBASE-1847 Delete latest of a null qualifier when non-null qualifiers
|
||||
exist throws a RuntimeException
|
||||
HBASE-1850 src/examples/mapred do not compile after HBASE-1822
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
|
|
@ -1,139 +1,148 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapred;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.io.HbaseMapWritable;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
|
||||
import org.apache.hadoop.hbase.mapred.TableReduce;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.FileInputFormat;
|
||||
import org.apache.hadoop.mapred.JobClient;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.MapReduceBase;
|
||||
import org.apache.hadoop.mapred.Mapper;
|
||||
import org.apache.hadoop.mapred.OutputCollector;
|
||||
import org.apache.hadoop.mapred.Reporter;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
|
||||
/*
|
||||
* Sample uploader.
|
||||
*
|
||||
/**
|
||||
* Sample Uploader MapReduce
|
||||
* <p>
|
||||
* This is EXAMPLE code. You will need to change it to work for your context.
|
||||
*
|
||||
* Uses TableReduce to put the data into hbase. Change the InputFormat to suit
|
||||
* your data. Use the map to massage the input so it fits hbase. Currently its
|
||||
* just a pass-through map. In the reduce, you need to output a row and a
|
||||
* map of columns to cells. Change map and reduce to suit your input.
|
||||
*
|
||||
* <p>The below is wired up to handle an input whose format is a text file
|
||||
* which has a line format as follow:
|
||||
* <pre>
|
||||
* row columnname columndata
|
||||
* </pre>
|
||||
*
|
||||
* <p>The table and columnfamily we're to insert into must preexist.
|
||||
*
|
||||
* <p>
|
||||
* Uses {@link TableReducer} to put the data into HBase. Change the InputFormat
|
||||
* to suit your data. In this example, we are importing a CSV file.
|
||||
* <p>
|
||||
* <pre>row,family,qualifier,value</pre>
|
||||
* <p>
|
||||
* The table and columnfamily we're to insert into must preexist.
|
||||
* <p>
|
||||
* There is no reducer in this example as it is not necessary and adds
|
||||
* significant overhead. If you need to do any massaging of data before
|
||||
* inserting into HBase, you can do this in the map as well.
|
||||
* <p>Do the following to start the MR job:
|
||||
* <pre>
|
||||
* ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
|
||||
* ./bin/hadoop org.apache.hadoop.hbase.mapreduce.SampleUploader /tmp/input.csv TABLE_NAME
|
||||
* </pre>
|
||||
*
|
||||
* <p>This code was written against hbase 0.1 branch.
|
||||
* <p>
|
||||
* This code was written against HBase 0.21 trunk.
|
||||
*/
|
||||
public class SampleUploader extends MapReduceBase
|
||||
implements Mapper<LongWritable, Text, ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>>,
|
||||
Tool {
|
||||
public class SampleUploader {
|
||||
|
||||
private static final String NAME = "SampleUploader";
|
||||
private Configuration conf;
|
||||
|
||||
public JobConf createSubmittableJob(String[] args)
|
||||
throws IOException {
|
||||
JobConf c = new JobConf(getConf(), SampleUploader.class);
|
||||
c.setJobName(NAME);
|
||||
FileInputFormat.setInputPaths(c, new Path(args[0]));
|
||||
c.setMapperClass(this.getClass());
|
||||
c.setMapOutputKeyClass(ImmutableBytesWritable.class);
|
||||
c.setMapOutputValueClass(HbaseMapWritable.class);
|
||||
c.setReducerClass(TableUploader.class);
|
||||
TableMapReduceUtil.initTableReduceJob(args[1], TableUploader.class, c);
|
||||
return c;
|
||||
}
|
||||
static class Uploader
|
||||
extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
|
||||
|
||||
public void map(LongWritable k, Text v,
|
||||
OutputCollector<ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>> output,
|
||||
Reporter r)
|
||||
private long checkpoint = 100;
|
||||
private long count = 0;
|
||||
|
||||
@Override
|
||||
public void map(LongWritable key, Text line, Context context)
|
||||
throws IOException {
|
||||
// Lines are space-delimited; first item is row, next the columnname and
|
||||
// then the third the cell value.
|
||||
String tmp = v.toString();
|
||||
if (tmp.length() == 0) {
|
||||
|
||||
// Input is a CSV file
|
||||
// Each map() is a single line, where the key is the line number
|
||||
// Each line is comma-delimited; row,family,qualifier,value
|
||||
|
||||
// Split CSV line
|
||||
String [] values = line.toString().split(",");
|
||||
if(values.length != 4) {
|
||||
return;
|
||||
}
|
||||
String [] splits = v.toString().split(" ");
|
||||
HbaseMapWritable<byte [], byte []> mw =
|
||||
new HbaseMapWritable<byte [], byte []>();
|
||||
mw.put(Bytes.toBytes(splits[1]), Bytes.toBytes(splits[2]));
|
||||
byte [] row = Bytes.toBytes(splits[0]);
|
||||
r.setStatus("Map emitting " + splits[0] + " for record " + k.toString());
|
||||
output.collect(new ImmutableBytesWritable(row), mw);
|
||||
|
||||
// Extract each value
|
||||
byte [] row = Bytes.toBytes(values[0]);
|
||||
byte [] family = Bytes.toBytes(values[1]);
|
||||
byte [] qualifier = Bytes.toBytes(values[2]);
|
||||
byte [] value = Bytes.toBytes(values[3]);
|
||||
|
||||
// Create Put
|
||||
Put put = new Put(row);
|
||||
put.add(family, qualifier, value);
|
||||
|
||||
// Uncomment below to disable WAL. This will improve performance but means
|
||||
// you will experience data loss in the case of a RegionServer crash.
|
||||
// put.setWriteToWAL(false);
|
||||
|
||||
try {
|
||||
context.write(new ImmutableBytesWritable(row), put);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
public static class TableUploader extends MapReduceBase
|
||||
implements TableReduce<ImmutableBytesWritable, HbaseMapWritable<byte [], byte []>> {
|
||||
public void reduce(ImmutableBytesWritable k, Iterator<HbaseMapWritable<byte [], byte []>> v,
|
||||
OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
|
||||
Reporter r)
|
||||
// Set status every checkpoint lines
|
||||
if(++count % checkpoint == 0) {
|
||||
context.setStatus("Emitting Put " + count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Job configuration.
|
||||
*/
|
||||
public static Job configureJob(Configuration conf, String [] args)
|
||||
throws IOException {
|
||||
while (v.hasNext()) {
|
||||
r.setStatus("Reducer committing " + k);
|
||||
BatchUpdate bu = new BatchUpdate(k.get());
|
||||
while (v.hasNext()) {
|
||||
HbaseMapWritable<byte [], byte []> hmw = v.next();
|
||||
for (Entry<byte [], byte []> e: hmw.entrySet()) {
|
||||
bu.put(e.getKey(), e.getValue());
|
||||
}
|
||||
}
|
||||
output.collect(k, bu);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int printUsage() {
|
||||
System.out.println(NAME + " <input> <table_name>");
|
||||
return -1;
|
||||
}
|
||||
|
||||
public int run(@SuppressWarnings("unused") String[] args) throws Exception {
|
||||
// Make sure there are exactly 2 parameters left.
|
||||
if (args.length != 2) {
|
||||
System.out.println("ERROR: Wrong number of parameters: " +
|
||||
args.length + " instead of 2.");
|
||||
return printUsage();
|
||||
}
|
||||
JobClient.runJob(createSubmittableJob(args));
|
||||
return 0;
|
||||
}
|
||||
|
||||
public Configuration getConf() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
public void setConf(final Configuration c) {
|
||||
this.conf = c;
|
||||
Path inputPath = new Path(args[0]);
|
||||
String tableName = args[1];
|
||||
Job job = new Job(conf, NAME + "_" + tableName);
|
||||
job.setJarByClass(Uploader.class);
|
||||
FileInputFormat.setInputPaths(job, inputPath);
|
||||
job.setInputFormatClass(SequenceFileInputFormat.class);
|
||||
job.setMapperClass(Uploader.class);
|
||||
// No reducers. Just write straight to table. Call initTableReducerJob
|
||||
// because it sets up the TableOutputFormat.
|
||||
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
|
||||
job.setNumReduceTasks(0);
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Main entry point.
|
||||
*
|
||||
* @param args The command line parameters.
|
||||
* @throws Exception When running the job fails.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
|
||||
args);
|
||||
System.exit(errCode);
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
|
||||
if(otherArgs.length != 2) {
|
||||
System.err.println("Wrong number of arguments: " + otherArgs.length);
|
||||
System.err.println("Usage: " + NAME + " <input> <tablename>");
|
||||
System.exit(-1);
|
||||
}
|
||||
Job job = configureJob(conf, otherArgs);
|
||||
System.exit(job.waitForCompletion(true) ? 0 : 1);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue