HBASE-1385 Revamp TableInputFormat, needs updating to match hadoop 0.20.x AND remove bit where we can make < maps than regions

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@789846 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2009-06-30 17:42:53 +00:00
parent 97816a31a1
commit 94165dbc02
20 changed files with 744 additions and 1065 deletions


@ -421,6 +421,9 @@ Release 0.20.0 - Unreleased
HBASE-1587 Update ganglia config and doc to account for ganglia 3.1 and
hadoop-4675
HBASE-1589 Up zk maxClientCnxns from default of 10 to 20 or 30 or so
HBASE-1385 Revamp TableInputFormat, needs updating to match hadoop 0.20.x
AND remove bit where we can make < maps than regions
(Lars George via Stack)
OPTIMIZATIONS
HBASE-1412 Change values for delete column and column family in KeyValue


@ -108,6 +108,33 @@ public class Scan implements Writable {
this.stopRow = stopRow;
}
/**
* Creates a new instance of this class while copying all values.
*
* @param scan The scan instance to copy from.
* @throws IOException When copying the values fails.
*/
public Scan(Scan scan) throws IOException {
startRow = scan.getStartRow();
stopRow = scan.getStopRow();
maxVersions = scan.getMaxVersions();
filter = scan.getFilter(); // clone?
oldFilter = scan.getOldFilter(); // clone?
TimeRange ctr = scan.getTimeRange();
tr = new TimeRange(ctr.getMin(), ctr.getMax());
Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
for (byte[] fam : fams.keySet()) {
NavigableSet<byte[]> cols = fams.get(fam);
if (cols != null && cols.size() > 0) {
for (byte[] col : cols) {
addColumn(fam, col);
}
} else {
addFamily(fam);
}
}
}
/**
* Get all columns from the specified family.
* <p>
@ -137,26 +164,89 @@ public class Scan implements Writable {
return this;
}
/**
* Parses a combined family and qualifier and adds either both or just the
* family in case there is no qualifier. This assumes the older colon
* divided notation, e.g. "data:contents" or "meta:".
* <p>
* Note: It will throw an error when the colon is missing.
*
* @param familyAndQualifier
* @return A reference to this instance.
* @throws IllegalArgumentException When the colon is missing.
*/
public Scan addColumn(byte[] familyAndQualifier) {
byte[][] fq = KeyValue.parseColumn(familyAndQualifier);
if (fq[1].length > 0) {
addColumn(fq[0], fq[1]);
} else {
addFamily(fq[0]);
}
return this;
}
/**
* Adds an array of columns specified the old format, family:qualifier.
* <p>
* Overrides previous calls to addFamily for any families in the input.
*
* @param columns array of columns, formatted as <pre>family:qualifier</pre>
*/
public Scan addColumns(byte [][] columns) {
for(int i=0; i<columns.length; i++) {
try {
byte [][] split = KeyValue.parseColumn(columns[i]);
if (split[1].length == 0) {
addFamily(split[0]);
} else {
addColumn(split[0], split[1]);
}
} catch(Exception e) {}
addColumn(columns[i]);
}
return this;
}
/**
* Convenience method to help parse old style column definitions, typically
* as entered on the command line, e.g. "data:contents mime:". The columns
* must be space delimited and always have a colon (":") to denote family
* and qualifier.
*
* @param columns The columns to parse.
* @return A reference to this instance.
*/
public Scan addColumns(String columns) {
String[] cols = columns.split(" ");
for (String col : cols) {
addColumn(Bytes.toBytes(col));
}
return this;
}
/**
* Helps to convert the binary column families and qualifiers to a text
* representation, e.g. "data:mimetype data:contents meta:". Binary values
* are properly encoded using {@link Bytes#toStringBinary(byte[])}.
*
* @return The columns in an old style string format.
*/
public String getInputColumns() {
String cols = "";
for (Map.Entry<byte[], NavigableSet<byte[]>> e :
familyMap.entrySet()) {
byte[] fam = e.getKey();
if (cols.length() > 0) cols += " ";
NavigableSet<byte[]> quals = e.getValue();
// check if this family has qualifiers
if (quals != null && quals.size() > 0) {
String cs = "";
for (byte[] qual : quals) {
if (cs.length() > 0) cs += " ";
// encode values to make parsing easier later
cs += Bytes.toStringBinary(fam) + ":" + Bytes.toStringBinary(qual);
}
cols += cs;
} else {
// only add the family but with old style delimiter
cols += Bytes.toStringBinary(fam) + ":";
}
}
return cols;
}
/**
* Get versions of columns only within the specified timestamp range,

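To illustrate the new copy constructor and the old-style column string helpers added above, here is a minimal usage sketch; the column names and start row are made-up values, not part of the patch.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanCopyExample {
  public static void main(String[] args) throws Exception {
    // Old-style, space delimited "family:qualifier" definitions.
    Scan scan = new Scan();
    scan.addColumns("data:contents meta:");
    scan.setStartRow(Bytes.toBytes("row-0100"));

    // The copy constructor re-adds the families/columns and copies the
    // start/stop rows, max versions, filters and time range.
    Scan copy = new Scan(scan);

    // Convert back to the old-style string representation.
    System.out.println(copy.getInputColumns()); // e.g. "data:contents meta:"
  }
}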

@ -27,9 +27,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* Example table column indexing class. Runs a mapreduce job to index
@ -51,32 +52,36 @@ import org.apache.hadoop.mapred.JobConf;
* </ul>
*/
public class BuildTableIndex {
private static final String USAGE = "Usage: BuildTableIndex " +
"-m <numMapTasks> -r <numReduceTasks>\n -indexConf <iconfFile> " +
"-indexDir <indexDir>\n -table <tableName> -columns <columnName1> " +
"-r <numReduceTasks> -indexConf <iconfFile>\n" +
"-indexDir <indexDir> -table <tableName>\n -columns <columnName1> " +
"[<columnName2> ...]";
/**
* Prints the usage message and exits the program.
*
* @param message The message to print first.
*/
private static void printUsage(String message) {
System.err.println(message);
System.err.println(USAGE);
System.exit(-1);
}
/** default constructor */
public BuildTableIndex() {
super();
}
/**
* @param args
* @throws IOException
* Creates a new job.
* @param conf
*
* @param args The command line arguments.
* @throws IOException When reading the configuration fails.
*/
public void run(String[] args) throws IOException {
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
if (args.length < 6) {
printUsage("Too few arguments");
}
int numMapTasks = 1;
int numReduceTasks = 1;
String iconfFile = null;
String indexDir = null;
@ -85,9 +90,7 @@ public class BuildTableIndex {
// parse args
for (int i = 0; i < args.length - 1; i++) {
if ("-m".equals(args[i])) {
numMapTasks = Integer.parseInt(args[++i]);
} else if ("-r".equals(args[i])) {
if ("-r".equals(args[i])) {
numReduceTasks = Integer.parseInt(args[++i]);
} else if ("-indexConf".equals(args[i])) {
iconfFile = args[++i];
@ -111,7 +114,6 @@ public class BuildTableIndex {
"be specified");
}
Configuration conf = new HBaseConfiguration();
if (iconfFile != null) {
// set index configuration content from a file
String content = readContent(iconfFile);
@ -121,52 +123,31 @@ public class BuildTableIndex {
conf.set("hbase.index.conf", content);
}
if (columnNames != null) {
JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
tableName, columnNames.toString());
JobClient.runJob(jobConf);
}
Job job = new Job(conf, "build index for table " + tableName);
// number of indexes to partition into
job.setNumReduceTasks(numReduceTasks);
Scan scan = new Scan();
scan.addColumns(columnNames.toString());
// use identity map (a waste, but just as an example)
IdentityTableMapper.initJob(tableName, scan,
IdentityTableMapper.class, job);
// use IndexTableReduce to build a Lucene index
job.setReducerClass(IndexTableReducer.class);
FileOutputFormat.setOutputPath(job, new Path(indexDir));
job.setOutputFormatClass(IndexOutputFormat.class);
return job;
}
/**
* @param conf
* @param numMapTasks
* @param numReduceTasks
* @param indexDir
* @param tableName
* @param columnNames
* @return JobConf
*/
public JobConf createJob(Configuration conf, int numMapTasks,
int numReduceTasks, String indexDir, String tableName,
String columnNames) {
JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
jobConf.setJobName("build index for table " + tableName);
jobConf.setNumMapTasks(numMapTasks);
// number of indexes to partition into
jobConf.setNumReduceTasks(numReduceTasks);
// use identity map (a waste, but just as an example)
IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
jobConf);
// use IndexTableReduce to build a Lucene index
jobConf.setReducerClass(IndexTableReduce.class);
FileOutputFormat.setOutputPath(jobConf, new Path(indexDir));
jobConf.setOutputFormat(IndexOutputFormat.class);
return jobConf;
}
/*
* Read xml file of indexing configurations. The xml format is similar to
* Reads xml file of indexing configurations. The xml format is similar to
* hbase-default.xml and hadoop-default.xml. For an example configuration,
* see the <code>createIndexConfContent</code> method in TestTableIndex
* @param fileName File to read.
* @return XML configuration read from file
* @throws IOException
* see the <code>createIndexConfContent</code> method in TestTableIndex.
*
* @param fileName The file to read.
* @return XML configuration read from file.
* @throws IOException When the XML is broken.
*/
private String readContent(String fileName) throws IOException {
private static String readContent(String fileName) throws IOException {
File file = new File(fileName);
int length = (int) file.length();
if (length == 0) {
@ -195,11 +176,17 @@ public class BuildTableIndex {
}
/**
* @param args
* @throws IOException
* The main entry point.
*
* @param args The command line arguments.
* @throws Exception When running the job fails.
*/
public static void main(String[] args) throws IOException {
BuildTableIndex build = new BuildTableIndex();
build.run(args);
public static void main(String[] args) throws Exception {
HBaseConfiguration conf = new HBaseConfiguration();
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

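As a usage sketch for the rewritten driver above: the index directory, table and column below are placeholder values, and the org.apache.hadoop.hbase.mapreduce package for BuildTableIndex is assumed from the surrounding patch.

import org.apache.hadoop.hbase.mapreduce.BuildTableIndex; // assumed location

public class BuildTableIndexDemo {
  public static void main(String[] args) throws Exception {
    // Mirrors the usage string: -r and -indexConf are optional; -indexDir,
    // -table and -columns are required (at least six arguments in total).
    BuildTableIndex.main(new String[] {
        "-r", "2",
        "-indexDir", "/tmp/lucene-index",
        "-table", "mytable",
        "-columns", "contents:"
    });
  }
}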

@ -1,158 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Extract grouping columns from input record
*/
public class GroupingTableMap
extends MapReduceBase
implements TableMap<ImmutableBytesWritable,Result> {
/**
* JobConf parameter to specify the columns used to produce the key passed to
* collect from the map phase
*/
public static final String GROUP_COLUMNS =
"hbase.mapred.groupingtablemap.columns";
protected byte [][] m_columns;
/**
* Use this before submitting a TableMap job. It will appropriately set up the
* JobConf.
*
* @param table table to be processed
* @param columns space separated list of columns to fetch
* @param groupColumns space separated list of columns used to form the key
* used in collect
* @param mapper map class
* @param job job configuration object
*/
@SuppressWarnings("unchecked")
public static void initJob(String table, String columns, String groupColumns,
Class<? extends TableMap> mapper, JobConf job) {
TableMapReduceUtil.initTableMapJob(table, columns, mapper,
ImmutableBytesWritable.class, Result.class, job);
job.set(GROUP_COLUMNS, groupColumns);
}
@Override
public void configure(JobConf job) {
super.configure(job);
String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
m_columns = new byte[cols.length][];
for(int i = 0; i < cols.length; i++) {
m_columns[i] = Bytes.toBytes(cols[i]);
}
}
/**
* Extract the grouping columns from value to construct a new key.
*
* Pass the new key and value to reduce.
* If any of the grouping columns are not found in the value, the record is skipped.
* @param key
* @param value
* @param output
* @param reporter
* @throws IOException
*/
public void map(ImmutableBytesWritable key, Result value,
OutputCollector<ImmutableBytesWritable,Result> output,
Reporter reporter) throws IOException {
byte[][] keyVals = extractKeyValues(value);
if(keyVals != null) {
ImmutableBytesWritable tKey = createGroupKey(keyVals);
output.collect(tKey, value);
}
}
/**
* Extract column values from the current record. This method returns
* null if any of the columns are not found.
*
* Override this method if you want to deal with nulls differently.
*
* @param r
* @return array of byte values
*/
protected byte[][] extractKeyValues(Result r) {
byte[][] keyVals = null;
ArrayList<byte[]> foundList = new ArrayList<byte[]>();
int numCols = m_columns.length;
if (numCols > 0) {
for (KeyValue value: r.list()) {
byte [] column = value.getColumn();
for (int i = 0; i < numCols; i++) {
if (Bytes.equals(column, m_columns[i])) {
foundList.add(value.getValue());
break;
}
}
}
if(foundList.size() == numCols) {
keyVals = foundList.toArray(new byte[numCols][]);
}
}
return keyVals;
}
/**
* Create a key by concatenating multiple column values.
* Override this function in order to produce different types of keys.
*
* @param vals
* @return key generated by concatenating multiple column values
*/
protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
if(vals == null) {
return null;
}
StringBuilder sb = new StringBuilder();
for(int i = 0; i < vals.length; i++) {
if(i > 0) {
sb.append(" ");
}
try {
sb.append(new String(vals[i], HConstants.UTF8_ENCODING));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
}
}


@ -23,44 +23,47 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* This is used to partition the output keys into groups of keys.
* Keys are grouped according to the regions that currently exist
* so that each reducer fills a single region so load is distributed.
*
* @param <K2>
* @param <V2>
* @param <KEY> The type of the key.
* @param <VALUE> The type of the value.
*/
public class HRegionPartitioner<K2,V2>
implements Partitioner<ImmutableBytesWritable, V2> {
public class HRegionPartitioner<KEY, VALUE>
extends Partitioner<ImmutableBytesWritable, VALUE>
implements Configurable {
private final Log LOG = LogFactory.getLog(TableInputFormat.class);
private Configuration conf = null;
private HTable table;
private byte[][] startKeys;
public void configure(JobConf job) {
try {
this.table = new HTable(new HBaseConfiguration(job),
job.get(TableOutputFormat.OUTPUT_TABLE));
} catch (IOException e) {
LOG.error(e);
}
try {
this.startKeys = this.table.getStartKeys();
} catch (IOException e) {
LOG.error(e);
}
}
/**
* Gets the partition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*
* <p>Typically a hash function on all or a subset of the key.</p>
*
* @param key The key to be partitioned.
* @param value The entry value.
* @param numPartitions The total number of partitions.
* @return The partition number for the <code>key</code>.
* @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
* java.lang.Object, java.lang.Object, int)
*/
@Override
public int getPartition(ImmutableBytesWritable key,
V2 value, int numPartitions) {
VALUE value, int numPartitions) {
byte[] region = null;
// Only one region return 0
if (this.startKeys.length == 1){
@ -86,4 +89,39 @@ implements Partitioner<ImmutableBytesWritable, V2> {
// if above fails to find start key that match we need to return something
return 0;
}
/**
* Returns the current configuration.
*
* @return The current configuration.
* @see org.apache.hadoop.conf.Configurable#getConf()
*/
@Override
public Configuration getConf() {
return conf;
}
/**
* Sets the configuration. This is used to determine the start keys for the
* given table.
*
* @param configuration The configuration to set.
* @see org.apache.hadoop.conf.Configurable#setConf(
* org.apache.hadoop.conf.Configuration)
*/
@Override
public void setConf(Configuration configuration) {
this.conf = configuration;
try {
this.table = new HTable(new HBaseConfiguration(conf),
configuration.get(TableOutputFormat.OUTPUT_TABLE));
} catch (IOException e) {
LOG.error(e);
}
try {
this.startKeys = this.table.getStartKeys();
} catch (IOException e) {
LOG.error(e);
}
}
}

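A sketch of how the rewritten partitioner might be wired into a job through TableMapReduceUtil (shown later in this patch); the table names and family are placeholders, and IdentityTableReducer is assumed to be the new-API counterpart of the removed IdentityTableReduce.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer; // assumed class
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class RegionAwareCopy {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new HBaseConfiguration(), "region aware copy");
    job.setJarByClass(RegionAwareCopy.class);

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("data"));
    IdentityTableMapper.initJob("source_table", scan,
        IdentityTableMapper.class, job);

    // Passing HRegionPartitioner.class makes initTableReducerJob install it
    // and cap the number of reducers at the region count of the output table.
    TableMapReduceUtil.initTableReducerJob("target_table",
        IdentityTableReducer.class, job, HRegionPartitioner.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}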

@ -1,74 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Pass the given key and record as-is to reduce
*/
public class IdentityTableMap
extends MapReduceBase
implements TableMap<ImmutableBytesWritable, Result> {
/** constructor */
public IdentityTableMap() {
super();
}
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
*
* @param table table name
* @param columns columns to scan
* @param mapper mapper class
* @param job job configuration
*/
@SuppressWarnings("unchecked")
public static void initJob(String table, String columns,
Class<? extends TableMap> mapper, JobConf job) {
TableMapReduceUtil.initTableMapJob(table, columns, mapper,
ImmutableBytesWritable.class, Result.class, job);
}
/**
* Pass the key, value to reduce
* @param key
* @param value
* @param output
* @param reporter
* @throws IOException
*/
public void map(ImmutableBytesWritable key, Result value,
OutputCollector<ImmutableBytesWritable,Result> output,
Reporter reporter) throws IOException {
// convert
output.collect(key, value);
}
}


@ -1,59 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Write to table each key, record pair
*/
public class IdentityTableReduce
extends MapReduceBase
implements TableReduce<ImmutableBytesWritable, Put> {
@SuppressWarnings("unused")
private static final Log LOG =
LogFactory.getLog(IdentityTableReduce.class.getName());
/**
* No aggregation, output pairs of (key, record)
* @param key
* @param values
* @param output
* @param reporter
* @throws IOException
*/
public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
throws IOException {
while(values.hasNext()) {
output.collect(key, values.next());
}
}
}


@ -44,9 +44,10 @@ import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
/**
* Configuration parameters for building a Lucene index
* Configuration parameters for building a Lucene index.
*/
public class IndexConfiguration extends Configuration {
private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
static final String HBASE_COLUMN_NAME = "hbase.column.name";


@ -27,13 +27,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Similarity;
@ -41,29 +38,40 @@ import org.apache.lucene.search.Similarity;
* Create a local index, unwrap Lucene documents created by reduce, add them to
* the index, and copy the index to the destination.
*/
public class IndexOutputFormat extends
FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
public class IndexOutputFormat
extends FileOutputFormat<ImmutableBytesWritable, LuceneDocumentWrapper> {
static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
/** Random generator. */
private Random random = new Random();
/**
* Returns the record writer.
*
* @param context The current task context.
* @return The record writer.
* @throws IOException When there is an issue with the writer.
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
public RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>
getRecordWriter(final FileSystem fs, JobConf job, String name,
final Progressable progress)
getRecordWriter(TaskAttemptContext context)
throws IOException {
final Path perm = new Path(FileOutputFormat.getOutputPath(job), name);
final Path temp = job.getLocalPath("index/_"
+ Integer.toString(random.nextInt()));
final Path perm = new Path(FileOutputFormat.getOutputPath(context),
FileOutputFormat.getUniqueFile(context, "part", ""));
// null for "dirsProp" means no predefined directories
final Path temp = context.getConfiguration().getLocalPath(
"mapred.local.dir", "index/_" + Integer.toString(random.nextInt()));
LOG.info("To index into " + perm);
FileSystem fs = FileSystem.get(context.getConfiguration());
// delete old, if any
fs.delete(perm, true);
final IndexConfiguration indexConf = new IndexConfiguration();
String content = job.get("hbase.index.conf");
String content = context.getConfiguration().get("hbase.index.conf");
if (content != null) {
indexConf.addFromXML(content);
}
@ -99,65 +107,7 @@ public class IndexOutputFormat extends
}
}
writer.setUseCompoundFile(indexConf.isUseCompoundFile());
return new RecordWriter<ImmutableBytesWritable, LuceneDocumentWrapper>() {
boolean closed;
private long docCount = 0;
public void write(ImmutableBytesWritable key,
LuceneDocumentWrapper value)
throws IOException {
// unwrap and index doc
Document doc = value.get();
writer.addDocument(doc);
docCount++;
progress.progress();
}
public void close(final Reporter reporter) throws IOException {
// spawn a thread to give progress heartbeats
Thread prog = new Thread() {
@Override
public void run() {
while (!closed) {
try {
reporter.setStatus("closing");
Thread.sleep(1000);
} catch (InterruptedException e) {
continue;
} catch (Throwable e) {
return;
}
}
}
};
try {
prog.start();
// optimize index
if (indexConf.doOptimize()) {
if (LOG.isInfoEnabled()) {
LOG.info("Optimizing index.");
}
writer.optimize();
}
// close index
writer.close();
if (LOG.isInfoEnabled()) {
LOG.info("Done indexing " + docCount + " docs.");
}
// copy to perm destination in dfs
fs.completeLocalOutput(perm, temp);
if (LOG.isInfoEnabled()) {
LOG.info("Copy done.");
}
} finally {
closed = true;
}
}
};
return new IndexRecordWriter(context, fs, writer, indexConf, perm, temp);
}
}


@ -1,109 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Construct a Lucene document per row, which is consumed by IndexOutputFormat
* to build a Lucene index
*/
public class IndexTableReduce extends MapReduceBase implements
Reducer<ImmutableBytesWritable, Result, ImmutableBytesWritable, LuceneDocumentWrapper> {
private static final Log LOG = LogFactory.getLog(IndexTableReduce.class);
private IndexConfiguration indexConf;
@Override
public void configure(JobConf job) {
super.configure(job);
indexConf = new IndexConfiguration();
String content = job.get("hbase.index.conf");
if (content != null) {
indexConf.addFromXML(content);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Index conf: " + indexConf);
}
}
@Override
public void close() throws IOException {
super.close();
}
public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
OutputCollector<ImmutableBytesWritable, LuceneDocumentWrapper> output,
Reporter reporter)
throws IOException {
if (!values.hasNext()) {
return;
}
Document doc = new Document();
// index and store row key, row key already UTF-8 encoded
Field keyField = new Field(indexConf.getRowkeyName(),
Bytes.toString(key.get(), key.getOffset(), key.getLength()),
Field.Store.YES, Field.Index.UN_TOKENIZED);
keyField.setOmitNorms(true);
doc.add(keyField);
while (values.hasNext()) {
Result value = values.next();
// each column (name-value pair) is a field (name-value pair)
for (KeyValue kv: value.list()) {
// name is already UTF-8 encoded
String column = Bytes.toString(kv.getColumn());
byte[] columnValue = kv.getValue();
Field.Store store = indexConf.isStore(column)?
Field.Store.YES: Field.Store.NO;
Field.Index index = indexConf.isIndex(column)?
(indexConf.isTokenize(column)?
Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED):
Field.Index.NO;
// UTF-8 encode value
Field field = new Field(column, Bytes.toString(columnValue),
store, index);
field.setBoost(indexConf.getBoost(column));
field.setOmitNorms(indexConf.isOmitNorms(column));
doc.add(field);
}
}
output.collect(key, new LuceneDocumentWrapper(doc));
}
}


@ -29,6 +29,8 @@ import org.apache.lucene.document.Document;
* It doesn't really serialize/deserialize a Lucene document.
*/
public class LuceneDocumentWrapper implements Writable {
/** The document to add to the index. */
protected Document doc;
/**


@ -21,73 +21,71 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* A job with a map to count rows.
* Map outputs table rows IF the input row has columns that have content.
* Uses an {@link IdentityReducer}
* A job with just a map phase to count rows. Map outputs table rows IF the
* input row has columns that have content.
*/
public class RowCounter extends Configured implements Tool {
// Name of this 'program'
public class RowCounter {
/** Name of this 'program'. */
static final String NAME = "rowcounter";
/**
* Mapper that runs the count.
*/
static class RowCounterMapper
implements TableMap<ImmutableBytesWritable, Result> {
private static enum Counters {ROWS}
extends TableMapper<ImmutableBytesWritable, Result> {
/** Counter enumeration to count the actual rows. */
private static enum Counters { ROWS }
/**
* Maps the data.
*
* @param row The current table row key.
* @param values The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
* org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
public void map(ImmutableBytesWritable row, Result values,
OutputCollector<ImmutableBytesWritable, Result> output,
Reporter reporter)
Context context)
throws IOException {
boolean content = !values.list().isEmpty();
for (KeyValue value: values.list()) {
if (value.getValue().length > 0) {
content = true;
context.getCounter(Counters.ROWS).increment(1);
break;
}
}
if (!content) {
// Don't count rows that are all empty values.
return;
}
// Give out same value every time. We're only interested in the row/key
reporter.incrCounter(Counters.ROWS, 1);
}
public void configure(JobConf jc) {
// Nothing to do.
}
public void close() throws IOException {
// Nothing to do.
}
}
/**
* @param args
* @return the JobConf
* @throws IOException
* Sets up the actual job.
*
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public JobConf createSubmittableJob(String[] args) throws IOException {
JobConf c = new JobConf(getConf(), getClass());
c.setJobName(NAME);
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
Job job = new Job(conf, NAME);
job.setJarByClass(RowCounter.class);
// Columns are space delimited
StringBuilder sb = new StringBuilder();
final int columnoffset = 2;
@ -97,38 +95,34 @@ public class RowCounter extends Configured implements Tool {
}
sb.append(args[i]);
}
Scan scan = new Scan();
scan.addColumns(sb.toString());
// Second argument is the table name.
TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c);
c.setNumReduceTasks(0);
// First arg is the output directory.
FileOutputFormat.setOutputPath(c, new Path(args[0]));
return c;
}
static int printUsage() {
System.out.println(NAME +
" <outputdir> <tablename> <column1> [<column2>...]");
return -1;
}
public int run(final String[] args) throws Exception {
// Make sure there are at least 3 parameters
if (args.length < 3) {
System.err.println("ERROR: Wrong number of parameters: " + args.length);
return printUsage();
}
JobClient.runJob(createSubmittableJob(args));
return 0;
TableMapReduceUtil.initTableMapperJob(args[1], scan,
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
job.setNumReduceTasks(0);
// first argument is the output directory.
FileOutputFormat.setOutputPath(job, new Path(args[0]));
return job;
}
/**
* @param args
* @throws Exception
* Main entry point.
*
* @param args The command line parameters.
* @throws Exception When running the job fails.
*/
public static void main(String[] args) throws Exception {
HBaseConfiguration c = new HBaseConfiguration();
int errCode = ToolRunner.run(c, new RowCounter(), args);
System.exit(errCode);
HBaseConfiguration conf = new HBaseConfiguration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 3) {
System.err.println("ERROR: Wrong number of parameters: " + args.length);
System.err.println("Usage: " + NAME +
" <outputdir> <tablename> <column1> [<column2>...]");
System.exit(-1);
}
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

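For completeness, a hypothetical driver call matching the usage string printed above; the output directory, table and column are placeholders, and RowCounter is assumed to live in org.apache.hadoop.hbase.mapreduce.

import org.apache.hadoop.hbase.mapreduce.RowCounter; // assumed location

public class RowCounterDemo {
  public static void main(String[] args) throws Exception {
    // Equivalent to: rowcounter <outputdir> <tablename> <column1> [<column2>...]
    RowCounter.main(new String[] {
        "/tmp/rowcounter-out", // output directory (required first argument)
        "mytable",             // table to scan
        "info:name"            // at least one family:qualifier column
    });
  }
}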

@ -23,60 +23,64 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.util.StringUtils;
/**
* Convert HBase tabular data into a format that is consumable by Map/Reduce.
*/
public class TableInputFormat extends TableInputFormatBase implements
JobConfigurable {
public class TableInputFormat extends TableInputFormatBase
implements Configurable {
private final Log LOG = LogFactory.getLog(TableInputFormat.class);
/** Job parameter that specifies the input table. */
public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
/** Base64 encoded Scan. */
public static final String SCAN = "hbase.mapreduce.scan";
/** The configuration. */
private Configuration conf = null;
/**
* space delimited list of columns
* Returns the current configuration.
*
* @return The current configuration.
* @see org.apache.hadoop.conf.Configurable#getConf()
*/
public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
@Override
public Configuration getConf() {
return conf;
}
public void configure(JobConf job) {
Path[] tableNames = FileInputFormat.getInputPaths(job);
String colArg = job.get(COLUMN_LIST);
String[] colNames = colArg.split(" ");
byte [][] m_cols = new byte[colNames.length][];
for (int i = 0; i < m_cols.length; i++) {
m_cols[i] = Bytes.toBytes(colNames[i]);
}
setInputColumns(m_cols);
/**
* Sets the configuration. This is used to set the details for the table to
* be scanned.
*
* @param configuration The configuration to set.
* @see org.apache.hadoop.conf.Configurable#setConf(
* org.apache.hadoop.conf.Configuration)
*/
@Override
public void setConf(Configuration configuration) {
this.conf = configuration;
String tableName = conf.get(INPUT_TABLE);
try {
setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
setHTable(new HTable(new HBaseConfiguration(conf), tableName));
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
Scan scan = null;
try {
scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
} catch (IOException e) {
LOG.error("An error occurred.", e);
}
setScan(scan);
}
public void validateInput(JobConf job) throws IOException {
// expecting exactly one path
Path [] tableNames = FileInputFormat.getInputPaths(job);
if (tableNames == null || tableNames.length > 1) {
throw new IOException("expecting one table name");
}
// connected to table?
if (getHTable() == null) {
throw new IOException("could not connect to table '" +
tableNames[0].getName() + "'");
}
// expecting at least one column
String colArg = job.get(COLUMN_LIST);
if (colArg == null || colArg.length() == 0) {
throw new IOException("expecting at least one column");
}
}
}

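The input format is now driven entirely by its Configuration: below is a sketch of setting the two keys above by hand, assuming the class sits in the org.apache.hadoop.hbase.mapreduce package so it can reach the package-private convertScanToString helper shown later in this patch; the table name and family are placeholders.

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class DirectInputFormatSetup {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new HBaseConfiguration(), "manual TableInputFormat setup");
    job.setInputFormatClass(TableInputFormat.class);

    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("data"));

    // The same two keys initTableMapperJob() populates; setConf() on the
    // input format later opens the HTable and deserializes the scan.
    job.getConfiguration().set(TableInputFormat.INPUT_TABLE, "mytable");
    job.getConfiguration().set(TableInputFormat.SCAN,
        TableMapReduceUtil.convertScanToString(scan));
  }
}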

@ -20,8 +20,8 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,23 +31,19 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.StopRowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
/**
* A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
* byte[] of input columns and optionally a {@link RowFilterInterface}.
* Subclasses may use other TableRecordReader implementations.
* A base for {@link TableInputFormat}s. Receives an {@link HTable}, a
* {@link Scan} instance that defines the input columns etc. Subclasses may use
* other TableRecordReader implementations.
* <p>
* An example of a subclass:
* <pre>
@ -72,155 +68,146 @@ import org.apache.hadoop.util.StringUtils;
* }
* </pre>
*/
public abstract class TableInputFormatBase
implements InputFormat<ImmutableBytesWritable, Result> {
extends InputFormat<ImmutableBytesWritable, Result> {
final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
private byte [][] inputColumns;
private HTable table;
private TableRecordReader tableRecordReader;
private RowFilterInterface rowFilter;
/** Holds the details for the internal scanner. */
private Scan scan = null;
/** The table to scan. */
private HTable table = null;
/** The reader scanning the table, can be a custom one. */
private TableRecordReader tableRecordReader = null;
/**
* Iterate over an HBase table data, return (Text, RowResult) pairs
* Iterate over HBase table data and return (ImmutableBytesWritable, Result)
* pairs.
*/
protected class TableRecordReader
implements RecordReader<ImmutableBytesWritable, Result> {
private byte [] startRow;
private byte [] endRow;
private byte [] lastRow;
private RowFilterInterface trrRowFilter;
private ResultScanner scanner;
private HTable htable;
private byte [][] trrInputColumns;
extends RecordReader<ImmutableBytesWritable, Result> {
private ResultScanner scanner = null;
private Scan scan = null;
private HTable htable = null;
private byte[] lastRow = null;
private ImmutableBytesWritable key = null;
private Result value = null;
/**
* Restart from survivable exceptions by creating a new scanner.
*
* @param firstRow
* @throws IOException
* @param firstRow The first row to start at.
* @throws IOException When restarting fails.
*/
public void restart(byte[] firstRow) throws IOException {
if ((endRow != null) && (endRow.length > 0)) {
if (trrRowFilter != null) {
final Set<RowFilterInterface> rowFiltersSet =
new HashSet<RowFilterInterface>();
rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow)));
rowFiltersSet.add(trrRowFilter);
Scan scan = new Scan(startRow);
scan.addColumns(trrInputColumns);
// scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
// rowFiltersSet));
this.scanner = this.htable.getScanner(scan);
} else {
Scan scan = new Scan(firstRow, endRow);
scan.addColumns(trrInputColumns);
this.scanner = this.htable.getScanner(scan);
}
} else {
Scan scan = new Scan(firstRow);
scan.addColumns(trrInputColumns);
// scan.setFilter(trrRowFilter);
this.scanner = this.htable.getScanner(scan);
}
Scan newScan = new Scan(scan);
newScan.setStartRow(firstRow);
this.scanner = this.htable.getScanner(newScan);
}
/**
* Build the scanner. Not done in constructor to allow for extension.
*
* @throws IOException
* @throws IOException When restarting the scan fails.
*/
public void init() throws IOException {
restart(startRow);
restart(scan.getStartRow());
}
/**
* @param htable the {@link HTable} to scan.
* Sets the HBase table.
*
* @param htable The {@link HTable} to scan.
*/
public void setHTable(HTable htable) {
this.htable = htable;
}
/**
* @param inputColumns the columns to be placed in {@link Result}.
* Sets the scan defining the actual details like columns etc.
*
* @param scan The scan to set.
*/
public void setInputColumns(final byte [][] inputColumns) {
this.trrInputColumns = inputColumns;
public void setScan(Scan scan) {
this.scan = scan;
}
/**
* @param startRow the first row in the split
* Closes the split.
*
* @see org.apache.hadoop.mapreduce.RecordReader#close()
*/
public void setStartRow(final byte [] startRow) {
this.startRow = startRow;
}
/**
*
* @param endRow the last row in the split
*/
public void setEndRow(final byte [] endRow) {
this.endRow = endRow;
}
/**
* @param rowFilter the {@link RowFilterInterface} to be used.
*/
public void setRowFilter(RowFilterInterface rowFilter) {
this.trrRowFilter = rowFilter;
}
@Override
public void close() {
this.scanner.close();
}
/**
* @return ImmutableBytesWritable
*
* @see org.apache.hadoop.mapred.RecordReader#createKey()
*/
public ImmutableBytesWritable createKey() {
return new ImmutableBytesWritable();
}
/**
* @return RowResult
*
* @see org.apache.hadoop.mapred.RecordReader#createValue()
*/
public Result createValue() {
return new Result();
}
public long getPos() {
// This should be the ordinal tuple in the range;
// not clear how to calculate...
return 0;
}
public float getProgress() {
// Depends on the total number of tuples and getPos
return 0;
}
/**
* @param key HStoreKey as input key.
* @param value MapWritable as input value
* @return true if there was more data
* Returns the current key.
*
* @return The current key.
* @throws IOException
* @throws InterruptedException When the job is aborted.
* @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
*/
public boolean next(ImmutableBytesWritable key, Result value)
throws IOException {
@Override
public ImmutableBytesWritable getCurrentKey() throws IOException,
InterruptedException {
return key;
}
/**
* Returns the current value.
*
* @return The current value.
* @throws IOException When the value is faulty.
* @throws InterruptedException When the job is aborted.
* @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
*/
@Override
public Result getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
* Initializes the reader.
*
* @param inputsplit The split to work with.
* @param context The current task context.
* @throws IOException When setting up the reader fails.
* @throws InterruptedException When the job is aborted.
* @see org.apache.hadoop.mapreduce.RecordReader#initialize(
* org.apache.hadoop.mapreduce.InputSplit,
* org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
public void initialize(InputSplit inputsplit,
TaskAttemptContext context) throws IOException,
InterruptedException {
}
/**
* Positions the record reader to the next record.
*
* @return <code>true</code> if there was another record.
* @throws IOException When reading the record failed.
* @throws InterruptedException When the job was aborted.
* @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (key == null) key = new ImmutableBytesWritable();
if (value == null) value = new Result();
Result result;
try {
result = this.scanner.next();
} catch (UnknownScannerException e) {
LOG.debug("recovered from " + StringUtils.stringifyException(e));
restart(lastRow);
this.scanner.next(); // skip presumed already mapped row
result = this.scanner.next();
scanner.next(); // skip presumed already mapped row
result = scanner.next();
}
if (result != null && result.size() > 0) {
key.set(result.getRow());
lastRow = key.get();
@ -229,17 +216,35 @@ implements InputFormat<ImmutableBytesWritable, Result> {
}
return false;
}
/**
* The current progress of the record reader through its data.
*
* @return A number between 0.0 and 1.0, the fraction of the data read.
* @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
*/
@Override
public float getProgress() {
// Depends on the total number of tuples
return 0;
}
}
/**
* Builds a TableRecordReader. If no TableRecordReader was provided, uses
* the default.
*
* @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
* JobConf, Reporter)
*
* @param split The split to work with.
* @param context The current context.
* @return The newly created record reader.
* @throws IOException When creating the reader fails.
* @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
* org.apache.hadoop.mapreduce.InputSplit,
* org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
InputSplit split, JobConf job, Reporter reporter)
@Override
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
InputSplit split, TaskAttemptContext context)
throws IOException {
TableSplit tSplit = (TableSplit) split;
TableRecordReader trr = this.tableRecordReader;
@ -247,45 +252,38 @@ implements InputFormat<ImmutableBytesWritable, Result> {
if (trr == null) {
trr = new TableRecordReader();
}
trr.setStartRow(tSplit.getStartRow());
trr.setEndRow(tSplit.getEndRow());
trr.setHTable(this.table);
trr.setInputColumns(this.inputColumns);
trr.setRowFilter(this.rowFilter);
Scan sc = new Scan(scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setHTable(table);
trr.init();
return trr;
}
/**
* Calculates the splits that will serve as input for the map tasks.
* <ul>
* Splits are created in number equal to the smallest between numSplits and
* the number of {@link HRegion}s in the table. If the number of splits is
* smaller than the number of {@link HRegion}s then splits are spanned across
* multiple {@link HRegion}s and are grouped the most evenly possible. In the
* case splits are uneven the bigger splits are placed first in the
* {@link InputSplit} array.
* Calculates the splits that will serve as input for the map tasks. The
* number of splits matches the number of regions in a table.
*
* @param job the map task {@link JobConf}
* @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
*
* @return the input splits
*
* @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
* @param context The current job context.
* @return The list of input splits.
* @throws IOException When creating the list of splits fails.
* @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
* org.apache.hadoop.mapreduce.JobContext)
*/
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
byte [][] startKeys = this.table.getStartKeys();
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
byte [][] startKeys = table.getStartKeys();
if (startKeys == null || startKeys.length == 0) {
throw new IOException("Expecting at least one region");
throw new IOException("Expecting at least one region.");
}
if (this.table == null) {
throw new IOException("No table was provided");
if (table == null) {
throw new IOException("No table was provided.");
}
if (this.inputColumns == null || this.inputColumns.length == 0) {
throw new IOException("Expecting at least one column");
if (!scan.hasFamilies()) {
throw new IOException("Expecting at least one column.");
}
int realNumSplits = numSplits > startKeys.length? startKeys.length:
numSplits;
int realNumSplits = startKeys.length;
InputSplit[] splits = new InputSplit[realNumSplits];
int middle = startKeys.length / realNumSplits;
int startPos = 0;
@ -300,14 +298,7 @@ implements InputFormat<ImmutableBytesWritable, Result> {
LOG.info("split: " + i + "->" + splits[i]);
startPos = lastPos;
}
return splits;
}
/**
* @param inputColumns to be passed in {@link Result} to the map task.
*/
protected void setInputColumns(byte [][] inputColumns) {
this.inputColumns = inputColumns;
return Arrays.asList(splits);
}
/**
@ -320,28 +311,39 @@ implements InputFormat<ImmutableBytesWritable, Result> {
/**
* Allows subclasses to set the {@link HTable}.
*
* @param table to get the data from
* @param table The table to get the data from.
*/
protected void setHTable(HTable table) {
this.table = table;
}
/**
* Gets the scan defining the actual details like columns etc.
*
* @return The internal scan instance.
*/
public Scan getScan() {
if (scan == null) scan = new Scan();
return scan;
}
/**
* Sets the scan defining the actual details like columns etc.
*
* @param scan The scan to set.
*/
public void setScan(Scan scan) {
this.scan = scan;
}
/**
* Allows subclasses to set the {@link TableRecordReader}.
*
* @param tableRecordReader
* to provide other {@link TableRecordReader} implementations.
* @param tableRecordReader A different {@link TableRecordReader}
* implementation.
*/
protected void setTableRecordReader(TableRecordReader tableRecordReader) {
this.tableRecordReader = tableRecordReader;
}
/**
* Allows subclasses to set the {@link RowFilterInterface} to be used.
*
* @param rowFilter
*/
protected void setRowFilter(RowFilterInterface rowFilter) {
this.rowFilter = rowFilter;
}
}

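In the spirit of the subclass example referenced in the class comment above, a minimal hard-wired subclass might look like the sketch below; the table and family names are placeholders.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.util.Bytes;

/** Input format fixed to one table and one column family. */
public class ExampleTIF extends TableInputFormatBase {
  public ExampleTIF() throws IOException {
    // Subclasses supply the table and the scan themselves instead of
    // reading them from the job configuration.
    setHTable(new HTable(new HBaseConfiguration(), "exampleTable"));
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("columnA"));
    setScan(scan);
  }
}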

@ -1,38 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Mapper;
/**
* Scan an HBase table to sort by a specified sort column.
* If the column does not exist, the record is not passed to Reduce.
*
* @param <K> WritableComparable key class
* @param <V> Writable value class
*/
public interface TableMap<K extends WritableComparable<K>, V extends Writable>
extends Mapper<ImmutableBytesWritable, Result, K, V> {
}


@ -19,45 +19,80 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
/**
* Utility for {@link TableMap} and {@link TableReduce}
* Utility for {@link TableMapper} and {@link TableReducer}
*/
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
* Use this before submitting a TableMap job. It will appropriately set up
* the job.
*
* @param table The table name to read from.
* @param columns The columns to scan.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job configuration to adjust.
* @param job The current job to adjust.
* @throws IOException When setting up the details fails.
*/
public static void initTableMapJob(String table, String columns,
Class<? extends TableMap> mapper,
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Writable> outputValueClass, JobConf job) {
job.setInputFormat(TableInputFormat.class);
public static void initTableMapperJob(String table, Scan scan,
Class<? extends TableMapper> mapper,
Class<? extends WritableComparable> outputKeyClass,
Class<? extends Writable> outputValueClass, Job job) throws IOException {
job.setInputFormatClass(TableInputFormat.class);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
FileInputFormat.addInputPaths(job, table);
job.set(TableInputFormat.COLUMN_LIST, columns);
job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
job.getConfiguration().set(TableInputFormat.SCAN,
convertScanToString(scan));
}
/**
* Writes the given scan into a Base64 encoded string.
*
* @param scan The scan to write out.
* @return The scan saved in a Base64 encoded string.
* @throws IOException When writing the scan fails.
*/
static String convertScanToString(Scan scan) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(out);
scan.write(dos);
return Base64.encodeBytes(out.toByteArray());
}
/**
* Converts the given Base64 string back into a Scan instance.
*
* @param base64 The scan details.
* @return The newly created Scan instance.
* @throws IOException When reading the scan instance fails.
*/
static Scan convertStringToScan(String base64) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
DataInputStream dis = new DataInputStream(bis);
Scan scan = new Scan();
scan.readFields(dis);
return scan;
}
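Both helpers are package-private, so the following round-trip fragment assumes it runs from code in the same org.apache.hadoop.hbase.mapreduce package; the family name is a placeholder.

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("data"));   // placeholder column family
scan.setMaxVersions(2);

// Scan -> Base64 string, the form stored under TableInputFormat.SCAN ...
String encoded = TableMapReduceUtil.convertScanToString(scan);

// ... and back again, which is what the input format does at job runtime.
Scan decoded = TableMapReduceUtil.convertStringToScan(encoded);
assert decoded.getMaxVersions() == 2;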
/**
@ -66,13 +101,13 @@ public class TableMapReduceUtil {
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job configuration to adjust.
* @param job The current job to adjust.
* @throws IOException When determining the region count fails.
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job)
public static void initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job)
throws IOException {
initTableReduceJob(table, reducer, job, null);
initTableReducerJob(table, reducer, job, null);
}
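On the sink side the driver hands the output table and reducer class to initTableReducerJob. A hedged sketch, continuing the hypothetical driver above with a made-up table and reducer name:

// Default partitioner:
TableMapReduceUtil.initTableReducerJob("target_table", MySinkReducer.class, job);

// Or, via the overload shown just below, use the bundled HRegionPartitioner,
// which also caps the reduce count at the table's region count:
TableMapReduceUtil.initTableReducerJob("target_table", MySinkReducer.class, job,
    HRegionPartitioner.class);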
/**
@ -81,22 +116,23 @@ public class TableMapReduceUtil {
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job configuration to adjust.
* @param job The current job to adjust.
* @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
* @throws IOException When determining the region count fails.
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
public static void initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job, Class partitioner)
throws IOException {
job.setOutputFormat(TableOutputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.setReducerClass(reducer);
job.set(TableOutputFormat.OUTPUT_TABLE, table);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
HTable outputTable = new HTable(new HBaseConfiguration(
job.getConfiguration()), table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(outputTable.getRegionsInfo().size());
@ -111,73 +147,45 @@ public class TableMapReduceUtil {
* configuration does not exceed the number of regions for the given table.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @param job The current job to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void limitNumReduceTasks(String table, JobConf job)
public static void limitNumReduceTasks(String table, Job job)
throws IOException {
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
HTable outputTable = new HTable(new HBaseConfiguration(
job.getConfiguration()), table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumReduceTasks() > regions)
job.setNumReduceTasks(regions);
}
/**
* Ensures that the given number of map tasks for the given job
* configuration does not exceed the number of regions for the given table.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void limitNumMapTasks(String table, JobConf job)
throws IOException {
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
int regions = outputTable.getRegionsInfo().size();
if (job.getNumMapTasks() > regions)
job.setNumMapTasks(regions);
}
/**
* Sets the number of reduce tasks for the given job configuration to the
* number of regions the given table has.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @param job The current job to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void setNumReduceTasks(String table, JobConf job)
public static void setNumReduceTasks(String table, Job job)
throws IOException {
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
HTable outputTable = new HTable(new HBaseConfiguration(
job.getConfiguration()), table);
int regions = outputTable.getRegionsInfo().size();
job.setNumReduceTasks(regions);
}
/**
* Sets the number of map tasks for the given job configuration to the
* number of regions the given table has.
*
* @param table The table to get the region count for.
* @param job The current job configuration to adjust.
* @throws IOException When retrieving the table details fails.
*/
public static void setNumMapTasks(String table, JobConf job)
throws IOException {
HTable outputTable = new HTable(new HBaseConfiguration(job), table);
int regions = outputTable.getRegionsInfo().size();
job.setNumMapTasks(regions);
}
/**
* Sets the number of rows to return and cache with each scanner iteration.
* Higher caching values will enable faster mapreduce jobs at the expense of
* requiring more heap to contain the cached rows.
*
* @param job The current job configuration to adjust.
* @param job The current job to adjust.
* @param batchSize The number of rows to return in batch with each scanner
* iteration.
*/
public static void setScannerCaching(JobConf job, int batchSize) {
job.setInt("hbase.client.scanner.caching", batchSize);
public static void setScannerCaching(Job job, int batchSize) {
job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
}
}
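The remaining helpers are optional tuning knobs. A minimal sketch, reusing the hypothetical job and table names from the earlier driver fragments:

// Either cap the reduce task count at the sink table's region count ...
TableMapReduceUtil.limitNumReduceTasks("target_table", job);
// ... or pin it to exactly that number:
// TableMapReduceUtil.setNumReduceTasks("target_table", job);

// Fetch 500 rows per scanner round trip; the figure is only an illustration
// and should be sized against the heap available to each map task.
TableMapReduceUtil.setScannerCaching(job, 500);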

@ -23,68 +23,89 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Convert Map/Reduce output and write it to an HBase table
* Convert Map/Reduce output and write it to an HBase table.
*/
public class TableOutputFormat extends
FileOutputFormat<ImmutableBytesWritable, Put> {
/** JobConf parameter that specifies the output table */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
/** Job parameter that specifies the output table. */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
/**
* Convert Reduce output (key, value) to (ImmutableBytesWritable, Put)
* and write to an HBase table
*/
protected static class TableRecordWriter
implements RecordWriter<ImmutableBytesWritable, Put> {
private HTable m_table;
extends RecordWriter<ImmutableBytesWritable, Put> {
/** The table to write to. */
private HTable table;
/**
* Instantiate a TableRecordWriter with the given HTable for writing.
*
* @param table
* @param table The table to write to.
*/
public TableRecordWriter(HTable table) {
m_table = table;
this.table = table;
}
public void close(Reporter reporter)
throws IOException {
m_table.flushCommits();
/**
* Closes the writer, in this case flushing the table's buffered commits.
*
* @param context The context.
* @throws IOException When closing the writer fails.
* @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
public void close(TaskAttemptContext context)
throws IOException {
table.flushCommits();
}
public void write(ImmutableBytesWritable key,
Put value) throws IOException {
m_table.put(new Put(value));
/**
* Writes a key/value pair into the table.
*
* @param key The key.
* @param value The value.
* @throws IOException When writing fails.
* @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
*/
@Override
public void write(ImmutableBytesWritable key, Put value)
throws IOException {
table.put(new Put(value));
}
}
@Override
@SuppressWarnings("unchecked")
public RecordWriter getRecordWriter(FileSystem ignored,
JobConf job, String name, Progressable progress) throws IOException {
/**
* Creates a new record writer.
*
* @param context The current task context.
* @return The newly created writer instance.
* @throws IOException When creating the writer fails.
* @throws InterruptedException When the job is cancelled.
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(
TaskAttemptContext context)
throws IOException, InterruptedException {
// expecting exactly one path
String tableName = job.get(OUTPUT_TABLE);
String tableName = context.getConfiguration().get(OUTPUT_TABLE);
HTable table = null;
try {
table = new HTable(new HBaseConfiguration(job), tableName);
table = new HTable(new HBaseConfiguration(context.getConfiguration()),
tableName);
} catch(IOException e) {
LOG.error(e);
throw e;
@ -92,14 +113,5 @@ public class TableOutputFormat extends
table.setAutoFlush(false);
return new TableRecordWriter(table);
}
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws FileAlreadyExistsException, InvalidJobConfException, IOException {
String tableName = job.get(OUTPUT_TABLE);
if(tableName == null) {
throw new IOException("Must specify table name");
}
}
}
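initTableReducerJob above already wires this output format up, but a job can also configure it directly. A rough sketch, assuming the Job instance and table name from the earlier driver fragments:

job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "target_table");
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);

Note that the format disables auto-flush on the table and only flushes buffered commits when the task's writer is closed, so puts are batched per task rather than sent one by one.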

@ -1,38 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Reducer;
/**
* Write a table, sorting by the input key
*
* @param <K> key class
* @param <V> value class
*/
@SuppressWarnings("unchecked")
public interface TableReduce<K extends WritableComparable, V extends Writable>
extends Reducer<K, V, ImmutableBytesWritable, Put> {
}

@ -25,88 +25,152 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
/**
* A table split corresponds to a key range [low, high)
* A table split corresponds to a key range (low, high). All references to row
* below refer to the key of the row.
*/
public class TableSplit implements InputSplit, Comparable<TableSplit> {
private byte [] m_tableName;
private byte [] m_startRow;
private byte [] m_endRow;
private String m_regionLocation;
public class TableSplit extends InputSplit
implements Writable, Comparable<TableSplit> {
private byte [] tableName;
private byte [] startRow;
private byte [] endRow;
private String regionLocation;
/** default constructor */
/** Default constructor. */
public TableSplit() {
this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
HConstants.EMPTY_BYTE_ARRAY, "");
}
/**
* Constructor
* @param tableName
* @param startRow
* @param endRow
* @param location
* Creates a new instance while assigning all variables.
*
* @param tableName The name of the current table.
* @param startRow The start row of the split.
* @param endRow The end row of the split.
* @param location The location of the region.
*/
public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
final String location) {
this.m_tableName = tableName;
this.m_startRow = startRow;
this.m_endRow = endRow;
this.m_regionLocation = location;
this.tableName = tableName;
this.startRow = startRow;
this.endRow = endRow;
this.regionLocation = location;
}
/** @return table name */
/**
* Returns the table name.
*
* @return The table name.
*/
public byte [] getTableName() {
return this.m_tableName;
return tableName;
}
/** @return starting row key */
/**
* Returns the start row.
*
* @return The start row.
*/
public byte [] getStartRow() {
return this.m_startRow;
return startRow;
}
/** @return end row key */
/**
* Returns the end row.
*
* @return The end row.
*/
public byte [] getEndRow() {
return this.m_endRow;
return endRow;
}
/** @return the region's hostname */
/**
* Returns the region location.
*
* @return The region's location.
*/
public String getRegionLocation() {
return this.m_regionLocation;
return regionLocation;
}
/**
* Returns the region's location as an array.
*
* @return The array containing the region location.
* @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
*/
@Override
public String[] getLocations() {
return new String[] {this.m_regionLocation};
return new String[] {regionLocation};
}
/**
* Returns the length of the split.
*
* @return The length of the split.
* @see org.apache.hadoop.mapreduce.InputSplit#getLength()
*/
@Override
public long getLength() {
// Not clear how to obtain this... seems to be used only for sorting splits
return 0;
}
/**
* Reads the values of each field.
*
* @param in The input to read from.
* @throws IOException When reading the input fails.
*/
@Override
public void readFields(DataInput in) throws IOException {
this.m_tableName = Bytes.readByteArray(in);
this.m_startRow = Bytes.readByteArray(in);
this.m_endRow = Bytes.readByteArray(in);
this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
tableName = Bytes.readByteArray(in);
startRow = Bytes.readByteArray(in);
endRow = Bytes.readByteArray(in);
regionLocation = Bytes.toString(Bytes.readByteArray(in));
}
/**
* Writes the field values to the output.
*
* @param out The output to write to.
* @throws IOException When writing the values to the output fails.
*/
@Override
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.m_tableName);
Bytes.writeByteArray(out, this.m_startRow);
Bytes.writeByteArray(out, this.m_endRow);
Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
Bytes.writeByteArray(out, tableName);
Bytes.writeByteArray(out, startRow);
Bytes.writeByteArray(out, endRow);
Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
}
/**
* Returns the details about this instance as a string.
*
* @return The values of this instance as a string.
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return m_regionLocation + ":" +
Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
return regionLocation + ":" +
Bytes.toStringBinary(startRow) + "," + Bytes.toStringBinary(endRow);
}
public int compareTo(TableSplit o) {
return Bytes.compareTo(getStartRow(), o.getStartRow());
/**
* Compares this split against the given one.
*
* @param split The split to compare to.
* @return The result of the comparison.
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@Override
public int compareTo(TableSplit split) {
return Bytes.compareTo(getStartRow(), split.getStartRow());
}
}
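Because the split now implements Writable explicitly, the framework can ship it from the job client to the tasks. A small serialization round trip, assuming the usual java.io stream classes and Bytes are imported; the table, row keys, and host name are placeholders:

TableSplit split = new TableSplit(Bytes.toBytes("mytable"),
    Bytes.toBytes("row-aaa"), Bytes.toBytes("row-mmm"), "host1.example.org");

// Serialize the split ...
ByteArrayOutputStream buf = new ByteArrayOutputStream();
split.write(new DataOutputStream(buf));

// ... and read it back into a fresh instance.
TableSplit copy = new TableSplit();
copy.readFields(new DataInputStream(
    new ByteArrayInputStream(buf.toByteArray())));
// copy.getRegionLocation() now returns "host1.example.org", and compareTo()
// orders splits by their start row.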

@ -75,10 +75,10 @@ directory and the hbase conf into a job jar <code>conf/</code> directory.
<p>HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat},
and data sink, {@link org.apache.hadoop.hbase.mapreduce.TableOutputFormat TableOutputFormat}, for MapReduce jobs.
When writing MapReduce jobs that read or write HBase, you'll probably want to subclass
{@link org.apache.hadoop.hbase.mapreduce.TableMap TableMap} and/or
{@link org.apache.hadoop.hbase.mapreduce.TableReduce TableReduce}. See the do-nothing
pass-through classes {@link org.apache.hadoop.hbase.mapreduce.IdentityTableMap IdentityTableMap} and
{@link org.apache.hadoop.hbase.mapreduce.IdentityTableReduce IdentityTableReduce} for basic usage. For a more
{@link org.apache.hadoop.hbase.mapreduce.TableMapper TableMapper} and/or
{@link org.apache.hadoop.hbase.mapreduce.TableReducer TableReducer}. See the do-nothing
pass-through classes {@link org.apache.hadoop.hbase.mapreduce.IdentityTableMapper IdentityTableMapper} and
{@link org.apache.hadoop.hbase.mapreduce.IdentityTableReducer IdentityTableReducer} for basic usage. For a more
involved example, see {@link org.apache.hadoop.hbase.mapreduce.BuildTableIndex BuildTableIndex}
or review the <code>org.apache.hadoop.hbase.mapreduce.TestTableMapReduce</code> unit test.
</p>