HBASE-10549 When there is a hole, LoadIncrementalHFiles will hang in an infinite loop.(yuanxinen)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1577655 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f2ff9baa64
commit
7f35dcd74a
|
@ -502,6 +502,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
* LQI's corresponding to the resultant hfiles.
|
||||
*
|
||||
* protected for testing
|
||||
* @throws IOException
|
||||
*/
|
||||
protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
|
||||
final LoadQueueItem item, final HTable table,
|
||||
|
@ -542,6 +543,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
idx = -(idx + 1) - 1;
|
||||
}
|
||||
final int indexForCallable = idx;
|
||||
|
||||
/**
|
||||
* we can consider there is a region hole in following conditions. 1) if idx < 0,then first
|
||||
* region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
|
||||
* region. 3) if the endkey of the last region is not empty.
|
||||
*/
|
||||
if (indexForCallable < 0) {
|
||||
throw new IOException("The first region info for table "
|
||||
+ Bytes.toString(table.getTableName())
|
||||
+ " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
|
||||
} else if ((indexForCallable == startEndKeys.getFirst().length - 1)
|
||||
&& !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
|
||||
throw new IOException("The last region info for table "
|
||||
+ Bytes.toString(table.getTableName())
|
||||
+ " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
|
||||
} else if (indexForCallable + 1 < startEndKeys.getFirst().length
|
||||
&& !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
|
||||
startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
|
||||
throw new IOException("The endkey of one region for table "
|
||||
+ Bytes.toString(table.getTableName())
|
||||
+ " is not equal to the startkey of the next region in hbase:meta."
|
||||
+ "Please use hbck tool to fix it first.");
|
||||
}
|
||||
|
||||
boolean lastKeyInRange =
|
||||
Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
|
||||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
|
@ -44,6 +45,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
|
@ -119,7 +123,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
try {
|
||||
LOG.info("Creating table " + table);
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||
for (int i = 0; i < 10; i++) {
|
||||
for (int i = 0; i < cfs; i++) {
|
||||
htd.addFamily(new HColumnDescriptor(family(i)));
|
||||
}
|
||||
|
||||
|
@ -129,6 +133,28 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a table with given table name,specified number of column families<br>
|
||||
* and splitkeys if the table does not already exist.
|
||||
* @param table
|
||||
* @param cfs
|
||||
* @param SPLIT_KEYS
|
||||
*/
|
||||
private void setupTableWithSplitkeys(String table, int cfs, byte[][] SPLIT_KEYS)
|
||||
throws IOException {
|
||||
try {
|
||||
LOG.info("Creating table " + table);
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||
for (int i = 0; i < cfs; i++) {
|
||||
htd.addFamily(new HColumnDescriptor(family(i)));
|
||||
}
|
||||
|
||||
util.getHBaseAdmin().createTable(htd, SPLIT_KEYS);
|
||||
} catch (TableExistsException tee) {
|
||||
LOG.info("Table " + table + " already exists");
|
||||
}
|
||||
}
|
||||
|
||||
private Path buildBulkFiles(String table, int value) throws Exception {
|
||||
Path dir = util.getDataTestDirOnTestFS(table);
|
||||
Path bulk1 = new Path(dir, table+value);
|
||||
|
@ -403,5 +429,62 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
fail("doBulkLoad should have thrown an exception");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
|
||||
String tableName = "testGroupOrSplitWhenRegionHoleExistsInMeta";
|
||||
HTable table = new HTable(util.getConfiguration(), Bytes.toBytes(tableName));
|
||||
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
|
||||
|
||||
setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
|
||||
Path dir = buildBulkFiles(tableName, 2);
|
||||
|
||||
final AtomicInteger countedLqis = new AtomicInteger();
|
||||
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
|
||||
util.getConfiguration()) {
|
||||
|
||||
protected List<LoadQueueItem> groupOrSplit(
|
||||
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
|
||||
final LoadQueueItem item, final HTable htable,
|
||||
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
|
||||
List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
|
||||
if (lqis != null) {
|
||||
countedLqis.addAndGet(lqis.size());
|
||||
}
|
||||
return lqis;
|
||||
}
|
||||
};
|
||||
|
||||
// do bulkload when there is no region hole in hbase:meta.
|
||||
try {
|
||||
loader.doBulkLoad(dir, table);
|
||||
} catch (Exception e) {
|
||||
LOG.error("exeception=", e);
|
||||
}
|
||||
// check if all the data are loaded into the table.
|
||||
this.assertExpectedTable(tableName, ROWCOUNT, 2);
|
||||
|
||||
dir = buildBulkFiles(tableName, 3);
|
||||
|
||||
// Mess it up by leaving a hole in the hbase:meta
|
||||
CatalogTracker ct = new CatalogTracker(util.getConfiguration());
|
||||
List<HRegionInfo> regionInfos = MetaReader.getTableRegions(ct, TableName.valueOf(tableName));
|
||||
for (HRegionInfo regionInfo : regionInfos) {
|
||||
if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
|
||||
MetaEditor.deleteRegion(ct, regionInfo);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
loader.doBulkLoad(dir, table);
|
||||
} catch (Exception e) {
|
||||
LOG.error("exeception=", e);
|
||||
assertTrue("IOException expected", e instanceof IOException);
|
||||
}
|
||||
|
||||
table.close();
|
||||
|
||||
this.assertExpectedTable(tableName, ROWCOUNT, 2);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue