HBASE-13985 Add configuration to skip validating HFile format when bulk loading (Victor Xu)
This commit is contained in:
parent
643ba90185
commit
d7178aa27c
|
@ -169,6 +169,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
*/
|
*/
|
||||||
private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
|
private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
|
||||||
final BulkHFileVisitor<TFamily> visitor) throws IOException {
|
final BulkHFileVisitor<TFamily> visitor) throws IOException {
|
||||||
|
visitBulkHFiles(fs, bulkDir, visitor, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Iterate over the bulkDir hfiles.
|
||||||
|
* Skip reference, HFileLink, files starting with "_".
|
||||||
|
* Check and skip non-valid hfiles by default, or skip this validation by setting
|
||||||
|
* 'hbase.loadincremental.validate.hfile' to false.
|
||||||
|
*/
|
||||||
|
private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
|
||||||
|
final BulkHFileVisitor<TFamily> visitor, final boolean validateHFile) throws IOException {
|
||||||
if (!fs.exists(bulkDir)) {
|
if (!fs.exists(bulkDir)) {
|
||||||
throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
|
throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
|
||||||
}
|
}
|
||||||
|
@ -209,16 +220,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate HFile Format
|
// Validate HFile Format if needed
|
||||||
try {
|
if (validateHFile) {
|
||||||
if (!HFile.isHFileFormat(fs, hfile)) {
|
try {
|
||||||
LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
|
if (!HFile.isHFileFormat(fs, hfile)) {
|
||||||
|
LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
LOG.warn("the file " + hfile + " was removed");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} catch (FileNotFoundException e) {
|
}
|
||||||
LOG.warn("the file " + hfile + " was removed");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
visitor.bulkHFile(family, hfileStatus);
|
visitor.bulkHFile(family, hfileStatus);
|
||||||
}
|
}
|
||||||
|
@ -252,8 +265,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
* Walk the given directory for all HFiles, and return a Queue
|
* Walk the given directory for all HFiles, and return a Queue
|
||||||
* containing all such files.
|
* containing all such files.
|
||||||
*/
|
*/
|
||||||
private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir)
|
private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
|
||||||
throws IOException {
|
final boolean validateHFile) throws IOException {
|
||||||
fs = hfofDir.getFileSystem(getConf());
|
fs = hfofDir.getFileSystem(getConf());
|
||||||
visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
|
visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -270,7 +283,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
ret.add(new LoadQueueItem(family, hfile.getPath()));
|
ret.add(new LoadQueueItem(family, hfile.getPath()));
|
||||||
}
|
}
|
||||||
});
|
}, validateHFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -323,7 +336,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
// happen in this thread
|
// happen in this thread
|
||||||
Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
|
Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
|
||||||
try {
|
try {
|
||||||
discoverLoadQueue(queue, hfofDir);
|
/*
|
||||||
|
* Checking hfile format is a time-consuming operation, we should have an option to skip
|
||||||
|
* this step when bulkloading millions of HFiles. See HBASE-13985.
|
||||||
|
*/
|
||||||
|
boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
|
||||||
|
if(!validateHFile) {
|
||||||
|
LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
|
||||||
|
"are not correct. If you fail to read data from your table after using this " +
|
||||||
|
"option, consider removing the files and bulkload again without this option. " +
|
||||||
|
"See HBASE-13985");
|
||||||
|
}
|
||||||
|
discoverLoadQueue(queue, hfofDir, validateHFile);
|
||||||
// check whether there is invalid family name in HFiles to be bulkloaded
|
// check whether there is invalid family name in HFiles to be bulkloaded
|
||||||
Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
|
Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
|
||||||
ArrayList<String> familyNames = new ArrayList<String>(families.size());
|
ArrayList<String> familyNames = new ArrayList<String>(families.size());
|
||||||
|
|
Loading…
Reference in New Issue