HBASE-17092 Both LoadIncrementalHFiles#doBulkLoad() methods should set return value
This commit is contained in:
parent
dcf03b32f4
commit
c7bdb3017f
|
@ -134,6 +134,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
private int nrThreads;
|
private int nrThreads;
|
||||||
private RpcControllerFactory rpcControllerFactory;
|
private RpcControllerFactory rpcControllerFactory;
|
||||||
|
|
||||||
|
private Map<LoadQueueItem, ByteBuffer> retValue = null;
|
||||||
|
|
||||||
public LoadIncrementalHFiles(Configuration conf) throws Exception {
|
public LoadIncrementalHFiles(Configuration conf) throws Exception {
|
||||||
super(conf);
|
super(conf);
|
||||||
this.rpcControllerFactory = new RpcControllerFactory(conf);
|
this.rpcControllerFactory = new RpcControllerFactory(conf);
|
||||||
|
@ -362,10 +364,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
* @param regionLocator region locator
|
* @param regionLocator region locator
|
||||||
* @param silence true to ignore unmatched column families
|
* @param silence true to ignore unmatched column families
|
||||||
* @param copyFile always copy hfiles if true
|
* @param copyFile always copy hfiles if true
|
||||||
* @return Map of LoadQueueItem to region
|
|
||||||
* @throws TableNotFoundException if table does not yet exist
|
* @throws TableNotFoundException if table does not yet exist
|
||||||
*/
|
*/
|
||||||
public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
|
public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
|
||||||
Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
|
Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
|
||||||
throws TableNotFoundException, IOException {
|
throws TableNotFoundException, IOException {
|
||||||
if (!admin.isTableAvailable(regionLocator.getName())) {
|
if (!admin.isTableAvailable(regionLocator.getName())) {
|
||||||
|
@ -380,7 +381,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
prepareHFileQueue(map, table, queue, silence);
|
prepareHFileQueue(map, table, queue, silence);
|
||||||
if (queue.isEmpty()) {
|
if (queue.isEmpty()) {
|
||||||
LOG.warn("Bulk load operation did not get any files to load");
|
LOG.warn("Bulk load operation did not get any files to load");
|
||||||
return null;
|
return;
|
||||||
}
|
}
|
||||||
pool = createExecutorService();
|
pool = createExecutorService();
|
||||||
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
|
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
|
||||||
|
@ -390,7 +391,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
|
retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
|
||||||
} finally {
|
} finally {
|
||||||
cleanup(admin, queue, pool, secureClient);
|
cleanup(admin, queue, pool, secureClient);
|
||||||
}
|
}
|
||||||
|
@ -443,7 +444,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
pool = createExecutorService();
|
pool = createExecutorService();
|
||||||
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
|
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
|
||||||
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
|
retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
|
||||||
} finally {
|
} finally {
|
||||||
cleanup(admin, queue, pool, secureClient);
|
cleanup(admin, queue, pool, secureClient);
|
||||||
}
|
}
|
||||||
|
@ -1232,10 +1233,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
|
boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
|
||||||
if (dirPath != null) {
|
if (dirPath != null) {
|
||||||
doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
|
doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
|
||||||
return null;
|
|
||||||
} else {
|
} else {
|
||||||
return doBulkLoad(map, admin, table, locator, silence, copyFiles);
|
doBulkLoad(map, admin, table, locator, silence, copyFiles);
|
||||||
}
|
}
|
||||||
|
return retValue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue