HBASE-11879 Change TableInputFormatBase to take interface arguments (Solomon Duskis)

This commit is contained in:
stack 2014-09-30 21:25:04 -07:00
parent e97b43a16b
commit ff31691c84
4 changed files with 72 additions and 35 deletions

View File

@ -81,7 +81,7 @@ public class MetaScanner {
* null if not interested in a particular table. * null if not interested in a particular table.
* @throws IOException e * @throws IOException e
*/ */
public static void metaScan(Configuration configuration, ClusterConnection connection, public static void metaScan(Configuration configuration, Connection connection,
MetaScannerVisitor visitor, TableName userTableName) throws IOException { MetaScannerVisitor visitor, TableName userTableName) throws IOException {
metaScan(configuration, connection, visitor, userTableName, null, Integer.MAX_VALUE, metaScan(configuration, connection, visitor, userTableName, null, Integer.MAX_VALUE,
TableName.META_TABLE_NAME); TableName.META_TABLE_NAME);
@ -127,7 +127,7 @@ public class MetaScanner {
* @param metaTableName Meta table to scan, root or meta. * @param metaTableName Meta table to scan, root or meta.
* @throws IOException e * @throws IOException e
*/ */
static void metaScan(Configuration configuration, ClusterConnection connection, static void metaScan(Configuration configuration, Connection connection,
final MetaScannerVisitor visitor, final TableName tableName, final MetaScannerVisitor visitor, final TableName tableName,
final byte[] row, final int rowLimit, final TableName metaTableName) final byte[] row, final int rowLimit, final TableName metaTableName)
throws IOException { throws IOException {
@ -278,7 +278,7 @@ public class MetaScanner {
* @throws IOException * @throws IOException
*/ */
public static NavigableMap<HRegionInfo, ServerName> allTableRegions(Configuration conf, public static NavigableMap<HRegionInfo, ServerName> allTableRegions(Configuration conf,
ClusterConnection connection, final TableName tableName, Connection connection, final TableName tableName,
final boolean offlined) throws IOException { final boolean offlined) throws IOException {
final NavigableMap<HRegionInfo, ServerName> regions = final NavigableMap<HRegionInfo, ServerName> regions =
new TreeMap<HRegionInfo, ServerName>(); new TreeMap<HRegionInfo, ServerName>();
@ -304,7 +304,7 @@ public class MetaScanner {
* Lists table regions and locations grouped by region range from META. * Lists table regions and locations grouped by region range from META.
*/ */
public static List<RegionLocations> listTableRegionLocations(Configuration conf, public static List<RegionLocations> listTableRegionLocations(Configuration conf,
ClusterConnection connection, final TableName tableName) throws IOException { Connection connection, final TableName tableName) throws IOException {
final List<RegionLocations> regions = new ArrayList<RegionLocations>(); final List<RegionLocations> regions = new ArrayList<RegionLocations>();
MetaScannerVisitor visitor = new TableMetaScannerVisitor(tableName) { MetaScannerVisitor visitor = new TableMetaScannerVisitor(tableName) {
@Override @Override

View File

@ -29,7 +29,11 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
@ -86,8 +90,8 @@ public abstract class MultiTableInputFormatBase extends
+ " previous error. Please look at the previous logs lines from" + " previous error. Please look at the previous logs lines from"
+ " the task's full log for more details."); + " the task's full log for more details.");
} }
Table table = Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
new HTable(context.getConfiguration(), tSplit.getTable()); Table table = connection.getTable(tSplit.getTable());
TableRecordReader trr = this.tableRecordReader; TableRecordReader trr = this.tableRecordReader;
@ -100,10 +104,11 @@ public abstract class MultiTableInputFormatBase extends
sc.setStartRow(tSplit.getStartRow()); sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow()); sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc); trr.setScan(sc);
trr.setHTable(table); trr.setTable(table);
} catch (IOException ioe) { } catch (IOException ioe) {
// If there is an exception make sure that all // If there is an exception make sure that all
// resources are closed and released. // resources are closed and released.
connection.close();
table.close(); table.close();
trr.close(); trr.close();
throw ioe; throw ioe;
@ -128,31 +133,38 @@ public abstract class MultiTableInputFormatBase extends
List<InputSplit> splits = new ArrayList<InputSplit>(); List<InputSplit> splits = new ArrayList<InputSplit>();
for (Scan scan : scans) { for (Scan scan : scans) {
byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
if (tableName == null) if (tableNameBytes == null)
throw new IOException("A scan object did not have a table name"); throw new IOException("A scan object did not have a table name");
HTable table = null; TableName tableName = TableName.valueOf(tableNameBytes);
Table table = null;
RegionLocator regionLocator = null;
Connection conn = null;
try { try {
table = new HTable(context.getConfiguration(), TableName.valueOf(tableName)); conn = ConnectionFactory.createConnection(context.getConfiguration());
Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); table = conn.getTable(tableName);
regionLocator = conn.getRegionLocator(tableName);
regionLocator = (RegionLocator) table;
Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
if (keys == null || keys.getFirst() == null || if (keys == null || keys.getFirst() == null ||
keys.getFirst().length == 0) { keys.getFirst().length == 0) {
throw new IOException("Expecting at least one region for table : " throw new IOException("Expecting at least one region for table : "
+ Bytes.toString(tableName)); + tableName.getNameAsString());
} }
int count = 0; int count = 0;
byte[] startRow = scan.getStartRow(); byte[] startRow = scan.getStartRow();
byte[] stopRow = scan.getStopRow(); byte[] stopRow = scan.getStopRow();
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table); RegionSizeCalculator sizeCalculator = new RegionSizeCalculator((HTable) table);
for (int i = 0; i < keys.getFirst().length; i++) { for (int i = 0; i < keys.getFirst().length; i++) {
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue; continue;
} }
HRegionLocation hregionLocation = table.getRegionLocation(keys.getFirst()[i], false); HRegionLocation hregionLocation = regionLocator.getRegionLocation(
keys.getFirst()[i], false);
String regionHostname = hregionLocation.getHostname(); String regionHostname = hregionLocation.getHostname();
HRegionInfo regionInfo = hregionLocation.getRegionInfo(); HRegionInfo regionInfo = hregionLocation.getRegionInfo();
@ -182,6 +194,8 @@ public abstract class MultiTableInputFormatBase extends
} }
} finally { } finally {
if (null != table) table.close(); if (null != table) table.close();
if (null != regionLocator) regionLocator.close();
if (null != conn) conn.close();
} }
} }
return splits; return splits;

View File

@ -34,9 +34,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -52,8 +56,8 @@ import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
* A base for {@link TableInputFormat}s. Receives a {@link HTable}, an * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
* {@link Scan} instance that defines the input columns etc. Subclasses may use * an {@link Scan} instance that defines the input columns etc. Subclasses may use
* other TableRecordReader implementations. * other TableRecordReader implementations.
* <p> * <p>
* An example of a subclass: * An example of a subclass:
@ -61,10 +65,11 @@ import org.apache.hadoop.util.StringUtils;
* class ExampleTIF extends TableInputFormatBase implements JobConfigurable { * class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
* *
* public void configure(JobConf job) { * public void configure(JobConf job) {
* HTable exampleTable = new HTable(HBaseConfiguration.create(job), * Connection connection =
* Bytes.toBytes("exampleTable")); * ConnectionFactory.createConnection(HBaseConfiguration.create(job));
* TableName tableName = TableName.valueOf("exampleTable");
* // mandatory * // mandatory
* setHTable(exampleTable); * initializeTable(connection, tableName);
* Text[] inputColumns = new byte [][] { Bytes.toBytes("cf1:columnA"), * Text[] inputColumns = new byte [][] { Bytes.toBytes("cf1:columnA"),
* Bytes.toBytes("cf2") }; * Bytes.toBytes("cf2") };
* // mandatory * // mandatory
@ -86,10 +91,14 @@ extends InputFormat<ImmutableBytesWritable, Result> {
final Log LOG = LogFactory.getLog(TableInputFormatBase.class); final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
/** Holds the details for the internal scanner. */ /** Holds the details for the internal scanner.
*
* @see Scan */
private Scan scan = null; private Scan scan = null;
/** The table to scan. */ /** The {@link Table} to scan. */
private HTable table = null; private Table table;
/** The {@link RegionLocator} of the table. */
private RegionLocator regionLocator;
/** The reader scanning the table, can be a custom one. */ /** The reader scanning the table, can be a custom one. */
private TableRecordReader tableRecordReader = null; private TableRecordReader tableRecordReader = null;
@ -102,7 +111,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
private String nameServer = null; private String nameServer = null;
/** /**
* Builds a TableRecordReader. If no TableRecordReader was provided, uses * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
* the default. * the default.
* *
* @param split The split to work with. * @param split The split to work with.
@ -133,7 +142,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
sc.setStartRow(tSplit.getStartRow()); sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow()); sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc); trr.setScan(sc);
trr.setHTable(table); trr.setTable(table);
return trr; return trr;
} }
@ -156,12 +165,12 @@ extends InputFormat<ImmutableBytesWritable, Result> {
this.nameServer = this.nameServer =
context.getConfiguration().get("hbase.nameserver.address", null); context.getConfiguration().get("hbase.nameserver.address", null);
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table); RegionSizeCalculator sizeCalculator = new RegionSizeCalculator((HTable) table);
Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
if (keys == null || keys.getFirst() == null || if (keys == null || keys.getFirst() == null ||
keys.getFirst().length == 0) { keys.getFirst().length == 0) {
HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
if (null == regLoc) { if (null == regLoc) {
throw new IOException("Expecting at least one region."); throw new IOException("Expecting at least one region.");
} }
@ -178,7 +187,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue; continue;
} }
HRegionLocation location = table.getRegionLocation(keys.getFirst()[i], false); HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false);
// The below InetSocketAddress creation does a name resolution. // The below InetSocketAddress creation does a name resolution.
InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
if (isa.isUnresolved()) { if (isa.isUnresolved()) {
@ -268,18 +277,24 @@ extends InputFormat<ImmutableBytesWritable, Result> {
/** /**
* Allows subclasses to get the {@link HTable}. * Allows subclasses to get the {@link HTable}.
*
* @deprecated Use {@link #getTable()} and {@link #getRegionLocator()} instead.
*/ */
@Deprecated
protected HTable getHTable() { protected HTable getHTable() {
return this.table; return (HTable) this.table;
} }
/** /**
* Allows subclasses to set the {@link HTable}. * Allows subclasses to set the {@link HTable}.
* *
* @param table The table to get the data from. * @param table The {@link HTable} to get the data from.
* @deprecated Use {@link #initializeTable(Connection, TableName)} instead.
*/ */
@Deprecated
protected void setHTable(HTable table) { protected void setHTable(HTable table) {
this.table = table; this.table = table;
this.regionLocator = table;
} }
/** /**

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
@ -57,9 +56,18 @@ extends RecordReader<ImmutableBytesWritable, Result> {
* Sets the HBase table. * Sets the HBase table.
* *
* @param htable The {@link HTable} to scan. * @param htable The {@link HTable} to scan.
* @deprecated Use setTable() instead.
*/ */
@Deprecated
public void setHTable(Table htable) { public void setHTable(Table htable) {
this.recordReaderImpl.setHTable(htable); this.setTable(htable);
}
/**
* @param table the {@link Table} to scan.
*/
public void setTable(Table table) {
this.recordReaderImpl.setHTable(table);
} }
/** /**