HBASE-13855 Race in multi threaded PartitionedMobCompactor causes NPE. (Jingcheng)
This commit is contained in:
parent
26893aa451
commit
faefb9073f
|
@ -211,14 +211,22 @@ public class PartitionedMobCompactor extends MobCompactor {
|
||||||
}
|
}
|
||||||
List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
|
List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
|
||||||
List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
|
List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
|
||||||
|
List<Path> paths = null;
|
||||||
|
try {
|
||||||
for (Path newDelPath : newDelPaths) {
|
for (Path newDelPath : newDelPaths) {
|
||||||
StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
|
StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
|
||||||
|
// pre-create reader of a del file to avoid race condition when opening the reader in each
|
||||||
|
// partition.
|
||||||
|
sf.createReader();
|
||||||
newDelFiles.add(sf);
|
newDelFiles.add(sf);
|
||||||
}
|
}
|
||||||
LOG.info("After merging, there are " + newDelFiles.size() + " del files");
|
LOG.info("After merging, there are " + newDelFiles.size() + " del files");
|
||||||
// compact the mob files by partitions.
|
// compact the mob files by partitions.
|
||||||
List<Path> paths = compactMobFiles(request, newDelFiles);
|
paths = compactMobFiles(request, newDelFiles);
|
||||||
LOG.info("After compaction, there are " + paths.size() + " mob files");
|
LOG.info("After compaction, there are " + paths.size() + " mob files");
|
||||||
|
} finally {
|
||||||
|
closeStoreFileReaders(newDelFiles);
|
||||||
|
}
|
||||||
// archive the del files if all the mob files are selected.
|
// archive the del files if all the mob files are selected.
|
||||||
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
|
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
|
||||||
LOG.info("After a mob compaction with all files selected, archiving the del files "
|
LOG.info("After a mob compaction with all files selected, archiving the del files "
|
||||||
|
@ -336,6 +344,20 @@ public class PartitionedMobCompactor extends MobCompactor {
|
||||||
return newFiles;
|
return newFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Closes the readers of store files.
|
||||||
|
* @param storeFiles The store files to be closed.
|
||||||
|
*/
|
||||||
|
private void closeStoreFileReaders(List<StoreFile> storeFiles) {
|
||||||
|
for (StoreFile storeFile : storeFiles) {
|
||||||
|
try {
|
||||||
|
storeFile.closeReader(true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to close the reader on store file " + storeFile.getPath(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compacts a partition of selected small mob files and all the del files in a batch.
|
* Compacts a partition of selected small mob files and all the del files in a batch.
|
||||||
* @param request The compaction request.
|
* @param request The compaction request.
|
||||||
|
@ -415,6 +437,7 @@ public class PartitionedMobCompactor extends MobCompactor {
|
||||||
}
|
}
|
||||||
// archive the old mob files, do not archive the del files.
|
// archive the old mob files, do not archive the del files.
|
||||||
try {
|
try {
|
||||||
|
closeStoreFileReaders(mobFilesToCompact);
|
||||||
MobUtils
|
MobUtils
|
||||||
.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
|
.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
Loading…
Reference in New Issue