HBASE-14987 Compaction marker whose region name doesn't match current region's needs to be handled

This commit is contained in:
tedyu 2016-01-04 07:10:10 -08:00
parent 9589a7d8be
commit 00656688f7
3 changed files with 65 additions and 14 deletions

View File

@ -2563,12 +2563,19 @@ public final class ProtobufUtil {
@SuppressWarnings("deprecation")
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir);
}
@SuppressWarnings("deprecation")
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName,
byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
// compaction descriptor contains relative paths.
// input / output paths are relative to the store dir
// store dir is relative to region dir
CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
.setTableName(ByteStringer.wrap(info.getTable().toBytes()))
.setEncodedRegionName(ByteStringer.wrap(info.getEncodedNameAsBytes()))
.setEncodedRegionName(ByteStringer.wrap(
regionName == null ? info.getEncodedNameAsBytes() : regionName))
.setFamilyName(ByteStringer.wrap(family))
.setStoreHomeDir(storeDir.getName()); //make relative
for (Path inputPath : inputPaths) {

View File

@ -4130,11 +4130,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
continue;
}
}
boolean checkRowWithinBoundary = false;
// Check this edit is for this region.
if (!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {
skippedEdits++;
continue;
checkRowWithinBoundary = true;
}
boolean flush = false;
@ -4142,12 +4142,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
// if region names don't match, skipp replaying compaction marker
if (!checkRowWithinBoundary) {
//this is a special edit, we should handle it
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null) {
//replay the compaction
replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
}
}
skippedEdits++;
continue;
}
@ -4162,6 +4165,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
skippedEdits++;
continue;
}
if (checkRowWithinBoundary && !rowIsInRange(this.getRegionInfo(),
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) {
LOG.warn("Row of " + cell + " is not within region boundary");
skippedEdits++;
continue;
}
// Now, figure if we should skip this edit.
if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
.getName())) {
@ -4232,8 +4241,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
boolean removeFiles, long replaySeqId)
throws IOException {
try {
checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
"Compaction marker from WAL ", compaction);
} catch (WrongRegionException wre) {
if (RegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
// skip the compaction marker since it is not for this region
return;
}
throw wre;
}
synchronized (writestate) {
if (replaySeqId < lastReplayedOpenRegionSeqId) {
@ -6590,6 +6607,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
(Bytes.compareTo(info.getEndKey(), row) > 0));
}
public static boolean rowIsInRange(HRegionInfo info, final byte [] row, final int offset,
final short length) {
return ((info.getStartKey().length == 0) ||
(Bytes.compareTo(info.getStartKey(), 0, info.getStartKey().length,
row, offset, length) <= 0)) &&
((info.getEndKey().length == 0) ||
(Bytes.compareTo(info.getEndKey(), 0, info.getEndKey().length, row, offset, length) > 0));
}
/**
* Merge two HRegions. The regions must be adjacent and must not overlap.
*

View File

@ -839,6 +839,10 @@ public class TestHRegion {
@Test
public void testRecoveredEditsReplayCompaction() throws Exception {
testRecoveredEditsReplayCompaction(false);
testRecoveredEditsReplayCompaction(true);
}
public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
String method = name.getMethodName();
TableName tableName = TableName.valueOf(method);
byte[] family = Bytes.toBytes("family");
@ -884,9 +888,17 @@ public class TestHRegion {
Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
files[0].getPath());
byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
for (int i=0; i < encodedNameAsBytes.length; i++) {
// Mix the byte array to have a new encodedName
fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
}
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
.getRegionInfo(), family, storeFiles, Lists.newArrayList(newFile), region
.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
.getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
storeFiles, Lists.newArrayList(newFile),
region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
@ -908,14 +920,20 @@ public class TestHRegion {
region.getTableDesc();
region.getRegionInfo();
region.close();
try {
region = HRegion.openHRegion(region, null);
} catch (WrongRegionException wre) {
fail("Matching encoded region name should not have produced WrongRegionException");
}
// now check whether we have only one store file, the compacted one
Collection<StoreFile> sfs = region.getStore(family).getStorefiles();
for (StoreFile sf : sfs) {
LOG.info(sf.getPath());
}
if (!mismatchedRegionName) {
assertEquals(1, region.getStore(family).getStorefilesCount());
}
files = FSUtils.listStatus(fs, tmpDir);
assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);