HBASE-20125 Add UT for serial replication after region split and merge

zhangduo 2018-03-06 21:31:05 +08:00
parent 1bafe4a028
commit 6b77786dfc
5 changed files with 284 additions and 92 deletions

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.yetus.audience.InterfaceAudience;
@ -35,12 +34,20 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface WALEntryFilter {
/**
* Applies the filter, possibly returning a different Entry instance.
* If null is returned, the entry will be skipped.
* <p>
* Applies the filter, possibly returning a different Entry instance. If null is returned, the
* entry will be skipped.
* </p>
* <p>
* Notice that you are free to modify the cell list of the given entry, but do not change the
* content of the cells, as they may be used by others at the same time (and usually you cannot
* modify a cell unless you cast it to the implementation class, which is not a good idea).
* </p>
* @param entry Entry to filter
* @return a (possibly modified) Entry to use. Returning null or an entry with
* no cells will cause the entry to be skipped for replication.
* @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause
* the entry to be skipped for replication.
*/
public Entry filter(Entry entry);
}
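
For illustration, a minimal filter written against this interface could look like the sketch below. The class name DropFamilyWALEntryFilter and the idea of dropping one column family are hypothetical, not part of this commit; only WALEntryFilter, WAL.Entry, WALEdit#getCells and CellUtil#matchingFamily are relied on. Returning null (or leaving the entry with no cells) makes the shipper skip it, and only the cell list is touched, never the cell contents, in line with the javadoc above.

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Hypothetical example filter: removes every cell of one column family from each entry.
public class DropFamilyWALEntryFilter implements WALEntryFilter {

  private final byte[] family;

  public DropFamilyWALEntryFilter(byte[] family) {
    this.family = family;
  }

  @Override
  public Entry filter(Entry entry) {
    // Modifying the cell list is allowed; modifying the cells themselves is not.
    entry.getEdit().getCells().removeIf(cell -> CellUtil.matchingFamily(cell, family));
    // Returning null or an entry with no cells causes the entry to be skipped.
    return entry.getEdit().isEmpty() ? null : entry;
  }
}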

View File

@ -120,7 +120,7 @@ public class ReplicationSourceShipper extends Thread {
/**
* Do the shipping logic
*/
protected void shipEdits(WALEntryBatch entryBatch) {
protected final void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
currentPath = entryBatch.getLastWalPath();
@ -253,7 +253,7 @@ public class ReplicationSourceShipper extends Thread {
return 0;
}
protected boolean isActive() {
protected final boolean isActive() {
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
}

View File

@ -71,6 +71,13 @@ public class ReplicationSourceWALReader extends Thread {
private final int maxRetriesMultiplier;
private final boolean eofAutoRecovery;
// used to store the first cell in an entry before filtering. This is because, if serial
// replication is enabled, we may find out that an entry cannot be pushed after filtering, and
// when we try the next time the cells may already have been removed since the entry has
// already been filtered, especially for region event wal entries. This can also be used to
// determine whether we can skip filtering.
private Cell firstCellInEntryBeforeFiltering;
//Indicates whether this particular worker is running
private boolean isReaderRunning = true;
@ -162,37 +169,52 @@ public class ReplicationSourceWALReader extends Thread {
}
}
private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
throws IOException {
entryStream.next();
firstCellInEntryBeforeFiltering = null;
batch.setLastWalPosition(entryStream.getPosition());
}
private WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
if (!entryStream.hasNext()) {
return null;
}
long positionBefore = entryStream.getPosition();
WALEntryBatch batch =
new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
do {
Entry entry = entryStream.peek();
batch.setLastWalPosition(entryStream.getPosition());
boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
// Used to locate the region record in the meta table. In the WAL we only have the table name
// and the encoded region name, which cannot be mapped to a region name without scanning all
// the records for a table, so we need a start key, just like what we do at the client side
// when locating a region. For the markers, we will use the start key of the region as the row
// key for the edit. And we need to do this before filtering since all the cells may be
// filtered out, especially for the markers.
Cell firstCellInEdit = null;
boolean doFiltering = true;
if (hasSerialReplicationScope) {
assert !entry.getEdit().isEmpty() : "should not write empty edits";
firstCellInEdit = entry.getEdit().getCells().get(0);
if (firstCellInEntryBeforeFiltering == null) {
assert !entry.getEdit().isEmpty() : "should not write empty edits";
// Used to locate the region record in the meta table. In the WAL we only have the table name
// and the encoded region name, which cannot be mapped to a region name without scanning all
// the records for a table, so we need a start key, just like what we do at the client side
// when locating a region. For the markers, we will use the start key of the region as the
// row key for the edit. And we need to do this before filtering since all the cells may
// be filtered out, especially for the markers.
firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
} else {
// if this is not null then we know that the entry has already been filtered.
doFiltering = false;
}
}
if (doFiltering) {
entry = filterEntry(entry);
}
entry = filterEntry(entry);
if (entry != null) {
if (hasSerialReplicationScope) {
if (!serialReplicationChecker.canPush(entry, firstCellInEdit)) {
if (batch.getNbEntries() > 0) {
if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
if (batch.getLastWalPosition() > positionBefore) {
// we already have something in the batch that can be pushed, so break and ship it first
break;
} else {
serialReplicationChecker.waitUntilCanPush(entry, firstCellInEdit);
serialReplicationChecker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
}
}
// arriving here means we can push the entry, so record the last sequence id
@ -200,7 +222,7 @@ public class ReplicationSourceWALReader extends Thread {
entry.getKey().getSequenceId());
}
// actually remove the entry.
entryStream.next();
removeEntryFromStream(entryStream, batch);
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
@ -215,7 +237,7 @@ public class ReplicationSourceWALReader extends Thread {
}
} else {
// actually remove the entry.
entryStream.next();
removeEntryFromStream(entryStream, batch);
}
} while (entryStream.hasNext());
return batch;
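
The batching decision in the hunk above can be restated in isolation. The sketch below is a hypothetical helper, not part of the commit; it only reuses SerialReplicationChecker#canPush/#waitUntilCanPush and WALEntryBatch#getLastWalPosition as they appear above, returning true when the current batch should be shipped before retrying the entry.

// Hypothetical restatement of the serial-replication batching rule in readWALEntries:
// if the entry cannot be pushed yet and the batch has already advanced past the position
// we started from, ship what we have; otherwise block until the entry becomes pushable.
private boolean shipCurrentBatchFirst(SerialReplicationChecker checker, Entry entry,
    Cell firstCellBeforeFiltering, WALEntryBatch batch, long positionBefore)
    throws IOException, InterruptedException {
  if (checker.canPush(entry, firstCellBeforeFiltering)) {
    return false; // safe to push, keep filling the current batch
  }
  if (batch.getLastWalPosition() > positionBefore) {
    return true; // something is already batched, ship it and retry this entry later
  }
  checker.waitUntilCanPush(entry, firstCellBeforeFiltering);
  return false; // we blocked until the entry became pushable
}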

View File

@ -56,8 +56,12 @@ class WALEntryStream implements Closeable {
private Path currentPath;
// cache of next entry for hasNext()
private Entry currentEntry;
// position for the current entry. Now that we support peek, the upper layer may choose to
// return before actually reading the current entry, so it is not safe to return the value
// below from getPosition.
private long currentPositionOfEntry = 0;
// position after reading current entry
private long currentPosition = 0;
private long currentPositionOfReader = 0;
private final PriorityBlockingQueue<Path> logQueue;
private final FileSystem fs;
private final Configuration conf;
@ -82,7 +86,7 @@ class WALEntryStream implements Closeable {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
this.currentPosition = startPosition;
this.currentPositionOfEntry = startPosition;
this.walFileLengthProvider = walFileLengthProvider;
this.serverName = serverName;
this.metrics = metrics;
@ -110,6 +114,7 @@ class WALEntryStream implements Closeable {
*/
public Entry next() throws IOException {
Entry save = peek();
currentPositionOfEntry = currentPositionOfReader;
currentEntry = null;
return save;
}
@ -126,7 +131,7 @@ class WALEntryStream implements Closeable {
* @return the position of the last Entry returned by next()
*/
public long getPosition() {
return currentPosition;
return currentPositionOfEntry;
}
/**
@ -140,7 +145,7 @@ class WALEntryStream implements Closeable {
StringBuilder sb = new StringBuilder();
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
.append(currentPosition).append("\n");
.append(currentPositionOfEntry).append("\n");
} else {
sb.append("no replication ongoing, waiting for new log");
}
@ -159,7 +164,7 @@ class WALEntryStream implements Closeable {
}
private void setPosition(long position) {
currentPosition = position;
currentPositionOfEntry = position;
}
private void setCurrentPath(Path path) {
@ -168,19 +173,19 @@ class WALEntryStream implements Closeable {
private void tryAdvanceEntry() throws IOException {
if (checkReader()) {
boolean beingWritten = readNextEntryAndSetPosition();
boolean beingWritten = readNextEntryAndRecordReaderPosition();
if (currentEntry == null && !beingWritten) {
// no more entries in this log file, and the file is already closed, i.e., rolled
// Before dequeueing, we should always get one more attempt at reading.
// This is in case more entries came in after we opened the reader, and the log is rolled
// while we were reading. See HBASE-6758
resetReader();
readNextEntryAndSetPosition();
readNextEntryAndRecordReaderPosition();
if (currentEntry == null) {
if (checkAllBytesParsed()) { // now we're certain we're done with this log file
dequeueCurrentLog();
if (openNextLog()) {
readNextEntryAndSetPosition();
readNextEntryAndRecordReaderPosition();
}
}
}
@ -201,45 +206,49 @@ class WALEntryStream implements Closeable {
try {
stat = fs.getFileStatus(this.currentPath);
} catch (IOException exception) {
LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
+ (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat());
LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
metrics.incrUnknownFileLengthForClosedWAL();
}
// Here we use currentPositionOfReader instead of currentPositionOfEntry.
// We only call this method when currentEntry is null, so usually they are the same, but there
// are two exceptions. One is when the file contains nothing but a header, in which case
// currentPositionOfEntry will always be 0 since we never get a chance to update it. The other
// is when we reach the end of the file, where currentPositionOfEntry will point to the tail of
// the last valid entry while currentPositionOfReader will usually point to the end of the file.
if (stat != null) {
if (trailerSize < 0) {
if (currentPosition < stat.getLen()) {
final long skippedBytes = stat.getLen() - currentPosition;
if (LOG.isDebugEnabled()) {
LOG.debug("Reached the end of WAL file '" + currentPath
+ "'. It was not closed cleanly, so we did not parse " + skippedBytes
+ " bytes of data. This is normally ok.");
}
if (currentPositionOfReader < stat.getLen()) {
final long skippedBytes = stat.getLen() - currentPositionOfReader;
LOG.debug(
"Reached the end of WAL file '{}'. It was not closed cleanly," +
" so we did not parse {} bytes of data. This is normally ok.",
currentPath, skippedBytes);
metrics.incrUncleanlyClosedWALs();
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
}
} else if (currentPosition + trailerSize < stat.getLen()) {
LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
+ ", which is too far away from reported file length " + stat.getLen()
+ ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
} else if (currentPositionOfReader + trailerSize < stat.getLen()) {
LOG.warn(
"Processing end of WAL file '{}'. At position {}, which is too far away from" +
" reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
setPosition(0);
resetReader();
metrics.incrRestartedWALReading();
metrics.incrRepeatedFileBytes(currentPosition);
metrics.incrRepeatedFileBytes(currentPositionOfReader);
return false;
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
+ (stat == null ? "N/A" : stat.getLen()));
LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
(stat == null ? "N/A" : stat.getLen()));
}
metrics.incrCompletedWAL();
return true;
}
private void dequeueCurrentLog() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Reached the end of log " + currentPath);
}
LOG.debug("Reached the end of log {}", currentPath);
closeReader();
logQueue.remove();
setPosition(0);
@ -249,7 +258,7 @@ class WALEntryStream implements Closeable {
/**
* Returns whether the file is opened for writing.
*/
private boolean readNextEntryAndSetPosition() throws IOException {
private boolean readNextEntryAndRecordReaderPosition() throws IOException {
Entry readEntry = reader.next();
long readerPos = reader.getPosition();
OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
@ -265,10 +274,10 @@ class WALEntryStream implements Closeable {
}
if (readEntry != null) {
metrics.incrLogEditsRead();
metrics.incrLogReadInBytes(readerPos - currentPosition);
metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
}
currentEntry = readEntry; // could be null
setPosition(readerPos);
this.currentPositionOfReader = readerPos;
return fileLength.isPresent();
}
@ -401,8 +410,8 @@ class WALEntryStream implements Closeable {
}
private void seek() throws IOException {
if (currentPosition != 0) {
reader.seek(currentPosition);
if (currentPositionOfEntry != 0) {
reader.seek(currentPositionOfEntry);
}
}
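
A small hypothetical usage sketch of the position semantics introduced here (WALEntryStream is package-private, so such a helper would have to live in the same package; readOne and its asserts are illustrative only): getPosition() stays put across peek() and only moves once next() consumes the entry.

// Illustrative only: peek() caches the entry and records the reader position internally,
// but getPosition() keeps returning the position before the entry until next() is called.
static Entry readOne(WALEntryStream stream) throws IOException {
  long before = stream.getPosition();
  Entry peeked = stream.peek();   // does not advance the externally visible position
  assert stream.getPosition() == before;
  Entry entry = stream.next();    // returns the peeked entry and moves the position past it
  assert entry == peeked;
  assert stream.getPosition() >= before;
  return entry;
}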

View File

@ -18,11 +18,18 @@
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -40,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
@ -124,6 +132,8 @@ public class TestSerialReplication {
public static void setUpBeforeClass() throws Exception {
UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
UTIL.startMiniCluster(3);
// disable balancer
UTIL.getAdmin().balancerSwitch(false, true);
LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
FS = UTIL.getTestFileSystem();
FS.mkdirs(LOG_DIR);
@ -141,7 +151,6 @@ public class TestSerialReplication {
@Before
public void setUp() throws IOException, StreamLacksCapabilityException {
UTIL.ensureSomeRegionServersAvailable(3);
logPath = new Path(LOG_DIR, name.getMethodName());
WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
// add the peer in a disabled state, so that later, when it is enabled, all sources will start pushing together.
@ -152,14 +161,74 @@ public class TestSerialReplication {
}
@After
public void tearDown() throws IOException {
public void tearDown() throws Exception {
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
t.getRegionServer().getWalRoller().requestRollAll();
}
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
.map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
}
@Override
public String explainFailure() throws Exception {
return "Log roll has not finished yet";
}
});
for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
t.getRegionServer().getWalRoller().requestRollAll();
}
if (WRITER != null) {
WRITER.close();
WRITER = null;
}
}
private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
Bytes.toBytes(rs.getServerName().getServerName()));
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return rs.getRegion(region.getEncodedName()) != null;
}
@Override
public String explainFailure() throws Exception {
return region + " is still not on " + rs;
}
});
}
private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception {
UTIL.getAdmin().enableReplicationPeer(PEER_ID);
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
int count = 0;
while (reader.next() != null) {
count++;
}
return count >= expectedEntries;
} catch (IOException e) {
return false;
}
}
@Override
public String explainFailure() throws Exception {
return "Not enough entries replicated";
}
});
}
@Test
public void testRegionMove() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
@ -174,46 +243,13 @@ public class TestSerialReplication {
}
RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
Bytes.toBytes(rs.getServerName().getServerName()));
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !rs.getRegions(tableName).isEmpty();
}
@Override
public String explainFailure() throws Exception {
return region + " is still not on " + rs;
}
});
moveRegion(region, rs);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 100; i < 200; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
UTIL.getAdmin().enableReplicationPeer(PEER_ID);
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
int count = 0;
while (reader.next() != null) {
count++;
}
return count >= 200;
} catch (IOException e) {
return false;
}
}
@Override
public String explainFailure() throws Exception {
return "Not enough entries replicated";
}
});
enablePeerAndWaitUntilReplicationDone(200);
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
long seqId = -1L;
@ -231,4 +267,122 @@ public class TestSerialReplication {
assertEquals(200, count);
}
}
@Test
public void testRegionSplit() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
UTIL.flush(tableName);
RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
UTIL.getAdmin().splitRegionAsync(region.getEncodedNameAsBytes(), Bytes.toBytes(50)).get(30,
TimeUnit.SECONDS);
UTIL.waitUntilNoRegionsInTransition(30000);
List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
assertEquals(2, regions.size());
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
enablePeerAndWaitUntilReplicationDone(200);
Map<String, Long> regionsToSeqId = new HashMap<>();
regionsToSeqId.put(region.getEncodedName(), -1L);
regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
int count = 0;
for (Entry entry;;) {
entry = reader.next();
if (entry == null) {
break;
}
String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
Long seqId = regionsToSeqId.get(encodedName);
assertNotNull(
"Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, seqId);
assertTrue("Sequence id goes backwards from " + seqId + " to " +
entry.getKey().getSequenceId() + " for " + encodedName,
entry.getKey().getSequenceId() >= seqId.longValue());
if (count < 100) {
assertEquals(encodedName + " is pushed before parent " + region.getEncodedName(),
region.getEncodedName(), encodedName);
} else {
assertNotEquals(region.getEncodedName(), encodedName);
}
count++;
}
assertEquals(200, count);
}
}
@Test
public void testRegionMerge() throws Exception {
byte[] splitKey = Bytes.toBytes(50);
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
.setScope(HConstants.REPLICATION_SCOPE_SERIAL).build())
.build(),
new byte[][] { splitKey });
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
UTIL.getAdmin()
.mergeRegionsAsync(
regions.stream().map(RegionInfo::getEncodedNameAsBytes).toArray(byte[][]::new), false)
.get(30, TimeUnit.SECONDS);
UTIL.waitUntilNoRegionsInTransition(30000);
List<RegionInfo> regionsAfterMerge = UTIL.getAdmin().getRegions(tableName);
assertEquals(1, regionsAfterMerge.size());
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
enablePeerAndWaitUntilReplicationDone(200);
Map<String, Long> regionsToSeqId = new HashMap<>();
RegionInfo region = regionsAfterMerge.get(0);
regionsToSeqId.put(region.getEncodedName(), -1L);
regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
int count = 0;
for (Entry entry;;) {
entry = reader.next();
if (entry == null) {
break;
}
String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
Long seqId = regionsToSeqId.get(encodedName);
assertNotNull(
"Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, seqId);
assertTrue("Sequence id goes backwards from " + seqId + " to " +
entry.getKey().getSequenceId() + " for " + encodedName,
entry.getKey().getSequenceId() >= seqId.longValue());
if (count < 100) {
assertNotEquals(
encodedName + " is pushed before parents " +
regions.stream().map(RegionInfo::getEncodedName).collect(Collectors.joining(" and ")),
region.getEncodedName(), encodedName);
} else {
assertEquals(region.getEncodedName(), encodedName);
}
count++;
}
assertEquals(200, count);
}
}
}