HBASE-13037 LoadIncrementalHFile should try to verify the content of unmatched families
parent 3e10e6e1a6
commit a98898cf41
@@ -532,6 +532,47 @@ public class HFile {
     return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
   }
 
+  /**
+   * Returns true if the specified file has a valid HFile Trailer.
+   * @param fs filesystem
+   * @param path Path to file to verify
+   * @return true if the file has a valid HFile Trailer, otherwise false
+   * @throws IOException if failed to read from the underlying stream
+   */
+  public static boolean isHFileFormat(final FileSystem fs, final Path path) throws IOException {
+    return isHFileFormat(fs, fs.getFileStatus(path));
+  }
+
+  /**
+   * Returns true if the specified file has a valid HFile Trailer.
+   * @param fs filesystem
+   * @param fileStatus the file to verify
+   * @return true if the file has a valid HFile Trailer, otherwise false
+   * @throws IOException if failed to read from the underlying stream
+   */
+  public static boolean isHFileFormat(final FileSystem fs, final FileStatus fileStatus)
+      throws IOException {
+    final Path path = fileStatus.getPath();
+    final long size = fileStatus.getLen();
+    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
+    try {
+      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
+      assert !isHBaseChecksum; // Initially we must read with FS checksum.
+      FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
+      return true;
+    } catch (IllegalArgumentException e) {
+      return false;
+    } catch (IOException e) {
+      throw e;
+    } finally {
+      try {
+        fsdis.close();
+      } catch (Throwable t) {
+        LOG.warn("Error closing fsdis FSDataInputStreamWrapper: " + path, t);
+      }
+    }
+  }
+
   /**
    * Metadata for this file. Conjured by the writer. Read in by the reader.
    */
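The helper added above only parses the fixed trailer at the end of the file, so it can cheaply reject data that is not an HFile: a trailer that does not parse (IllegalArgumentException) yields false, while genuine I/O errors still propagate. A minimal usage sketch follows; the wrapper class, command-line handling, and printed output are illustrative and not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class HFileFormatCheck {
  public static void main(String[] args) throws Exception {
    // Illustrative wiring: resolve the filesystem for the path passed on the command line.
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);
    FileSystem fs = path.getFileSystem(conf);

    // Returns false (rather than throwing) when the trailer is not a valid HFile trailer,
    // e.g. for a random data file dropped into the bulk-load directory.
    System.out.println(path + " is HFile: " + HFile.isHFileFormat(fs, path));
  }
}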
@@ -89,6 +89,7 @@ import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -246,7 +247,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   {
     doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator());
   }
 
   /**
    * Perform a bulk load of the given directory into the given
    * pre-existing table. This method is not threadsafe.
@@ -282,15 +283,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     discoverLoadQueue(queue, hfofDir);
     // check whether there is invalid family name in HFiles to be bulkloaded
     Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
-    ArrayList<String> familyNames = new ArrayList<String>();
+    ArrayList<String> familyNames = new ArrayList<String>(families.size());
     for (HColumnDescriptor family : families) {
       familyNames.add(family.getNameAsString());
     }
     ArrayList<String> unmatchedFamilies = new ArrayList<String>();
-    for (LoadQueueItem lqi : queue) {
+    Iterator<LoadQueueItem> queueIter = queue.iterator();
+    while (queueIter.hasNext()) {
+      LoadQueueItem lqi = queueIter.next();
       String familyNameInHFile = Bytes.toString(lqi.family);
       if (!familyNames.contains(familyNameInHFile)) {
-        unmatchedFamilies.add(familyNameInHFile);
+        if (HFile.isHFileFormat(lqi.hfilePath.getFileSystem(getConf()), lqi.hfilePath)) {
+          unmatchedFamilies.add(familyNameInHFile);
+        } else {
+          LOG.warn("the file " + lqi + " doesn't seems to be an hfile. skipping");
+          queueIter.remove();
+        }
       }
     }
     if (unmatchedFamilies.size() > 0) {
@@ -729,7 +737,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throw e;
     }
   }
 
   private boolean isSecureBulkLoadEndpointAvailable() {
     String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
     return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
@@ -945,7 +953,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         HTable table = (HTable) connection.getTable(tableName);) {
       doBulkLoad(hfofDir, table);
     }
 
     return 0;
   }
 
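A note on the family-check hunk above: the enhanced for loop over the queue becomes an explicit Iterator because entries are now removed while iterating, and removing through the backing collection inside a for-each would typically fail fast with a ConcurrentModificationException. The following self-contained sketch shows the same pattern with placeholder strings instead of the patch's LoadQueueItem type:

import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;

public class IteratorRemovalSketch {
  public static void main(String[] args) {
    // Placeholder queue standing in for Deque<LoadQueueItem>.
    Deque<String> queue = new LinkedList<>();
    queue.add("family1/hfile_0");
    queue.add("_logs/012356789"); // not an HFile; should be dropped, not loaded

    Iterator<String> it = queue.iterator();
    while (it.hasNext()) {
      String item = it.next();
      // Stand-in for the HFile.isHFileFormat(...) check in the patch.
      if (!item.startsWith("family")) {
        it.remove(); // safe removal during iteration; a for-each could not do this
      }
    }
    System.out.println(queue); // prints [family1/hfile_0]
  }
}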
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -306,6 +307,61 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  /**
+   * Write a random data file in a dir with a valid family name but not part of the table families
+   * we should we able to bulkload without getting the unmatched family exception. HBASE-13037
+   */
+  @Test(timeout = 60000)
+  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("testNonHfileFolderWithUnmatchedFamilyName");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
+        FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+
+    final String NON_FAMILY_FOLDER = "_logs";
+    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
+    fs.mkdirs(nonFamilyDir);
+    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
+
+    Table table = null;
+    try {
+      final String TABLE_NAME = "mytable_testNonHfileFolderWithUnmatchedFamilyName";
+      table = util.createTable(TableName.valueOf(TABLE_NAME), FAMILY);
+
+      final String[] args = {dir.toString(), TABLE_NAME};
+      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
+      assertEquals(500, util.countRows(table));
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+      fs.delete(dir, true);
+    }
+  }
+
+  private static void createRandomDataFile(FileSystem fs, Path path, int size)
+      throws IOException {
+    FSDataOutputStream stream = fs.create(path);
+    try {
+      byte[] data = new byte[1024];
+      for (int i = 0; i < data.length; ++i) {
+        data[i] = (byte)(i & 0xff);
+      }
+      while (size >= data.length) {
+        stream.write(data, 0, data.length);
+        size -= data.length;
+      }
+      if (size > 0) {
+        stream.write(data, 0, size);
+      }
+    } finally {
+      stream.close();
+    }
+  }
+
   @Test(timeout = 60000)
   public void testSplitStoreFile() throws IOException {
     Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
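For completeness, here is a rough sketch of driving the loader programmatically against an existing table, using the four-argument doBulkLoad overload that appears in the diff above. The table name, HFile output directory, and resource wiring are assumptions for illustration only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("mytable"); // illustrative table name
    Path hfofDir = new Path("/bulk/output");            // illustrative HFile output directory

    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin();
         Table table = connection.getTable(tableName);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
      // With this patch, non-HFile files under directories that do not match a table family
      // (e.g. _logs) are skipped with a warning instead of failing the whole load.
      new LoadIncrementalHFiles(conf).doBulkLoad(hfofDir, admin, table, locator);
    }
  }
}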