HBASE-13328 LoadIncrementalHFile.doBulkLoad(Path,HTable) should handle managed connections

This commit is contained in:
Enis Soztutar 2015-03-27 14:49:58 -07:00
parent 98b1e72d1c
commit 0a500e5d30
1 changed files with 15 additions and 1 deletions

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
@ -245,6 +246,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
this.hfilePath = hfilePath; this.hfilePath = hfilePath;
} }
@Override
public String toString() { public String toString() {
return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString(); return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
} }
@ -288,7 +290,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
public void doBulkLoad(Path hfofDir, final HTable table) public void doBulkLoad(Path hfofDir, final HTable table)
throws TableNotFoundException, IOException throws TableNotFoundException, IOException
{ {
doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator()); Admin admin = null;
try {
try {
admin = table.getConnection().getAdmin();
} catch (NeedUnmanagedConnectionException ex) {
admin = new HBaseAdmin(table.getConfiguration());
}
doBulkLoad(hfofDir, admin, table, table.getRegionLocator());
} finally {
admin.close();
}
} }
/** /**
@ -436,6 +448,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
final Collection<LoadQueueItem> lqis = e.getValue(); final Collection<LoadQueueItem> lqis = e.getValue();
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() { final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
@Override
public List<LoadQueueItem> call() throws Exception { public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> toRetry = List<LoadQueueItem> toRetry =
tryAtomicRegionLoad(conn, table.getName(), first, lqis); tryAtomicRegionLoad(conn, table.getName(), first, lqis);
@ -512,6 +525,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
final LoadQueueItem item = queue.remove(); final LoadQueueItem item = queue.remove();
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() { final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
@Override
public List<LoadQueueItem> call() throws Exception { public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys); List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
return splits; return splits;