diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
index e1220fbc18d..a7d23d49d90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
@@ -27,7 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -49,6 +50,15 @@ public class TableInputFormat extends TableInputFormatBase implements
public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
public void configure(JobConf job) {
+ try {
+ initialize(job);
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+ }
+
+ @Override
+ protected void initialize(JobConf job) throws IOException {
Path[] tableNames = FileInputFormat.getInputPaths(job);
String colArg = job.get(COLUMN_LIST);
String[] colNames = colArg.split(" ");
@@ -57,12 +67,8 @@ public class TableInputFormat extends TableInputFormatBase implements
m_cols[i] = Bytes.toBytes(colNames[i]);
}
setInputColumns(m_cols);
- try {
- setHTable(
- new HTable(HBaseConfiguration.create(job), TableName.valueOf(tableNames[0].getName())));
- } catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- }
+ Connection connection = ConnectionFactory.createConnection(job);
+ initializeTable(connection, TableName.valueOf(tableNames[0].getName()));
}
public void validateInput(JobConf job) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
index b181dac5b16..da457cf4d38 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.mapred;
+import java.io.Closeable;
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -25,8 +26,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.InputFormat;
@@ -39,28 +43,35 @@ import org.apache.hadoop.mapred.Reporter;
* A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
* byte[] of input columns and optionally a {@link Filter}.
* Subclasses may use other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+ * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
*
* An example of a subclass:
*
- * class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ * class ExampleTIF extends TableInputFormatBase {
*
- * @Override
- * public void configure(JobConf job) {
- * try {
- * HTable exampleTable = new HTable(HBaseConfiguration.create(job),
- * Bytes.toBytes("exampleTable"));
- * // mandatory
- * setHTable(exampleTable);
- * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- * Bytes.toBytes("columnB") };
- * // mandatory
- * setInputColumns(inputColumns);
- * // optional
- * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- * setRowFilter(exampleFilter);
- * } catch (IOException exception) {
- * throw new RuntimeException("Failed to configure for job.", exception);
- * }
+ * {@literal @}Override
+ * protected void initialize(JobConf context) throws IOException {
+ * // We are responsible for the lifecycle of this connection until we hand it over in
+ * // initializeTable.
+ * Connection connection =
+ * ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ * TableName tableName = TableName.valueOf("exampleTable");
+ * // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ * initializeTable(connection, tableName);
+ * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ * Bytes.toBytes("columnB") };
+ * // mandatory
+ * setInputColumns(inputColumns);
+ * // optional, by default we'll get everything for the given columns.
+ * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ * setRowFilter(exampleFilter);
* }
* }
*
@@ -73,9 +84,17 @@ implements InputFormat {
private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
private byte [][] inputColumns;
private HTable table;
+ private Connection connection;
private TableRecordReader tableRecordReader;
private Filter rowFilter;
+ private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+ "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+ "method";
+ private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+ " previous error. Please look at the previous logs lines from" +
+ " the task's full log for more details.";
+
/**
* Builds a TableRecordReader. If no TableRecordReader was provided, uses
* the default.
@@ -86,19 +105,63 @@ implements InputFormat {
public RecordReader getRecordReader(
InputSplit split, JobConf job, Reporter reporter)
throws IOException {
- TableSplit tSplit = (TableSplit) split;
- TableRecordReader trr = this.tableRecordReader;
- // if no table record reader was provided use default
- if (trr == null) {
- trr = new TableRecordReader();
+ // In case a subclass uses the deprecated approach or calls initializeTable directly
+ if (table == null) {
+ initialize(job);
}
+ // null check in case our child overrides getTable to not throw.
+ try {
+ if (getTable() == null) {
+ // initialize() must not have been implemented in the subclass.
+ throw new IOException(INITIALIZATION_ERROR);
+ }
+ } catch (IllegalStateException exception) {
+ throw new IOException(INITIALIZATION_ERROR, exception);
+ }
+
+ TableSplit tSplit = (TableSplit) split;
+ // if no table record reader was provided use default
+ final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
+ this.tableRecordReader;
trr.setStartRow(tSplit.getStartRow());
trr.setEndRow(tSplit.getEndRow());
trr.setHTable(this.table);
trr.setInputColumns(this.inputColumns);
trr.setRowFilter(this.rowFilter);
trr.init();
- return trr;
+ return new RecordReader() {
+
+ @Override
+ public void close() throws IOException {
+ trr.close();
+ closeTable();
+ }
+
+ @Override
+ public ImmutableBytesWritable createKey() {
+ return trr.createKey();
+ }
+
+ @Override
+ public Result createValue() {
+ return trr.createValue();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return trr.getPos();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return trr.getProgress();
+ }
+
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+ return trr.next(key, value);
+ }
+ };
}
/**
@@ -121,8 +184,18 @@ implements InputFormat {
*/
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
if (this.table == null) {
- throw new IOException("No table was provided");
+ initialize(job);
}
+ // null check in case our child overrides getTable to not throw.
+ try {
+ if (getTable() == null) {
+ // initialize() must not have been implemented in the subclass.
+ throw new IOException(INITIALIZATION_ERROR);
+ }
+ } catch (IllegalStateException exception) {
+ throw new IOException(INITIALIZATION_ERROR, exception);
+ }
+
byte [][] startKeys = this.table.getStartKeys();
if (startKeys == null || startKeys.length == 0) {
throw new IOException("Expecting at least one region");
@@ -149,6 +222,22 @@ implements InputFormat {
return splits;
}
+ /**
+ * Allows subclasses to initialize the table information.
+ *
+ * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
+ * @param tableName The {@link TableName} of the table to process.
+ * @throws IOException
+ */
+ protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+ if (table != null || connection != null) {
+ LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+ "reference; TableInputFormatBase will not close these old references when done.");
+ }
+ this.table = (HTable) connection.getTable(tableName);
+ this.connection = connection;
+ }
+
/**
* @param inputColumns to be passed in {@link Result} to the map task.
*/
@@ -158,8 +247,20 @@ implements InputFormat {
/**
* Allows subclasses to get the {@link HTable}.
+ * @deprecated use {@link #getTable()}
*/
+ @Deprecated
protected HTable getHTable() {
+ return (HTable) getTable();
+ }
+
+ /**
+ * Allows subclasses to get the {@link Table}.
+ */
+ protected Table getTable() {
+ if (table == null) {
+ throw new IllegalStateException(NOT_INITIALIZED);
+ }
return this.table;
}
@@ -167,7 +268,9 @@ implements InputFormat {
* Allows subclasses to set the {@link HTable}.
*
* @param table to get the data from
+ * @deprecated use {@link #initializeTable(Connection,TableName)}
*/
+ @Deprecated
protected void setHTable(HTable table) {
this.table = table;
}
@@ -190,4 +293,40 @@ implements InputFormat {
protected void setRowFilter(Filter rowFilter) {
this.rowFilter = rowFilter;
}
+
+ /**
+ * Handle subclass specific set up.
+ * Each of the entry points used by the MapReduce framework,
+ * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+ * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information and calling
+ * {@link #initializeTable(Connection, TableName)}.
+ *
+ * Subclasses should implement their initialize call such that it is safe to call multiple times.
+ * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+ * if an initialize call is needed, but this behavior may change in the future. In particular,
+ * it is critical that initializeTable not be called multiple times since this will leak
+ * Connection instances.
+ *
+ */
+ protected void initialize(JobConf job) throws IOException {
+ }
+
+ /**
+ * Close the Table and related objects that were initialized via
+ * {@link #initializeTable(Connection, TableName)}.
+ *
+ * @throws IOException
+ */
+ protected void closeTable() throws IOException {
+ close(table, connection);
+ table = null;
+ connection = null;
+ }
+
+ private void close(Closeable... closables) throws IOException {
+ for (Closeable c : closables) {
+ if(c != null) { c.close(); }
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index 8896eb08389..bc2537b7517 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -175,7 +175,9 @@ implements Configurable {
}
@Override
- protected void initialize() {
+ protected void initialize(JobContext context) throws IOException {
+ // Do we have to worry about mis-matches between the Configuration from setConf and the one
+ // in this context?
TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
try {
initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 44725dc39f3..26c6a614b09 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -63,38 +63,39 @@ import org.apache.hadoop.util.StringUtils;
* A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
* an {@link Scan} instance that defines the input columns etc. Subclasses may use
* other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+ * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
*
* An example of a subclass:
*
- * class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ * class ExampleTIF extends TableInputFormatBase {
*
- * private JobConf job;
- *
- * @Override
- * public void configure(JobConf job) {
- * try {
- * this.job = job;
- * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- * Bytes.toBytes("columnB") };
- * // optional
- * Scan scan = new Scan();
- * for (byte[] family : inputColumns) {
- * scan.addFamily(family);
- * }
- * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- * scan.setFilter(exampleFilter);
- * setScan(scan);
- * } catch (IOException exception) {
- * throw new RuntimeException("Failed to configure for job.", exception);
- * }
- *
- * protected void initialize() {
- * Connection connection =
- * ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ * {@literal @}Override
+ * protected void initialize(JobContext context) throws IOException {
+ * // We are responsible for the lifecycle of this connection until we hand it over in
+ * // initializeTable.
+ * Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+ * job.getConfiguration()));
* TableName tableName = TableName.valueOf("exampleTable");
- * // mandatory
+ * // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
* initializeTable(connection, tableName);
- * }
+ * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ * Bytes.toBytes("columnB") };
+ * // optional, by default we'll get everything for the table.
+ * Scan scan = new Scan();
+ * for (byte[] family : inputColumns) {
+ * scan.addFamily(family);
+ * }
+ * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ * scan.setFilter(exampleFilter);
+ * setScan(scan);
+ * }
* }
*
*/
@@ -105,6 +106,13 @@ extends InputFormat {
final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+ private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+ "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+ "method";
+ private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+ " previous error. Please look at the previous logs lines from" +
+ " the task's full log for more details.";
+
/** Holds the details for the internal scanner.
*
* @see Scan */
@@ -141,14 +149,18 @@ extends InputFormat {
public RecordReader createRecordReader(
InputSplit split, TaskAttemptContext context)
throws IOException {
+ // Just in case a subclass is relying on JobConfigurable magic.
if (table == null) {
- initialize();
+ initialize(context);
}
- if (getTable() == null) {
- // initialize() must not have been implemented in the subclass.
- throw new IOException("Cannot create a record reader because of a" +
- " previous error. Please look at the previous logs lines from" +
- " the task's full log for more details.");
+ // null check in case our child overrides getTable to not throw.
+ try {
+ if (getTable() == null) {
+ // initialize() must not have been implemented in the subclass.
+ throw new IOException(INITIALIZATION_ERROR);
+ }
+ } catch (IllegalStateException exception) {
+ throw new IOException(INITIALIZATION_ERROR, exception);
}
TableSplit tSplit = (TableSplit) split;
LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
@@ -213,13 +225,20 @@ extends InputFormat {
public List getSplits(JobContext context) throws IOException {
boolean closeOnFinish = false;
+ // Just in case a subclass is relying on JobConfigurable magic.
if (table == null) {
- initialize();
+ initialize(context);
closeOnFinish = true;
}
- if (table == null) {
- // initialize() wasn't implemented, so the table is null.
- throw new IOException("No table was provided.");
+
+ // null check in case our child overrides getTable to not throw.
+ try {
+ if (getTable() == null) {
+ // initialize() must not have been implemented in the subclass.
+ throw new IOException(INITIALIZATION_ERROR);
+ }
+ } catch (IllegalStateException exception) {
+ throw new IOException(INITIALIZATION_ERROR, exception);
}
try {
@@ -293,6 +312,10 @@ extends InputFormat {
}
}
+ /**
+ * @deprecated mistakenly made public in 0.98.7. scope will change to package-private
+ */
+ @Deprecated
public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
@@ -341,7 +364,7 @@ extends InputFormat {
/**
* Allows subclasses to get the {@link HTable}.
*
- * @deprecated
+ * @deprecated use {@link #getTable()}
*/
@Deprecated
protected HTable getHTable() {
@@ -353,7 +376,7 @@ extends InputFormat {
*/
protected RegionLocator getRegionLocator() {
if (regionLocator == null) {
- initialize();
+ throw new IllegalStateException(NOT_INITIALIZED);
}
return regionLocator;
}
@@ -363,7 +386,7 @@ extends InputFormat {
*/
protected Table getTable() {
if (table == null) {
- initialize();
+ throw new IllegalStateException(NOT_INITIALIZED);
}
return table;
}
@@ -373,7 +396,7 @@ extends InputFormat {
*/
protected Admin getAdmin() {
if (admin == null) {
- initialize();
+ throw new IllegalStateException(NOT_INITIALIZED);
}
return admin;
}
@@ -381,6 +404,9 @@ extends InputFormat {
/**
* Allows subclasses to set the {@link HTable}.
*
+ * Will attempt to reuse the underlying Connection for our own needs, including
+ * retreiving an Admin interface to the HBase cluster.
+ *
* @param table The table to get the data from.
* @throws IOException
* @deprecated Use {@link #initializeTable(Connection, TableName)} instead.
@@ -417,6 +443,10 @@ extends InputFormat {
* @throws IOException
*/
protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+ if (table != null || connection != null) {
+ LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
+ "reference; TableInputFormatBase will not close these old references when done.");
+ }
this.table = connection.getTable(tableName);
this.regionLocator = connection.getRegionLocator(tableName);
this.admin = connection.getAdmin();
@@ -453,12 +483,21 @@ extends InputFormat {
}
/**
- * This method will be called when any of the following are referenced, but not yet initialized:
- * admin, regionLocator, table. Subclasses will have the opportunity to call
- * {@link #initializeTable(Connection, TableName)}
+ * Handle subclass specific set up.
+ * Each of the entry points used by the MapReduce framework,
+ * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+ * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information and calling
+ * {@link #initializeTable(Connection, TableName)}.
+ *
+ * Subclasses should implement their initialize call such that it is safe to call multiple times.
+ * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+ * if an initialize call is needed, but this behavior may change in the future. In particular,
+ * it is critical that initializeTable not be called multiple times since this will leak
+ * Connection instances.
+ *
*/
- protected void initialize() {
-
+ protected void initialize(JobContext context) throws IOException {
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
index 4626b611ac3..edc8cbe1a63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
@@ -36,6 +36,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
@@ -321,8 +324,30 @@ public class TestTableInputFormat {
LOG.info("testing use of an InputFormat taht extends InputFormatBase");
final Table table = createTable(Bytes.toBytes("exampleTable"),
new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleTIF.class);
+ }
+
+ @Test
+ public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
+ LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
+ + "as it was given in 0.98.");
+ final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleDeprecatedTIF.class);
+ }
+
+ @Test
+ public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
+ LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
+ + "using JobConfigurable.");
+ final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleJobConfigurableTIF.class);
+ }
+
+ void testInputFormat(Class extends InputFormat> clazz) throws IOException {
final JobConf job = MapreduceTestingShim.getJobConf(mrCluster);
- job.setInputFormat(ExampleTIF.class);
+ job.setInputFormat(clazz);
job.setOutputFormat(NullOutputFormat.class);
job.setMapperClass(ExampleVerifier.class);
job.setNumReduceTasks(0);
@@ -372,13 +397,13 @@ public class TestTableInputFormat {
}
- public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
@Override
public void configure(JobConf job) {
try {
HTable exampleTable = new HTable(HBaseConfiguration.create(job),
- Bytes.toBytes("exampleTable"));
+ Bytes.toBytes("exampleDeprecatedTable"));
// mandatory
setHTable(exampleTable);
byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
@@ -395,5 +420,46 @@ public class TestTableInputFormat {
}
+ public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {
+
+ @Override
+ public void configure(JobConf job) {
+ try {
+ initialize(job);
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to initialize.", exception);
+ }
+ }
+
+ @Override
+ protected void initialize(JobConf job) throws IOException {
+ initialize(job, "exampleJobConfigurableTable");
+ }
+ }
+
+
+ public static class ExampleTIF extends TableInputFormatBase {
+
+ @Override
+ protected void initialize(JobConf job) throws IOException {
+ initialize(job, "exampleTable");
+ }
+
+ protected void initialize(JobConf job, String table) throws IOException {
+ Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ TableName tableName = TableName.valueOf(table);
+ // mandatory
+ initializeTable(connection, tableName);
+ byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ Bytes.toBytes("columnB") };
+ // mandatory
+ setInputColumns(inputColumns);
+ Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ // optional
+ setRowFilter(exampleFilter);
+ }
+
+ }
+
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
index 26029619e46..566a6429c91 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
@@ -342,6 +343,16 @@ public class TestTableInputFormat {
testInputFormat(ExampleTIF.class);
}
+ @Test
+ public void testJobConfigurableExtensionOfTableInputFormatBase()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
+ "using JobConfigurable.");
+ final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleJobConfigurableTIF.class);
+ }
+
@Test
public void testDeprecatedExtensionOfTableInputFormatBase()
throws IOException, InterruptedException, ClassNotFoundException {
@@ -422,13 +433,43 @@ public class TestTableInputFormat {
}
- public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
- private JobConf job;
+ public static class ExampleJobConfigurableTIF extends TableInputFormatBase
+ implements JobConfigurable {
@Override
public void configure(JobConf job) {
- this.job = job;
+ try {
+ Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
+ // mandatory
+ initializeTable(connection, tableName);
+ byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ Bytes.toBytes("columnB") };
+ //optional
+ Scan scan = new Scan();
+ for (byte[] family : inputColumns) {
+ scan.addFamily(family);
+ }
+ Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ scan.setFilter(exampleFilter);
+ setScan(scan);
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to initialize.", exception);
+ }
+ }
+ }
+
+
+ public static class ExampleTIF extends TableInputFormatBase {
+
+ @Override
+ protected void initialize(JobContext job) throws IOException {
+ Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+ job.getConfiguration()));
+ TableName tableName = TableName.valueOf("exampleTable");
+ // mandatory
+ initializeTable(connection, tableName);
byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
Bytes.toBytes("columnB") };
//optional
@@ -441,22 +482,6 @@ public class TestTableInputFormat {
setScan(scan);
}
- @Override
- protected void initialize() {
- if (job == null) {
- throw new IllegalStateException("must have already gotten the JobConf before initialize " +
- "is called.");
- }
- try {
- Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- TableName tableName = TableName.valueOf("exampleTable");
- // mandatory
- initializeTable(connection, tableName);
- } catch (IOException exception) {
- throw new RuntimeException("Failed to initialize.", exception);
- }
- }
-
}
}