HBASE-13028 Cleanup MapReduce InputFormats
Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
commit 4d0de57a77
parent 05e0b46d52
hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java

@@ -27,7 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -49,6 +50,15 @@ public class TableInputFormat extends TableInputFormatBase implements
   public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
 
   public void configure(JobConf job) {
+    try {
+      initialize(job);
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+  }
+
+  @Override
+  protected void initialize(JobConf job) throws IOException {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
     String colArg = job.get(COLUMN_LIST);
     String[] colNames = colArg.split(" ");
@@ -57,12 +67,8 @@ public class TableInputFormat extends TableInputFormatBase implements
       m_cols[i] = Bytes.toBytes(colNames[i]);
     }
     setInputColumns(m_cols);
-    try {
-      setHTable(
-        new HTable(HBaseConfiguration.create(job), TableName.valueOf(tableNames[0].getName())));
-    } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
-    }
+    Connection connection = ConnectionFactory.createConnection(job);
+    initializeTable(connection, TableName.valueOf(tableNames[0].getName()));
   }
 
   public void validateInput(JobConf job) throws IOException {
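For context: the mapred TableInputFormat reads its table name from the job's first input path and its columns from the space-separated COLUMN_LIST key, so a driver wires it up roughly as in the following sketch (the driver class, table, and column names are illustrative, not part of the patch):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapred.TableInputFormat;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class ExampleDriver {
      public static JobConf createJob() {
        JobConf job = new JobConf(HBaseConfiguration.create());
        job.setInputFormat(TableInputFormat.class);
        // initialize(JobConf) above treats the first input path as the table name
        FileInputFormat.setInputPaths(job, new Path("exampleTable"));
        // COLUMN_LIST is split on single spaces, one entry per column
        job.set(TableInputFormat.COLUMN_LIST, "columnA columnB");
        return job;
      }
    }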
hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java

@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -25,8 +26,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -39,28 +43,35 @@ import org.apache.hadoop.mapred.Reporter;
  * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
  * byte[] of input columns and optionally a {@link Filter}.
  * Subclasses may use other TableRecordReader implementations.
  *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+ * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
  * <p>
  * An example of a subclass:
  * <pre>
- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ *   class ExampleTIF extends TableInputFormatBase {
  *
- *     @Override
- *     public void configure(JobConf job) {
- *       try {
- *         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
- *           Bytes.toBytes("exampleTable"));
- *         // mandatory
- *         setHTable(exampleTable);
- *         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *           Bytes.toBytes("columnB") };
- *         // mandatory
- *         setInputColumns(inputColumns);
- *         // optional
- *         Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *         setRowFilter(exampleFilter);
- *       } catch (IOException exception) {
- *         throw new RuntimeException("Failed to configure for job.", exception);
- *       }
+ *     {@literal @}Override
+ *     protected void initialize(JobConf context) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection =
+ *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
+ *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *           Bytes.toBytes("columnB") };
+ *       // mandatory
+ *       setInputColumns(inputColumns);
+ *       // optional, by default we'll get everything for the given columns.
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ *       setRowFilter(exampleFilter);
+ *     }
  *   }
  * </pre>
@@ -73,9 +84,17 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
   private byte [][] inputColumns;
   private HTable table;
+  private Connection connection;
   private TableRecordReader tableRecordReader;
   private Filter rowFilter;
+
+  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+      "method";
+  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+      " previous error. Please look at the previous logs lines from" +
+      " the task's full log for more details.";
 
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
@@ -86,19 +105,63 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
       InputSplit split, JobConf job, Reporter reporter)
       throws IOException {
-    TableSplit tSplit = (TableSplit) split;
-    TableRecordReader trr = this.tableRecordReader;
-    // if no table record reader was provided use default
-    if (trr == null) {
-      trr = new TableRecordReader();
+    // In case a subclass uses the deprecated approach or calls initializeTable directly
+    if (table == null) {
+      initialize(job);
     }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
+    TableSplit tSplit = (TableSplit) split;
+    // if no table record reader was provided use default
+    final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
+        this.tableRecordReader;
     trr.setStartRow(tSplit.getStartRow());
     trr.setEndRow(tSplit.getEndRow());
     trr.setHTable(this.table);
     trr.setInputColumns(this.inputColumns);
     trr.setRowFilter(this.rowFilter);
     trr.init();
-    return trr;
+    return new RecordReader<ImmutableBytesWritable, Result>() {
+
+      @Override
+      public void close() throws IOException {
+        trr.close();
+        closeTable();
+      }
+
+      @Override
+      public ImmutableBytesWritable createKey() {
+        return trr.createKey();
+      }
+
+      @Override
+      public Result createValue() {
+        return trr.createValue();
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return trr.getPos();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return trr.getProgress();
+      }
+
+      @Override
+      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+        return trr.next(key, value);
+      }
+    };
   }
 
   /**
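The anonymous RecordReader returned above exists so that close() can chain to closeTable(), releasing the Table and Connection that initialize(JobConf) opened. A caller-side sketch of that lifecycle (hypothetical helper method; the MapReduce framework normally drives these calls, and the imports for InputSplit, RecordReader, and Reporter come from org.apache.hadoop.mapred):

    static void readAllRows(TableInputFormatBase inputFormat, InputSplit split, JobConf job)
        throws IOException {
      RecordReader<ImmutableBytesWritable, Result> reader =
          inputFormat.getRecordReader(split, job, Reporter.NULL);
      try {
        ImmutableBytesWritable key = reader.createKey();
        Result value = reader.createValue();
        while (reader.next(key, value)) {
          // process one row here
        }
      } finally {
        reader.close(); // closes the wrapped TableRecordReader, then the Table and Connection
      }
    }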
@@ -121,8 +184,18 @@ implements InputFormat<ImmutableBytesWritable, Result> {
    */
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     if (this.table == null) {
-      throw new IOException("No table was provided");
+      initialize(job);
     }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
     byte [][] startKeys = this.table.getStartKeys();
     if (startKeys == null || startKeys.length == 0) {
       throw new IOException("Expecting at least one region");
@@ -149,6 +222,22 @@ implements InputFormat<ImmutableBytesWritable, Result> {
     return splits;
   }
 
+  /**
+   * Allows subclasses to initialize the table information.
+   *
+   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
+   * @param tableName  The {@link TableName} of the table to process.
+   * @throws IOException
+   */
+  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+    if (table != null || connection != null) {
+      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+          "reference; TableInputFormatBase will not close these old references when done.");
+    }
+    this.table = (HTable) connection.getTable(tableName);
+    this.connection = connection;
+  }
+
   /**
    * @param inputColumns to be passed in {@link Result} to the map task.
    */
@@ -158,8 +247,20 @@ implements InputFormat<ImmutableBytesWritable, Result> {
 
   /**
    * Allows subclasses to get the {@link HTable}.
+   * @deprecated use {@link #getTable()}
    */
+  @Deprecated
   protected HTable getHTable() {
     return (HTable) getTable();
   }
+
+  /**
+   * Allows subclasses to get the {@link Table}.
+   */
+  protected Table getTable() {
+    if (table == null) {
+      throw new IllegalStateException(NOT_INITIALIZED);
+    }
+    return this.table;
+  }
 
@@ -167,7 +268,9 @@ implements InputFormat<ImmutableBytesWritable, Result> {
    * Allows subclasses to set the {@link HTable}.
    *
    * @param table to get the data from
+   * @deprecated use {@link #initializeTable(Connection,TableName)}
    */
+  @Deprecated
   protected void setHTable(HTable table) {
     this.table = table;
   }
@@ -190,4 +293,40 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   protected void setRowFilter(Filter rowFilter) {
     this.rowFilter = rowFilter;
   }
+
+  /**
+   * Handle subclass specific set up.
+   * Each of the entry points used by the MapReduce framework,
+   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+   * retrieving the necessary configuration information and calling
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * Subclasses should implement their initialize call such that it is safe to call multiple times.
+   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+   * if an initialize call is needed, but this behavior may change in the future. In particular,
+   * it is critical that initializeTable not be called multiple times since this will leak
+   * Connection instances.
+   *
+   */
+  protected void initialize(JobConf job) throws IOException {
+  }
+
+  /**
+   * Close the Table and related objects that were initialized via
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * @throws IOException
+   */
+  protected void closeTable() throws IOException {
+    close(table, connection);
+    table = null;
+    connection = null;
+  }
+
+  private void close(Closeable... closables) throws IOException {
+    for (Closeable c : closables) {
+      if(c != null) { c.close(); }
+    }
+  }
 }
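Since both getRecordReader and getSplits funnel through initialize(JobConf), a subclass whose initialize may be invoked more than once should guard the initializeTable call, as the new javadoc warns that repeated calls leak Connection instances. A guarded sketch (class, table, and column names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.mapred.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapred.JobConf;

    public class GuardedTIF extends TableInputFormatBase {
      private boolean initialized = false;

      @Override
      protected void initialize(JobConf job) throws IOException {
        if (initialized) {
          return; // initializeTable must run at most once per instance
        }
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        // after this call the base class owns, and will close, the connection
        initializeTable(connection, TableName.valueOf("exampleTable"));
        setInputColumns(new byte[][] { Bytes.toBytes("columnA") });
        initialized = true;
      }
    }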
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java

@@ -175,7 +175,9 @@ implements Configurable {
   }
 
   @Override
-  protected void initialize() {
+  protected void initialize(JobContext context) throws IOException {
+    // Do we have to worry about mis-matches between the Configuration from setConf and the one
+    // in this context?
     TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
     try {
       initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
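The mapreduce TableInputFormat gets everything it needs from the Configuration handed to setConf, so initialize(JobContext) above can rebuild a Connection from a copy of that conf. A driver-side sketch (driver class and table name are illustrative; INPUT_TABLE is the format's existing configuration key):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class ExampleMapreduceDriver {
      public static Job createJob() throws java.io.IOException {
        Configuration conf = HBaseConfiguration.create();
        // the format reads this key in initialize(JobContext)
        conf.set(TableInputFormat.INPUT_TABLE, "exampleTable");
        Job job = Job.getInstance(conf, "scan exampleTable");
        job.setInputFormatClass(TableInputFormat.class);
        return job;
      }
    }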
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

@@ -63,38 +63,39 @@ import org.apache.hadoop.util.StringUtils;
 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
 * an {@link Scan} instance that defines the input columns etc. Subclasses may use
 * other TableRecordReader implementations.
 *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+ * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
 * <p>
 * An example of a subclass:
 * <pre>
- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ *   class ExampleTIF extends TableInputFormatBase {
 *
- *     private JobConf job;
- *
- *     @Override
- *     public void configure(JobConf job) {
- *       try {
- *         this.job = job;
- *         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *           Bytes.toBytes("columnB") };
- *         // optional
- *         Scan scan = new Scan();
- *         for (byte[] family : inputColumns) {
- *           scan.addFamily(family);
- *         }
- *         Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *         scan.setFilter(exampleFilter);
- *         setScan(scan);
- *       } catch (IOException exception) {
- *         throw new RuntimeException("Failed to configure for job.", exception);
- *       }
- *     }
- *
- *     protected void initialize() {
- *       Connection connection =
- *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- *       TableName tableName = TableName.valueOf("exampleTable");
- *       // mandatory
- *       initializeTable(connection, tableName);
- *     }
+ *     {@literal @}Override
+ *     protected void initialize(JobContext context) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+ *          job.getConfiguration()));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
+ *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *           Bytes.toBytes("columnB") };
+ *       // optional, by default we'll get everything for the table.
+ *       Scan scan = new Scan();
+ *       for (byte[] family : inputColumns) {
+ *         scan.addFamily(family);
+ *       }
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ *       scan.setFilter(exampleFilter);
+ *       setScan(scan);
+ *     }
 *   }
 * </pre>
 */
@@ -105,6 +106,13 @@ extends InputFormat<ImmutableBytesWritable, Result> {
 
   final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
+  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+      "method";
+  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+      " previous error. Please look at the previous logs lines from" +
+      " the task's full log for more details.";
+
   /** Holds the details for the internal scanner.
    *
    * @see Scan */
@@ -141,14 +149,18 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
       InputSplit split, TaskAttemptContext context)
       throws IOException {
+    // Just in case a subclass is relying on JobConfigurable magic.
     if (table == null) {
-      initialize();
+      initialize(context);
     }
-    if (getTable() == null) {
-      // initialize() must not have been implemented in the subclass.
-      throw new IOException("Cannot create a record reader because of a" +
-        " previous error. Please look at the previous logs lines from" +
-        " the task's full log for more details.");
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
     }
     TableSplit tSplit = (TableSplit) split;
     LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
@@ -213,13 +225,20 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   public List<InputSplit> getSplits(JobContext context) throws IOException {
+    boolean closeOnFinish = false;
+
+    // Just in case a subclass is relying on JobConfigurable magic.
     if (table == null) {
-      initialize();
+      initialize(context);
+      closeOnFinish = true;
     }
-    if (table == null) {
-      // initialize() wasn't implemented, so the table is null.
-      throw new IOException("No table was provided.");
+
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
     }
 
     try {
@@ -293,6 +312,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
   }
 
+  /**
+   * @deprecated mistakenly made public in 0.98.7. scope will change to package-private
+   */
+  @Deprecated
   public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
     String hostName = this.reverseDNSCacheMap.get(ipAddress);
     if (hostName == null) {
@@ -341,7 +364,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /**
    * Allows subclasses to get the {@link HTable}.
    *
-   * @deprecated
+   * @deprecated use {@link #getTable()}
    */
   @Deprecated
   protected HTable getHTable() {
@@ -353,7 +376,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected RegionLocator getRegionLocator() {
     if (regionLocator == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED);
     }
     return regionLocator;
   }
@@ -363,7 +386,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected Table getTable() {
     if (table == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED);
    }
     return table;
   }
@@ -373,7 +396,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected Admin getAdmin() {
     if (admin == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED);
     }
     return admin;
   }
@@ -381,6 +404,9 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /**
    * Allows subclasses to set the {@link HTable}.
    *
+   * Will attempt to reuse the underlying Connection for our own needs, including
+   * retreiving an Admin interface to the HBase cluster.
+   *
    * @param table  The table to get the data from.
    * @throws IOException
    * @deprecated Use {@link #initializeTable(Connection, TableName)} instead.
@@ -417,6 +443,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * @throws IOException
    */
   protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+    if (table != null || connection != null) {
+      LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+          "reference; TableInputFormatBase will not close these old references when done.");
+    }
     this.table = connection.getTable(tableName);
     this.regionLocator = connection.getRegionLocator(tableName);
     this.admin = connection.getAdmin();
@@ -453,12 +483,21 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
 
   /**
-   * This method will be called when any of the following are referenced, but not yet initialized:
-   * admin, regionLocator, table. Subclasses will have the opportunity to call
-   * {@link #initializeTable(Connection, TableName)}
+   * Handle subclass specific set up.
+   * Each of the entry points used by the MapReduce framework,
+   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+   * retrieving the necessary configuration information and calling
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * Subclasses should implement their initialize call such that it is safe to call multiple times.
+   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+   * if an initialize call is needed, but this behavior may change in the future. In particular,
+   * it is critical that initializeTable not be called multiple times since this will leak
+   * Connection instances.
+   *
    */
-  protected void initialize() {
+  protected void initialize(JobContext context) throws IOException {
   }
 
   /**
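Net effect of the accessor changes above: getTable, getRegionLocator, and getAdmin no longer lazily call initialize(); they fail fast with IllegalStateException(NOT_INITIALIZED), and the MapReduce entry points convert that into IOException(INITIALIZATION_ERROR). A sketch of that failure path (illustrative only; it relies on the default no-op initialize and assumes a JobContext is available from the caller):

    import java.io.IOException;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
    import org.apache.hadoop.mapreduce.JobContext;

    public class FailFastDemo {
      static void demonstrate(JobContext context) {
        TableInputFormatBase tif = new TableInputFormatBase() {
          // initialize(JobContext) intentionally not overridden, so no table is ever set
        };
        try {
          tif.getSplits(context); // no-op initialize -> getTable() throws -> IOException
        } catch (IOException expected) {
          // "Cannot create a record reader because of a previous error. ..."
        }
      }
    }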
hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java

@@ -36,6 +36,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
@@ -321,8 +324,30 @@ public class TestTableInputFormat {
     LOG.info("testing use of an InputFormat taht extends InputFormatBase");
     final Table table = createTable(Bytes.toBytes("exampleTable"),
       new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
     testInputFormat(ExampleTIF.class);
   }
 
+  @Test
+  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
+        + "as it was given in 0.98.");
+    final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleDeprecatedTIF.class);
+  }
+
+  @Test
+  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
+        + "using JobConfigurable.");
+    final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleJobConfigurableTIF.class);
+  }
+
   void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
     final JobConf job = MapreduceTestingShim.getJobConf(mrCluster);
-    job.setInputFormat(ExampleTIF.class);
+    job.setInputFormat(clazz);
     job.setOutputFormat(NullOutputFormat.class);
     job.setMapperClass(ExampleVerifier.class);
     job.setNumReduceTasks(0);
@@ -372,13 +397,13 @@ public class TestTableInputFormat {
 
   }
 
-  public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
 
     @Override
     public void configure(JobConf job) {
       try {
         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
-          Bytes.toBytes("exampleTable"));
+          Bytes.toBytes("exampleDeprecatedTable"));
         // mandatory
         setHTable(exampleTable);
         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
@@ -395,5 +420,46 @@ public class TestTableInputFormat {
 
   }
 
+  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {
+
+    @Override
+    public void configure(JobConf job) {
+      try {
+        initialize(job);
+      } catch (IOException exception) {
+        throw new RuntimeException("Failed to initialize.", exception);
+      }
+    }
+
+    @Override
+    protected void initialize(JobConf job) throws IOException {
+      initialize(job, "exampleJobConfigurableTable");
+    }
+  }
+
+
+  public static class ExampleTIF extends TableInputFormatBase {
+
+    @Override
+    protected void initialize(JobConf job) throws IOException {
+      initialize(job, "exampleTable");
+    }
+
+    protected void initialize(JobConf job, String table) throws IOException {
+      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+      TableName tableName = TableName.valueOf(table);
+      // mandatory
+      initializeTable(connection, tableName);
+      byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+          Bytes.toBytes("columnB") };
+      // mandatory
+      setInputColumns(inputColumns);
+      Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+      // optional
+      setRowFilter(exampleFilter);
+    }
+
+  }
 }
hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.AfterClass;
@@ -342,6 +343,16 @@ public class TestTableInputFormat {
     testInputFormat(ExampleTIF.class);
   }
 
+  @Test
+  public void testJobConfigurableExtensionOfTableInputFormatBase()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
+        "using JobConfigurable.");
+    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleJobConfigurableTIF.class);
+  }
+
   @Test
   public void testDeprecatedExtensionOfTableInputFormatBase()
       throws IOException, InterruptedException, ClassNotFoundException {
@@ -422,13 +433,43 @@ public class TestTableInputFormat {
 
   }
 
-  public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
-
-    private JobConf job;
+  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
+      implements JobConfigurable {
 
     @Override
     public void configure(JobConf job) {
-      this.job = job;
+      try {
+        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
+        // mandatory
+        initializeTable(connection, tableName);
+        byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+            Bytes.toBytes("columnB") };
+        //optional
+        Scan scan = new Scan();
+        for (byte[] family : inputColumns) {
+          scan.addFamily(family);
+        }
+        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+        scan.setFilter(exampleFilter);
+        setScan(scan);
+      } catch (IOException exception) {
+        throw new RuntimeException("Failed to initialize.", exception);
+      }
+    }
+  }
+
+
+  public static class ExampleTIF extends TableInputFormatBase {
+
+    @Override
+    protected void initialize(JobContext job) throws IOException {
+      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+          job.getConfiguration()));
+      TableName tableName = TableName.valueOf("exampleTable");
+      // mandatory
+      initializeTable(connection, tableName);
       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
           Bytes.toBytes("columnB") };
       //optional
@@ -441,22 +482,6 @@ public class TestTableInputFormat {
       setScan(scan);
     }
 
-    @Override
-    protected void initialize() {
-      if (job == null) {
-        throw new IllegalStateException("must have already gotten the JobConf before initialize " +
-            "is called.");
-      }
-      try {
-        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
-        TableName tableName = TableName.valueOf("exampleTable");
-        // mandatory
-        initializeTable(connection, tableName);
-      } catch (IOException exception) {
-        throw new RuntimeException("Failed to initialize.", exception);
-      }
-    }
-
   }
 }