HBASE-3996 Support multiple tables and scanners as input to the mapper in map/reduce jobs (Eran Kutner, Bryan Baugher)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1441708 13f79535-47bb-0310-9956-ffa450edef68
commit 968facdd0a
parent fd5a096d3e
Scan.java
@@ -96,11 +96,14 @@ public class Scan extends OperationWithAttributes {
  // If application wants to collect scan metrics, it needs to
  // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
  static public final String SCAN_ATTRIBUTES_METRICS_ENABLE =
      "scan.attributes.metrics.enable";
  static public final String SCAN_ATTRIBUTES_METRICS_DATA =
      "scan.attributes.metrics.data";

  static public final String SCAN_ATTRIBUTES_METRICS_ENABLE = "scan.attributes.metrics.enable";
  static public final String SCAN_ATTRIBUTES_METRICS_DATA = "scan.attributes.metrics.data";

  // If an application wants to use multiple scans over different tables each scan must
  // define this attribute with the appropriate table name by calling
  // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
  static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";

  /*
   * -1 means no caching
   */
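The new SCAN_ATTRIBUTES_TABLE_NAME attribute is what binds each Scan to its source table. A minimal sketch of building a list of tagged scans; the table names "t1" and "t2" and the row keys are hypothetical, not part of the commit:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class TaggedScans {
  public static List<Scan> buildScans() {
    List<Scan> scans = new ArrayList<Scan>();

    // Full scan of the hypothetical table "t1".
    Scan scan1 = new Scan();
    scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("t1"));
    scans.add(scan1);

    // Range scan of the hypothetical table "t2".
    Scan scan2 = new Scan();
    scan2.setStartRow(Bytes.toBytes("row-a"));
    scan2.setStopRow(Bytes.toBytes("row-m"));
    scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("t2"));
    scans.add(scan2);

    return scans;
  }
}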
MultiTableInputFormat.java (new file)
@@ -0,0 +1,106 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;

/**
 * Convert HBase tabular data from multiple scanners into a format that
 * is consumable by Map/Reduce.
 *
 * <p>
 * Usage example
 * </p>
 *
 * <pre>
 * List<Scan> scans = new ArrayList<Scan>();
 *
 * Scan scan1 = new Scan();
 * scan1.setStartRow(firstRow1);
 * scan1.setStopRow(lastRow1);
 * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
 * scans.add(scan1);
 *
 * Scan scan2 = new Scan();
 * scan2.setStartRow(firstRow2);
 * scan2.setStopRow(lastRow2);
 * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
 * scans.add(scan2);
 *
 * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
 *     IntWritable.class, job);
 * </pre>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiTableInputFormat extends MultiTableInputFormatBase implements
    Configurable {

  /** Job parameter that specifies the scan list. */
  public static final String SCANS = "hbase.mapreduce.scans";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the tables to
   * be scanned.
   *
   * @param configuration The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *      org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
    String[] rawScans = conf.getStrings(SCANS);
    if (rawScans.length <= 0) {
      throw new IllegalArgumentException("There must be at least 1 scan configuration set to: "
          + SCANS);
    }
    List<Scan> scans = new ArrayList<Scan>();

    for (int i = 0; i < rawScans.length; i++) {
      try {
        scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i]));
      } catch (IOException e) {
        throw new RuntimeException("Failed to convert scan string '" + rawScans[i] + "' to a Scan", e);
      }
    }
    this.setScans(scans);
  }
}
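Pulling the pieces together, a hypothetical driver that wires tagged scans into a job through the new List<Scan> overload of initTableMapperJob might look like the sketch below. The table names, mapper, and output types are placeholders, and NullOutputFormat is used only so the sketch runs without an output directory:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MultiTableJobDriver {

  /** Example mapper; emits the row key and a count of 1 for every row seen. */
  static class RowCountMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(Bytes.toStringBinary(key.get())), ONE);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "multi-table-example");
    job.setJarByClass(MultiTableJobDriver.class);

    // One Scan per source table; the table name attribute tells
    // MultiTableInputFormat where each scan should run.
    List<Scan> scans = new ArrayList<Scan>();
    for (String tableName : new String[] { "table1", "table2" }) { // hypothetical tables
      Scan scan = new Scan();
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
      scans.add(scan);
    }

    // Sets MultiTableInputFormat as the input format and serializes the
    // scans into the hbase.mapreduce.scans job property.
    TableMapReduceUtil.initTableMapperJob(scans, RowCountMapper.class,
        Text.class, IntWritable.class, job);

    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}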
MultiTableInputFormatBase.java (new file)
@@ -0,0 +1,216 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A base for {@link MultiTableInputFormat}s. Receives a list of
 * {@link Scan} instances that define the input tables and
 * filters etc. Subclasses may use other TableRecordReader implementations.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class MultiTableInputFormatBase extends
    InputFormat<ImmutableBytesWritable, Result> {

  final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);

  /** Holds the set of scans used to define the input. */
  private List<Scan> scans;

  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
   * default.
   *
   * @param split The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @throws InterruptedException when record reader initialization fails
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *      org.apache.hadoop.mapreduce.InputSplit,
   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    TableSplit tSplit = (TableSplit) split;

    if (tSplit.getTableName() == null) {
      throw new IOException("Cannot create a record reader because of a"
          + " previous error. Please look at the previous log lines from"
          + " the task's full log for more details.");
    }
    HTable table =
        new HTable(context.getConfiguration(), tSplit.getTableName());

    TableRecordReader trr = this.tableRecordReader;
    // if no table record reader was provided use default
    if (trr == null) {
      trr = new TableRecordReader();
    }
    Scan sc = tSplit.getScan();
    sc.setStartRow(tSplit.getStartRow());
    sc.setStopRow(tSplit.getEndRow());
    trr.setScan(sc);
    trr.setHTable(table);
    trr.initialize(split, context);
    return trr;
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. The
   * number of splits matches the number of regions in a table.
   *
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (scans.isEmpty()) {
      throw new IOException("No scans were provided.");
    }
    List<InputSplit> splits = new ArrayList<InputSplit>();

    for (Scan scan : scans) {
      byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
      if (tableName == null)
        throw new IOException("A scan object did not have a table name");
      HTable table = new HTable(context.getConfiguration(), tableName);
      Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
      if (keys == null || keys.getFirst() == null ||
          keys.getFirst().length == 0) {
        throw new IOException("Expecting at least one region for table : "
            + Bytes.toString(tableName));
      }
      int count = 0;

      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();

      for (int i = 0; i < keys.getFirst().length; i++) {
        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
          continue;
        }
        String regionLocation =
            table.getRegionLocation(keys.getFirst()[i], false).getHostname();

        // determine if the given start and stop keys fall into the range
        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
            Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
            (stopRow.length == 0 ||
            Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
          byte[] splitStart =
              startRow.length == 0 ||
                  Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? keys
                  .getFirst()[i] : startRow;
          byte[] splitStop =
              (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i],
                  stopRow) <= 0) && keys.getSecond()[i].length > 0 ? keys
                  .getSecond()[i] : stopRow;
          InputSplit split =
              new TableSplit(tableName, scan, splitStart,
                  splitStop, regionLocation);
          splits.add(split);
          if (LOG.isDebugEnabled())
            LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
        }
      }
      table.close();
    }
    return splits;
  }

  /**
   * Test if the given region is to be included in the InputSplit while
   * splitting the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to
   * exclude an entire region from the M-R job, (and hence, not contributing to
   * the InputSplit), given the start and end keys of the same. <br>
   * Useful when we need to remember the last-processed top record and revisit
   * the [last, current) interval for M-R processing, continuously. In addition
   * to reducing InputSplits, reduces the load on the region server as well, due
   * to the ordering of the keys. <br>
   * <br>
   * Note: It is possible that <code>endKey.length() == 0 </code>, for the last
   * (recent) region. <br>
   * Override this method, if you want to bulk exclude regions altogether from
   * M-R. By default, no region is excluded (i.e. all regions are included).
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true, if this region needs to be included as part of the input
   *         (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey,
      final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the list of {@link Scan} objects.
   */
  protected List<Scan> getScans() {
    return this.scans;
  }

  /**
   * Allows subclasses to set the list of {@link Scan} objects.
   *
   * @param scans The list of {@link Scan} used to define the input
   */
  protected void setScans(List<Scan> scans) {
    this.scans = scans;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *          implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
}
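Since includeRegionInSplit() is the documented extension point, a subclass can prune whole regions before splits are created. A small sketch under the assumption that only regions ending after some checkpoint key still matter; the class name and LAST_PROCESSED_KEY are illustrative, not part of the HBase API:

import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hypothetical input format that skips regions ending before a checkpoint
 * key, so the job only revisits rows at or beyond the checkpoint.
 */
public class CheckpointedMultiTableInputFormat extends MultiTableInputFormat {

  // Assumed to be known to the driver; purely illustrative.
  private static final byte[] LAST_PROCESSED_KEY = Bytes.toBytes("row-12345");

  @Override
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    // endKey.length == 0 marks the last region of the table; always keep it.
    if (endKey.length == 0) {
      return true;
    }
    // Keep only regions whose end key is after the checkpoint.
    return Bytes.compareTo(endKey, LAST_PROCESSED_KEY) > 0;
  }
}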
TableMapReduceUtil.java
@@ -23,8 +23,10 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;

@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
@@ -218,6 +221,67 @@ public class TableMapReduceUtil {
    initTableMapperJob(table, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *          all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *          all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    HBaseConfiguration.addHbaseResources(job.getConfiguration());
    List<String> scanStrings = new ArrayList<String>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }
  }

  public static void initCredentials(Job job) throws IOException {
    if (User.isHBaseSecurityEnabled(job.getConfiguration())) {
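One thing these overloads do not hand to the mapper directly is which table a given row came from; since each split now carries its table name, a mapper can recover it from the current InputSplit. A sketch of that pattern, with the "tableName/rowKey" output format chosen purely for illustration:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

/** Emits "tableName/rowKey" so downstream stages can tell the sources apart. */
public class TableTaggingMapper extends TableMapper<Text, Text> {

  @Override
  protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
    // MultiTableInputFormatBase hands out TableSplit instances, so the
    // current table name is available from the split.
    TableSplit split = (TableSplit) context.getInputSplit();
    String table = Bytes.toString(split.getTableName());
    context.write(new Text(table + "/" + Bytes.toStringBinary(key.get())),
        new Text(Bytes.toStringBinary(value.value())));
  }
}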
TableSplit.java
@@ -23,30 +23,68 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;

/**
 * A table split corresponds to a key range (low, high). All references to row
 * below refer to the key of the row.
 * A table split corresponds to a key range (low, high) and an optional scanner.
 * All references to row below refer to the key of the row.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@InterfaceStability.Evolving
public class TableSplit extends InputSplit
implements Writable, Comparable<TableSplit> {
  public static final Log LOG = LogFactory.getLog(TableSplit.class);

  // should be < 0 (@see #readFields(DataInput))
  // version 1 supports Scan data member
  enum Version {
    UNVERSIONED(0),
    // Initial number we put on TableSplit when we introduced versioning.
    INITIAL(-1);

    final int code;
    static final Version[] byCode;
    static {
      byCode = Version.values();
      for (int i = 0; i < byCode.length; i++) {
        if (byCode[i].code != -1 * i) {
          throw new AssertionError("Values in this enum should be descending by one");
        }
      }
    }

    Version(int code) {
      this.code = code;
    }

    boolean atLeast(Version other) {
      return code <= other.code;
    }

    static Version fromCode(int code) {
      return byCode[code * -1];
    }
  }

  private static final Version VERSION = Version.INITIAL;
  private byte [] tableName;
  private byte [] startRow;
  private byte [] endRow;
  private String regionLocation;
  private String scan = ""; // stores the serialized form of the Scan

  /** Default constructor. */
  public TableSplit() {
    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
    this(HConstants.EMPTY_BYTE_ARRAY, null, HConstants.EMPTY_BYTE_ARRAY,
        HConstants.EMPTY_BYTE_ARRAY, "");
  }
@@ -54,17 +92,47 @@ implements Writable, Comparable<TableSplit> {
   * Creates a new instance while assigning all variables.
   *
   * @param tableName The name of the current table.
   * @param scan The scan associated with this split.
   * @param startRow The start row of the split.
   * @param endRow The end row of the split.
   * @param location The location of the region.
   */
  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
  public TableSplit(byte [] tableName, Scan scan, byte [] startRow, byte [] endRow,
      final String location) {
    this.tableName = tableName;
    try {
      this.scan =
          (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
    } catch (IOException e) {
      LOG.warn("Failed to convert Scan to String", e);
    }
    this.startRow = startRow;
    this.endRow = endRow;
    this.regionLocation = location;
  }

  /**
   * Creates a new instance without a scanner.
   *
   * @param tableName The name of the current table.
   * @param startRow The start row of the split.
   * @param endRow The end row of the split.
   * @param location The location of the region.
   */
  public TableSplit(byte[] tableName, byte[] startRow, byte[] endRow,
      final String location) {
    this(tableName, null, startRow, endRow, location);
  }

  /**
   * Returns a Scan object from the stored string representation.
   *
   * @return Returns a Scan object based on the stored scanner.
   * @throws IOException
   */
  public Scan getScan() throws IOException {
    return TableMapReduceUtil.convertStringToScan(this.scan);
  }

  /**
   * Returns the table name.
@@ -133,10 +201,29 @@ implements Writable, Comparable<TableSplit> {
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    tableName = Bytes.readByteArray(in);
    Version version = Version.UNVERSIONED;
    // TableSplit was not versioned in the beginning.
    // In order to introduce it now, we make use of the fact
    // that tableName was written with Bytes.writeByteArray,
    // which encodes the array length as a vint which is >= 0.
    // Hence if the vint is >= 0 we have an old version and the vint
    // encodes the length of tableName.
    // If < 0 we just read the version and the next vint is the length.
    // @see Bytes#readByteArray(DataInput)
    int len = WritableUtils.readVInt(in);
    if (len < 0) {
      // what we just read was the version
      version = Version.fromCode(len);
      len = WritableUtils.readVInt(in);
    }
    tableName = new byte[len];
    in.readFully(tableName);
    startRow = Bytes.readByteArray(in);
    endRow = Bytes.readByteArray(in);
    regionLocation = Bytes.toString(Bytes.readByteArray(in));
    if (version.atLeast(Version.INITIAL)) {
      scan = Bytes.toString(Bytes.readByteArray(in));
    }
  }

  /**
@@ -147,10 +234,12 @@ implements Writable, Comparable<TableSplit> {
   */
  @Override
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, VERSION.code);
    Bytes.writeByteArray(out, tableName);
    Bytes.writeByteArray(out, startRow);
    Bytes.writeByteArray(out, endRow);
    Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
    Bytes.writeByteArray(out, Bytes.toBytes(scan));
  }

  /**
@@ -174,7 +263,12 @@ implements Writable, Comparable<TableSplit> {
   */
  @Override
  public int compareTo(TableSplit split) {
    return Bytes.compareTo(getStartRow(), split.getStartRow());
    // If the table names of the two splits are the same then compare start rows;
    // otherwise compare based on table names
    int tableNameComparison =
        Bytes.compareTo(getTableName(), split.getTableName());
    return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo(
        getStartRow(), split.getStartRow());
  }

  @Override
@@ -191,6 +285,7 @@ implements Writable, Comparable<TableSplit> {
  @Override
  public int hashCode() {
    int result = tableName != null ? Arrays.hashCode(tableName) : 0;
    result = 31 * result + (scan != null ? scan.hashCode() : 0);
    result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
    result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
    result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
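The negative-vint trick above keeps old, unversioned TableSplits readable (their first vint is the non-negative table-name length), while new splits lead with the version code and append the serialized Scan. A small round-trip sketch, assuming Hadoop's DataOutputBuffer/DataInputBuffer helpers and a made-up table name:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class TableSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    Scan scan = new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"));
    TableSplit out = new TableSplit(Bytes.toBytes("demo"), scan,
        Bytes.toBytes("aaa"), Bytes.toBytes("mmm"), "host1.example.com");

    // Serialize: the first vint written is VERSION.code (-1), not a length.
    DataOutputBuffer buffer = new DataOutputBuffer();
    out.write(buffer);

    // Deserialize: readFields() sees the negative vint, switches to the
    // versioned layout, and restores the serialized Scan as well.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(buffer.getData(), buffer.getLength());
    TableSplit back = new TableSplit();
    back.readFields(in);

    System.out.println(Bytes.toString(back.getTableName()));  // demo
    System.out.println(back.getScan().getStartRow().length);  // 3 ("aaa")
  }
}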
TestMultiTableInputFormat.java (new file)
@@ -0,0 +1,254 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests various scan start and stop row scenarios. This is set in a scan and
 * tested in a MapReduce job to see if that is handed over and done properly
 * too.
 */
@Category(LargeTests.class)
public class TestMultiTableInputFormat {

  static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class);
  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  static final String TABLE_NAME = "scantest";
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  static final String KEY_STARTROW = "startRow";
  static final String KEY_LASTROW = "stpRow";

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // switch TIF to log at DEBUG level
    TEST_UTIL.enableDebug(MultiTableInputFormat.class);
    TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
    // start mini hbase cluster
    TEST_UTIL.startMiniCluster(3);
    // create and fill table
    for (int i = 0; i < 3; i++) {
      HTable table =
          TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME + String.valueOf(i)),
              INPUT_FAMILY);
      TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
      TEST_UTIL.loadTable(table, INPUT_FAMILY);
    }
    // start MR cluster
    TEST_UTIL.startMiniMapReduceCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniMapReduceCluster();
    TEST_UTIL.shutdownMiniCluster();
  }

  @After
  public void tearDown() throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
  }

  /**
   * Pass the key and value to reducer.
   */
  public static class ScanMapper extends
      TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    /**
     * Pass the key and value to reduce.
     *
     * @param key The key, here "aaa", "aab" etc.
     * @param value The value is the same as the key.
     * @param context The task context.
     * @throws IOException When reading the rows fails.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
          value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
            Bytes.toString(INPUT_FAMILY) + "'.");
      }
      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
      LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
          ", value -> " + val);
      context.write(key, key);
    }
  }

  /**
   * Checks the last and first keys seen against the scanner boundaries.
   */
  public static class ScanReducer
      extends
      Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
          NullWritable, NullWritable> {
    private String first = null;
    private String last = null;

    protected void reduce(ImmutableBytesWritable key,
        Iterable<ImmutableBytesWritable> values, Context context)
        throws IOException, InterruptedException {
      int count = 0;
      for (ImmutableBytesWritable value : values) {
        String val = Bytes.toStringBinary(value.get());
        LOG.debug("reduce: key[" + count + "] -> " +
            Bytes.toStringBinary(key.get()) + ", value -> " + val);
        if (first == null) first = val;
        last = val;
        count++;
      }
      assertEquals(3, count);
    }

    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      Configuration c = context.getConfiguration();
      String startRow = c.get(KEY_STARTROW);
      String lastRow = c.get(KEY_LASTROW);
      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
          startRow + "\"");
      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
          "\"");
      if (startRow != null && startRow.length() > 0) {
        assertEquals(startRow, first);
      }
      if (lastRow != null && lastRow.length() > 0) {
        assertEquals(lastRow, last);
      }
    }
  }

  @Test
  public void testScanEmptyToEmpty() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan(null, null, null);
  }

  @Test
  public void testScanEmptyToAPP() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan(null, "app", "apo");
  }

  @Test
  public void testScanOBBToOPP() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan("obb", "opp", "opo");
  }

  @Test
  public void testScanOPPToEmpty() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan("opp", null, "zzz");
  }

  @Test
  public void testScanYZYToEmpty() throws IOException, InterruptedException,
      ClassNotFoundException {
    testScan("yzy", null, "zzz");
  }

  /**
   * Tests a MR scan using specific start and stop rows.
   *
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private void testScan(String start, String stop, String last)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName =
        "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
            (stop != null ? stop.toUpperCase() : "Empty");
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());

    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");

    List<Scan> scans = new ArrayList<Scan>();

    for (int i = 0; i < 3; i++) {
      Scan scan = new Scan();

      scan.addFamily(INPUT_FAMILY);
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(TABLE_NAME + i));

      if (start != null) {
        scan.setStartRow(Bytes.toBytes(start));
      }
      if (stop != null) {
        scan.setStopRow(Bytes.toBytes(stop));
      }

      scans.add(scan);

      LOG.info("scan before: " + scan);
    }

    Job job = new Job(c, jobName);

    TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started " + job.getJobName());
    job.waitForCompletion(true);
    assertTrue(job.isSuccessful());
    LOG.info("After map/reduce completion - job " + jobName);
  }
}