HBASE-5472 LoadIncrementalHFiles loops forever if the target table misses a CF -- RE-RE-APPLY
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1471248 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
df95e96647
commit
d11a58116c
|
@ -187,7 +187,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
/**
|
/**
|
||||||
* Perform a bulk load of the given directory into the given
|
* Perform a bulk load of the given directory into the given
|
||||||
* pre-existing table. This method is not threadsafe.
|
* pre-existing table. This method is not threadsafe.
|
||||||
*
|
*
|
||||||
* @param hfofDir the directory that was provided as the output path
|
* @param hfofDir the directory that was provided as the output path
|
||||||
* of a job using HFileOutputFormat
|
* of a job using HFileOutputFormat
|
||||||
* @param table the table to load into
|
* @param table the table to load into
|
||||||
|
@ -220,6 +220,27 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
|
Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
|
||||||
try {
|
try {
|
||||||
discoverLoadQueue(queue, hfofDir);
|
discoverLoadQueue(queue, hfofDir);
|
||||||
|
// check whether there is invalid family name in HFiles to be bulkloaded
|
||||||
|
Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
|
||||||
|
ArrayList<String> familyNames = new ArrayList<String>();
|
||||||
|
for (HColumnDescriptor family : families) {
|
||||||
|
familyNames.add(family.getNameAsString());
|
||||||
|
}
|
||||||
|
ArrayList<String> unmatchedFamilies = new ArrayList<String>();
|
||||||
|
for (LoadQueueItem lqi : queue) {
|
||||||
|
String familyNameInHFile = Bytes.toString(lqi.family);
|
||||||
|
if (!familyNames.contains(familyNameInHFile)) {
|
||||||
|
unmatchedFamilies.add(familyNameInHFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (unmatchedFamilies.size() > 0) {
|
||||||
|
String msg =
|
||||||
|
"Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
|
||||||
|
+ unmatchedFamilies + "; valid family names of table "
|
||||||
|
+ Bytes.toString(table.getTableName()) + " are: " + familyNames;
|
||||||
|
LOG.error(msg);
|
||||||
|
throw new IOException(msg);
|
||||||
|
}
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
if (queue.isEmpty()) {
|
if (queue.isEmpty()) {
|
||||||
|
@ -358,7 +379,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
|
Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
|
||||||
while (!queue.isEmpty()) {
|
while (!queue.isEmpty()) {
|
||||||
final LoadQueueItem item = queue.remove();
|
final LoadQueueItem item = queue.remove();
|
||||||
|
|
||||||
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
|
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
|
||||||
public List<LoadQueueItem> call() throws Exception {
|
public List<LoadQueueItem> call() throws Exception {
|
||||||
List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
|
List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
|
||||||
|
@ -492,12 +513,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
* Attempts to do an atomic load of many hfiles into a region. If it fails,
|
* Attempts to do an atomic load of many hfiles into a region. If it fails,
|
||||||
* it returns a list of hfiles that need to be retried. If it is successful
|
* it returns a list of hfiles that need to be retried. If it is successful
|
||||||
* it will return an empty list.
|
* it will return an empty list.
|
||||||
*
|
*
|
||||||
* NOTE: To maintain row atomicity guarantees, region server callable should
|
* NOTE: To maintain row atomicity guarantees, region server callable should
|
||||||
* succeed atomically and fails atomically.
|
* succeed atomically and fails atomically.
|
||||||
*
|
*
|
||||||
* Protected for testing.
|
* Protected for testing.
|
||||||
*
|
*
|
||||||
* @return empty list if success, list of items to retry on recoverable
|
* @return empty list if success, list of items to retry on recoverable
|
||||||
* failure
|
* failure
|
||||||
*/
|
*/
|
||||||
|
@ -650,7 +671,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
private boolean doesTableExist(String tableName) throws Exception {
|
private boolean doesTableExist(String tableName) throws Exception {
|
||||||
return hbAdmin.tableExists(tableName);
|
return hbAdmin.tableExists(tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Infers region boundaries for a new table.
|
* Infers region boundaries for a new table.
|
||||||
* Parameter:
|
* Parameter:
|
||||||
|
@ -658,29 +679,29 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
* If a key is a start key of a file, then it maps to +1
|
* If a key is a start key of a file, then it maps to +1
|
||||||
* If a key is an end key of a file, then it maps to -1
|
* If a key is an end key of a file, then it maps to -1
|
||||||
* Algo:
|
* Algo:
|
||||||
* 1) Poll on the keys in order:
|
* 1) Poll on the keys in order:
|
||||||
* a) Keep adding the mapped values to these keys (runningSum)
|
* a) Keep adding the mapped values to these keys (runningSum)
|
||||||
* b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to a boundary list.
|
* b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to a boundary list.
|
||||||
* 2) Return the boundary list.
|
* 2) Return the boundary list.
|
||||||
*/
|
*/
|
||||||
public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
|
public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
|
||||||
ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
|
ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
|
||||||
int runningValue = 0;
|
int runningValue = 0;
|
||||||
byte[] currStartKey = null;
|
byte[] currStartKey = null;
|
||||||
boolean firstBoundary = true;
|
boolean firstBoundary = true;
|
||||||
|
|
||||||
for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
|
for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
|
||||||
if (runningValue == 0) currStartKey = item.getKey();
|
if (runningValue == 0) currStartKey = item.getKey();
|
||||||
runningValue += item.getValue();
|
runningValue += item.getValue();
|
||||||
if (runningValue == 0) {
|
if (runningValue == 0) {
|
||||||
if (!firstBoundary) keysArray.add(currStartKey);
|
if (!firstBoundary) keysArray.add(currStartKey);
|
||||||
firstBoundary = false;
|
firstBoundary = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return keysArray.toArray(new byte[0][0]);
|
return keysArray.toArray(new byte[0][0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the table is created for the first time, then "completebulkload" reads the files twice.
|
* If the table is created for the first time, then "completebulkload" reads the files twice.
|
||||||
* More modifications necessary if we want to avoid doing it.
|
* More modifications necessary if we want to avoid doing it.
|
||||||
|
@ -706,7 +727,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
// Build a set of keys
|
// Build a set of keys
|
||||||
byte[][] keys;
|
byte[][] keys;
|
||||||
TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
|
TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
|
||||||
|
|
||||||
for (FileStatus stat : familyDirStatuses) {
|
for (FileStatus stat : familyDirStatuses) {
|
||||||
if (!stat.isDir()) {
|
if (!stat.isDir()) {
|
||||||
LOG.warn("Skipping non-directory " + stat.getPath());
|
LOG.warn("Skipping non-directory " + stat.getPath());
|
||||||
|
@ -716,10 +737,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
// Skip _logs, etc
|
// Skip _logs, etc
|
||||||
if (familyDir.getName().startsWith("_")) continue;
|
if (familyDir.getName().startsWith("_")) continue;
|
||||||
byte[] family = familyDir.getName().getBytes();
|
byte[] family = familyDir.getName().getBytes();
|
||||||
|
|
||||||
hcd = new HColumnDescriptor(family);
|
hcd = new HColumnDescriptor(family);
|
||||||
htd.addFamily(hcd);
|
htd.addFamily(hcd);
|
||||||
|
|
||||||
Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
|
Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
|
||||||
for (Path hfile : hfiles) {
|
for (Path hfile : hfiles) {
|
||||||
if (hfile.getName().startsWith("_")) continue;
|
if (hfile.getName().startsWith("_")) continue;
|
||||||
|
@ -739,7 +760,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
LOG.info("Trying to figure out region boundaries hfile=" + hfile +
|
LOG.info("Trying to figure out region boundaries hfile=" + hfile +
|
||||||
" first=" + Bytes.toStringBinary(first) +
|
" first=" + Bytes.toStringBinary(first) +
|
||||||
" last=" + Bytes.toStringBinary(last));
|
" last=" + Bytes.toStringBinary(last));
|
||||||
|
|
||||||
// To eventually infer start key-end key boundaries
|
// To eventually infer start key-end key boundaries
|
||||||
Integer value = map.containsKey(first)? map.get(first):0;
|
Integer value = map.containsKey(first)? map.get(first):0;
|
||||||
map.put(first, value+1);
|
map.put(first, value+1);
|
||||||
|
@ -751,7 +772,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
keys = LoadIncrementalHFiles.inferBoundaries(map);
|
keys = LoadIncrementalHFiles.inferBoundaries(map);
|
||||||
this.hbAdmin.createTable(htd,keys);
|
this.hbAdmin.createTable(htd,keys);
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,7 @@ import org.junit.experimental.categories.Category;
|
||||||
public class TestLoadIncrementalHFiles {
|
public class TestLoadIncrementalHFiles {
|
||||||
private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
|
private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
|
||||||
private static final byte[] FAMILY = Bytes.toBytes("myfam");
|
private static final byte[] FAMILY = Bytes.toBytes("myfam");
|
||||||
|
private static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
|
||||||
|
|
||||||
private static final byte[][] SPLIT_KEYS = new byte[][] {
|
private static final byte[][] SPLIT_KEYS = new byte[][] {
|
||||||
Bytes.toBytes("ddd"),
|
Bytes.toBytes("ddd"),
|
||||||
|
@ -188,6 +189,11 @@ public class TestLoadIncrementalHFiles {
|
||||||
|
|
||||||
HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
|
HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
|
||||||
HTableDescriptor htd = new HTableDescriptor(TABLE);
|
HTableDescriptor htd = new HTableDescriptor(TABLE);
|
||||||
|
// set real family name to upper case in purpose to simulate the case that
|
||||||
|
// family name in HFiles is invalid
|
||||||
|
HColumnDescriptor family =
|
||||||
|
new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase()));
|
||||||
|
htd.addFamily(family);
|
||||||
admin.createTable(htd, SPLIT_KEYS);
|
admin.createTable(htd, SPLIT_KEYS);
|
||||||
|
|
||||||
HTable table = new HTable(util.getConfiguration(), TABLE);
|
HTable table = new HTable(util.getConfiguration(), TABLE);
|
||||||
|
@ -198,6 +204,11 @@ public class TestLoadIncrementalHFiles {
|
||||||
assertTrue("Loading into table with non-existent family should have failed", false);
|
assertTrue("Loading into table with non-existent family should have failed", false);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
assertTrue("IOException expected", e instanceof IOException);
|
assertTrue("IOException expected", e instanceof IOException);
|
||||||
|
// further check whether the exception message is correct
|
||||||
|
String errMsg = e.getMessage();
|
||||||
|
assertTrue("Incorrect exception message, expected message: ["
|
||||||
|
+ EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]",
|
||||||
|
errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
|
||||||
}
|
}
|
||||||
table.close();
|
table.close();
|
||||||
admin.close();
|
admin.close();
|
||||||
|
|
Loading…
Reference in New Issue