From 0a500e5d305b0c75a6a357a5ff7a9210a615a007 Mon Sep 17 00:00:00 2001 From: Enis Soztutar Date: Fri, 27 Mar 2015 14:49:58 -0700 Subject: [PATCH] HBASE-13328 LoadIncrementalHFile.doBulkLoad(Path,HTable) should handle managed connections --- .../hbase/mapreduce/LoadIncrementalHFiles.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index d96c9e409cb..1a257b1cb02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; @@ -245,6 +246,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { this.hfilePath = hfilePath; } + @Override public String toString() { return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString(); } @@ -288,7 +290,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public void doBulkLoad(Path hfofDir, final HTable table) throws TableNotFoundException, IOException { - doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator()); + Admin admin = null; + try { + try { + admin = table.getConnection().getAdmin(); + } catch (NeedUnmanagedConnectionException ex) { + admin = new HBaseAdmin(table.getConfiguration()); + } + doBulkLoad(hfofDir, admin, table, table.getRegionLocator()); + } finally { + admin.close(); + } } /** @@ -436,6 +448,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { final Collection lqis = e.getValue(); final Callable> call = new Callable>() { + @Override public List call() throws Exception { List toRetry = tryAtomicRegionLoad(conn, table.getName(), first, lqis); @@ -512,6 +525,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { final LoadQueueItem item = queue.remove(); final Callable> call = new Callable>() { + @Override public List call() throws Exception { List splits = groupOrSplit(regionGroups, item, table, startEndKeys); return splits;