HBASE-13253 LoadIncrementalHFiles unify hfiles discovery
parent 99ec366147
commit f9a17edc25
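The change below removes the duplicated family/hfile directory walks in discoverLoadQueue() and createTable() and funnels both through a new visitBulkHFiles() helper driven by a BulkHFileVisitor callback, so the skipping of references, HFileLinks, "_"-prefixed files, and non-hfiles is implemented once. A minimal usage sketch of the new API (BulkHFileVisitor and visitBulkHFiles are introduced in the diff below; the surrounding fs/bulkDir variables and the java.util/HBase Pair imports are assumptions):

    // Collect (family, hfile) pairs; the visitor is invoked only for valid hfiles.
    final Deque<Pair<byte[], Path>> found = new ArrayDeque<Pair<byte[], Path>>();
    visitBulkHFiles(fs, bulkDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName; // called once per family directory
      }
      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfileStatus) {
        // called once per hfile that survives the reference/link/format checks
        found.add(new Pair<byte[], Path>(family, hfileStatus.getPath()));
      }
    });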
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -160,6 +159,75 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         + "\n");
   }
 
+  private static interface BulkHFileVisitor<TFamily> {
+    TFamily bulkFamily(final byte[] familyName)
+      throws IOException;
+    void bulkHFile(final TFamily family, final FileStatus hfileStatus)
+      throws IOException;
+  }
+
+  /**
+   * Iterate over the bulkDir hfiles.
+   * Skip reference, HFileLink, files starting with "_" and non-valid hfiles.
+   */
+  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
+    final BulkHFileVisitor<TFamily> visitor) throws IOException {
+    if (!fs.exists(bulkDir)) {
+      throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
+    }
+
+    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
+    if (familyDirStatuses == null) {
+      throw new FileNotFoundException("No families found in " + bulkDir);
+    }
+
+    for (FileStatus familyStat : familyDirStatuses) {
+      if (!familyStat.isDirectory()) {
+        LOG.warn("Skipping non-directory " + familyStat.getPath());
+        continue;
+      }
+      Path familyDir = familyStat.getPath();
+      byte[] familyName = familyDir.getName().getBytes();
+      TFamily family = visitor.bulkFamily(familyName);
+
+      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
+      for (FileStatus hfileStatus : hfileStatuses) {
+        if (!fs.isFile(hfileStatus.getPath())) {
+          LOG.warn("Skipping non-file " + hfileStatus);
+          continue;
+        }
+
+        Path hfile = hfileStatus.getPath();
+        // Skip "_", reference, HFileLink
+        String fileName = hfile.getName();
+        if (fileName.startsWith("_")) {
+          continue;
+        }
+        if (StoreFileInfo.isReference(fileName)) {
+          LOG.warn("Skipping reference " + fileName);
+          continue;
+        }
+        if (HFileLink.isHFileLink(fileName)) {
+          LOG.warn("Skipping HFileLink " + fileName);
+          continue;
+        }
+
+        // Validate HFile Format
+        try {
+          if (!HFile.isHFileFormat(fs, hfile)) {
+            LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
+            continue;
+          }
+        } catch (FileNotFoundException e) {
+          LOG.warn("the file " + hfile + " was removed");
+          continue;
+        }
+
+        visitor.bulkHFile(family, hfileStatus);
+      }
+    }
+  }
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used
    * in this class in order to support the case where a region has
@@ -186,54 +254,25 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
-  private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
+  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir)
   throws IOException {
     fs = hfofDir.getFileSystem(getConf());
-
-    if (!fs.exists(hfofDir)) {
-      throw new FileNotFoundException("HFileOutputFormat dir " +
-          hfofDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + hfofDir);
-    }
-
-    for (FileStatus stat : familyDirStatuses) {
-      if (!stat.isDirectory()) {
-        LOG.warn("Skipping non-directory " + stat.getPath());
-        continue;
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
+      @Override
+      public byte[] bulkFamily(final byte[] familyName) {
+        return familyName;
       }
-      Path familyDir = stat.getPath();
-      byte[] family = familyDir.getName().getBytes();
-      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
-      for (FileStatus hfileStatus : hfileStatuses) {
-        if (!hfileStatus.isFile()) {
-          LOG.warn("Skipping non-file " + hfileStatus);
-          continue;
-        }
-        long length = hfileStatus.getLen();
-        Path hfile = hfileStatus.getPath();
-        // Skip "_", reference, HFileLink
-        String fileName = hfile.getName();
-        if (fileName.startsWith("_")) continue;
-        if (StoreFileInfo.isReference(fileName)) {
-          LOG.warn("Skipping reference " + fileName);
-          continue;
-        }
-        if (HFileLink.isHFileLink(fileName)) {
-          LOG.warn("Skipping HFileLink " + fileName);
-          continue;
-        }
-        if(length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
+      @Override
+      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+        long length = hfile.getLen();
+        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
             HConstants.DEFAULT_MAX_FILE_SIZE)) {
-          LOG.warn("Trying to bulk load hfile " + hfofDir.toString() + " with size: " +
+          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
               length + " bytes can be problematic as it may lead to oversplitting.");
         }
-        ret.add(new LoadQueueItem(family, hfile));
+        ret.add(new LoadQueueItem(family, hfile.getPath()));
       }
-    }
+    });
   }
 
   /**
@@ -297,20 +336,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       LoadQueueItem lqi = queueIter.next();
       String familyNameInHFile = Bytes.toString(lqi.family);
       if (!familyNames.contains(familyNameInHFile)) {
-        boolean isValid = false;
-        try {
-          isValid = HFile.isHFileFormat(lqi.hfilePath.getFileSystem(getConf()), lqi.hfilePath);
-          if (!isValid) {
-            LOG.warn("the file " + lqi + " doesn't seems to be an hfile. skipping");
-          }
-        } catch (FileNotFoundException e) {
-          LOG.warn("the file " + lqi + " was removed");
-        }
-        if (isValid) {
-          unmatchedFamilies.add(familyNameInHFile);
-        } else {
-          queueIter.remove();
-        }
+        unmatchedFamilies.add(familyNameInHFile);
       }
     }
     if (unmatchedFamilies.size() > 0) {
@@ -864,46 +890,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * More modifications necessary if we want to avoid doing it.
    */
   private void createTable(TableName tableName, String dirPath) throws Exception {
-    Path hfofDir = new Path(dirPath);
-    FileSystem fs = hfofDir.getFileSystem(getConf());
-
-    if (!fs.exists(hfofDir)) {
-      throw new FileNotFoundException("HFileOutputFormat dir " +
-          hfofDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + hfofDir);
-    }
-
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd;
+    final Path hfofDir = new Path(dirPath);
+    final FileSystem fs = hfofDir.getFileSystem(getConf());
 
     // Add column families
     // Build a set of keys
-    byte[][] keys;
-    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
-    for (FileStatus stat : familyDirStatuses) {
-      if (!stat.isDirectory()) {
-        LOG.warn("Skipping non-directory " + stat.getPath());
-        continue;
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
+      @Override
+      public HColumnDescriptor bulkFamily(final byte[] familyName) {
+        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
+        htd.addFamily(hcd);
+        return hcd;
       }
-      Path familyDir = stat.getPath();
-      byte[] family = familyDir.getName().getBytes();
-
-      hcd = new HColumnDescriptor(family);
-      htd.addFamily(hcd);
-
-      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
-      for (Path hfile : hfiles) {
-        String fileName = hfile.getName();
-        if (fileName.startsWith("_") || StoreFileInfo.isReference(fileName)
-            || HFileLink.isHFileLink(fileName)) continue;
+      @Override
+      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
+          throws IOException {
+        Path hfile = hfileStatus.getPath();
         HFile.Reader reader = HFile.createReader(fs, hfile,
             new CacheConfig(getConf()), getConf());
-        final byte[] first, last;
         try {
           if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
             hcd.setCompressionType(reader.getFileContext().getCompression());
@@ -911,8 +917,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
               " for family " + hcd.toString());
         }
         reader.loadFileInfo();
-        first = reader.getFirstRowKey();
-        last = reader.getLastRowKey();
+        byte[] first = reader.getFirstRowKey();
+        byte[] last = reader.getLastRowKey();
 
         LOG.info("Trying to figure out region boundaries hfile=" + hfile +
           " first=" + Bytes.toStringBinary(first) +
@@ -924,13 +930,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
           value = map.containsKey(last)? map.get(last):0;
           map.put(last, value-1);
-      } finally {
+        } finally {
           reader.close();
         }
       }
-    }
+    });
 
-    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
     this.hbAdmin.createTable(htd,keys);
 
     LOG.info("Table "+ tableName +" is available!!");
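createTable() above fills a TreeMap with a +1 marker at each hfile's first row key and a -1 marker at its last, then calls LoadIncrementalHFiles.inferBoundaries(map) to turn those markers into region split points. inferBoundaries itself is untouched by this commit and not shown; the following is a sketch of that counting logic, consistent with the boundary test at the bottom of this diff (java.util imports and a Bytes.BYTES_COMPARATOR-ordered map are assumed):

    // Sketch, not part of this diff: walk the keys in sorted order keeping a
    // running sum of the +1/-1 markers. Whenever the sum is zero a new covered
    // key range begins; the start of every range except the first is a boundary.
    public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
      ArrayList<byte[]> keys = new ArrayList<byte[]>();
      int runningValue = 0;
      boolean firstBoundary = true;
      for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
        if (runningValue == 0) {
          if (!firstBoundary) {
            keys.add(item.getKey());
          }
          firstBoundary = false;
        }
        runningValue += item.getValue();
      }
      return keys.toArray(new byte[0][0]);
    }

For the test's key ranges (coverage a..k, m..q, r..t, u..x) this yields (m, r, u), matching the expected compare array below.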
@@ -131,7 +131,7 @@ public class TestLoadIncrementalHFiles {
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
     });
   }
-
+
   /**
    * Test loading into a column family that has a ROWCOL bloom filter.
    */
@@ -271,7 +271,7 @@ public class TestLoadIncrementalHFiles {
           file.getPath().getName() != "DONOTERASE");
       }
     }
-
+
     util.deleteTable(tableName);
   }
 
@@ -307,20 +307,30 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  @Test(timeout = 60000)
+  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
+  }
+
+  @Test(timeout = 60000)
+  public void testNonHfileFolder() throws Exception {
+    testNonHfileFolder("testNonHfileFolder", false);
+  }
+
   /**
    * Write a random data file and a non-file in a dir with a valid family name
    * but not part of the table families. we should we able to bulkload without
    * getting the unmatched family exception. HBASE-13037/HBASE-13227
    */
-  @Test(timeout = 60000)
-  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
-    Path dir = util.getDataTestDirOnTestFS("testNonHfileFolderWithUnmatchedFamilyName");
+  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(tableName);
     FileSystem fs = util.getTestFileSystem();
     dir = dir.makeQualified(fs);
 
     Path familyDir = new Path(dir, Bytes.toString(FAMILY));
     HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
         FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
 
     final String NON_FAMILY_FOLDER = "_logs";
     Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
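The hunk above calls createRandomDataFile(fs, path, size) to drop a non-hfile into the family directory, presumably added in an elided hunk of this commit. A minimal sketch consistent with the call site (the name and signature come from the call above; the body is an assumption):

    // Sketch: write `size` bytes of filler data to `path`, producing a file
    // that is not a valid hfile, so the loader's format check must skip it.
    private static void createRandomDataFile(FileSystem fs, Path path, long size)
        throws IOException {
      FSDataOutputStream stream = fs.create(path);
      try {
        byte[] data = new byte[1024];
        for (int i = 0; i < data.length; ++i) {
          data[i] = (byte) (i & 0xff);
        }
        while (size >= data.length) {
          stream.write(data, 0, data.length);
          size -= data.length;
        }
        if (size > 0) {
          stream.write(data, 0, (int) size);
        }
      } finally {
        stream.close();
      }
    }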
@@ -330,10 +340,13 @@ public class TestLoadIncrementalHFiles {
 
     Table table = null;
     try {
-      final String TABLE_NAME = "mytable_testNonHfileFolderWithUnmatchedFamilyName";
-      table = util.createTable(TableName.valueOf(TABLE_NAME), FAMILY);
+      if (preCreateTable) {
+        table = util.createTable(TableName.valueOf(tableName), FAMILY);
+      } else {
+        table = util.getConnection().getTable(TableName.valueOf(tableName));
+      }
 
-      final String[] args = {dir.toString(), TABLE_NAME};
+      final String[] args = {dir.toString(), tableName};
       new LoadIncrementalHFiles(util.getConfiguration()).run(args);
       assertEquals(500, util.countRows(table));
     } finally {
@@ -421,8 +434,8 @@ public class TestLoadIncrementalHFiles {
    *
    * Should be inferred as:
    * a-----------------k   m-------------q   r--------------t  u---------x
-   *
-   * The output should be (m,r,u)
+   *
+   * The output should be (m,r,u)
    */
 
     String first;
@@ -430,7 +443,7 @@ public class TestLoadIncrementalHFiles {
 
     first = "a"; last = "e";
     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
+
     first = "r"; last = "s";
     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
 
@@ -451,14 +464,14 @@ public class TestLoadIncrementalHFiles {
 
     first = "s"; last = "t";
     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
+
     first = "u"; last = "w";
     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
 
     byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
     byte[][] compare = new byte[3][];
     compare[0] = "m".getBytes();
-    compare[1] = "r".getBytes();
+    compare[1] = "r".getBytes();
     compare[2] = "u".getBytes();
 
     assertEquals(keysArray.length, 3);