HBASE-16821 Enhance LoadIncrementalHFiles API to convey missing hfiles if any

This commit is contained in:
tedyu 2016-10-14 09:07:38 -07:00
parent 07086036a5
commit 39d43ab779
3 changed files with 70 additions and 40 deletions

View File

@ -362,9 +362,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param regionLocator region locator
* @param silence true to ignore unmatched column families
* @param copyFile always copy hfiles if true
* @return List of filenames which were not found
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
public List<String> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
RegionLocator regionLocator, boolean silence, boolean copyFile)
throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
@ -379,7 +380,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
prepareHFileQueue(map, table, queue, silence);
if (queue.isEmpty()) {
LOG.warn("Bulk load operation did not get any files to load");
return;
return null;
}
pool = createExecutorService();
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
@ -389,7 +390,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
break;
}
}
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
} finally {
cleanup(admin, queue, pool, secureClient);
}
@ -448,7 +449,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
}
void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
List<String> performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
Deque<LoadQueueItem> queue, ExecutorService pool,
SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
int count = 0;
@ -463,6 +464,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// fs is the source filesystem
fsDelegationToken.acquireDelegationToken(fs);
bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = null;
// Assumes that region splits can happen while this occurs.
while (!queue.isEmpty()) {
@ -482,8 +484,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
count++;
// Using ByteBuffer for byte[] equality semantics
Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
pool, queue, startEndKeys);
pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
// Error is logged inside checkHFilesCountPerRegionPerFamily.
@ -502,6 +504,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw new RuntimeException("Bulk load aborted with some files not yet loaded."
+ "Please check log for more details.");
}
if (pair == null) return null;
return pair.getSecond();
}
/**
@ -625,7 +629,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try {
pool = createExecutorService();
Multimap<ByteBuffer, LoadQueueItem> regionGroups =
groupOrSplitPhase(table, pool, queue, startEndKeys);
groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile);
} finally {
if (pool != null) {
@ -709,25 +713,34 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
/**
* @return A map that groups LQI by likely bulk load region targets.
* @param table the table to load into
* @param pool the ExecutorService
* @param queue the queue for LoadQueueItem
* @param startEndKeys start and end keys
* @return A map that groups LQI by likely bulk load region targets and List of missing hfiles.
*/
private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
ExecutorService pool, Deque<LoadQueueItem> queue,
private Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> groupOrSplitPhase(
final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
// <region start key, LQI> need synchronized only within this scope of this
// phase because of the puts that happen in futures.
Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
List<String> missingHFiles = new ArrayList<>();
Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = new Pair<>(regionGroups,
missingHFiles);
// drain LQIs and figure out bulk load groups
Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<>();
Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
while (!queue.isEmpty()) {
final LoadQueueItem item = queue.remove();
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
final Callable<Pair<List<LoadQueueItem>, String>> call =
new Callable<Pair<List<LoadQueueItem>, String>>() {
@Override
public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
public Pair<List<LoadQueueItem>, String> call() throws Exception {
Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table,
startEndKeys);
return splits;
}
};
@ -735,11 +748,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
// get all the results. All grouping and splitting must finish before
// we can attempt the atomic loads.
for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
try {
List<LoadQueueItem> splits = lqis.get();
Pair<List<LoadQueueItem>, String> splits = lqis.get();
if (splits != null) {
queue.addAll(splits);
if (splits.getFirst() != null) {
queue.addAll(splits.getFirst());
} else {
missingHFiles.add(splits.getSecond());
}
}
} catch (ExecutionException e1) {
Throwable t = e1.getCause();
@ -754,7 +771,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
}
}
return regionGroups;
return pair;
}
// unique file name for the table
@ -817,17 +834,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* protected for testing
* @throws IOException if an IO failure is encountered
*/
protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table table,
final Pair<byte[][], byte[][]> startEndKeys)
throws IOException {
protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
final Path hfilePath = item.hfilePath;
// fs is the source filesystem
if (fs == null) {
fs = hfilePath.getFileSystem(getConf());
}
HFile.Reader hfr = HFile.createReader(fs, hfilePath,
new CacheConfig(getConf()), getConf());
HFile.Reader hfr = null;
try {
hfr = HFile.createReader(fs, hfilePath,
new CacheConfig(getConf()), getConf());
} catch (FileNotFoundException fnfe) {
LOG.debug("encountered", fnfe);
return new Pair<>(null, hfilePath.getName());
}
final byte[] first, last;
try {
hfr.loadFileInfo();
@ -890,7 +912,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
List<LoadQueueItem> lqis = splitStoreFile(item, table,
startEndKeys.getFirst()[indexForCallable],
startEndKeys.getSecond()[indexForCallable]);
return lqis;
return new Pair<>(lqis, null);
}
// group regions.
@ -1171,7 +1193,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
LOG.info("Table "+ tableName +" is available!!");
}
public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
public List<String> run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
initialize();
try (Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin()) {
@ -1197,13 +1219,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
if (dirPath != null) {
doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
return null;
} else {
doBulkLoad(map, admin, table, locator, silence, copyFiles);
return doBulkLoad(map, admin, table, locator, silence, copyFiles);
}
}
}
return 0;
}
@Override
@ -1215,7 +1236,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
String dirPath = args[0];
TableName tableName = TableName.valueOf(args[1]);
return run(dirPath, null, tableName);
List<String> missingHFiles = run(dirPath, null, tableName);
if (missingHFiles == null) return 0;
return -1;
}
public static void main(String[] args) throws Exception {

View File

@ -323,12 +323,14 @@ public class TestLoadIncrementalHFiles {
map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
map.put(FAMILY, list);
}
Path last = null;
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
Path path = new Path(familyDir, "hfile_" + hfileIdx++);
HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
if (useMap) {
last = path;
list.add(path);
}
}
@ -346,7 +348,10 @@ public class TestLoadIncrementalHFiles {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String [] args= {dir.toString(), tableName.toString()};
if (useMap) {
loader.run(null, map, tableName);
fs.delete(last);
List<String> missingHFiles = loader.run(null, map, tableName);
expectedRows -= 1000;
assertTrue(missingHFiles.contains(last.getName()));
} else {
loader.run(args);
}

View File

@ -404,13 +404,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) {
@Override
protected List<LoadQueueItem> groupOrSplit(
protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
if (lqis != null) {
countedLqis.addAndGet(lqis.size());
Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
startEndKeys);
if (lqis != null && lqis.getFirst() != null) {
countedLqis.addAndGet(lqis.getFirst().size());
}
return lqis;
}
@ -479,7 +480,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
int i = 0;
@Override
protected List<LoadQueueItem> groupOrSplit(
protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table table,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
@ -521,13 +522,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
@Override
protected List<LoadQueueItem> groupOrSplit(
protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
if (lqis != null) {
countedLqis.addAndGet(lqis.size());
Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
startEndKeys);
if (lqis != null && lqis.getFirst() != null) {
countedLqis.addAndGet(lqis.getFirst().size());
}
return lqis;
}