diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
index 417f868dd29..cd3f1bdfb06 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.replication;
import org.apache.yetus.audience.InterfaceAudience;
@@ -35,12 +34,20 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface WALEntryFilter {
+
/**
- * Applies the filter, possibly returning a different Entry instance.
- * If null is returned, the entry will be skipped.
+ * Applies the filter, possibly returning a different Entry instance. If null is returned, the
+ * entry will be skipped.
+ * <p>
+ * Notice that you are free to modify the cell list of the given entry, but do not change the
+ * content of the cells themselves; they may be used by others at the same time (and usually you
+ * cannot modify a cell unless you cast it to the implementation class, which is not a good
+ * idea).
+ *
* @param entry Entry to filter
- * @return a (possibly modified) Entry to use. Returning null or an entry with
- * no cells will cause the entry to be skipped for replication.
+ * @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause
+ * the entry to be skipped for replication.
*/
public Entry filter(Entry entry);
}
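For context, here is a minimal sketch of a filter that honors the contract spelled out in the javadoc above. The class name and the family-dropping behavior are hypothetical, not part of this patch; the point is that the entry's cell list may be mutated in place while the cells themselves are left untouched.

    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.replication.WALEntryFilter;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    // Hypothetical filter that drops every cell belonging to one column family.
    public class DropFamilyWALEntryFilter implements WALEntryFilter {

      private final byte[] family;

      public DropFamilyWALEntryFilter(byte[] family) {
        this.family = family;
      }

      @Override
      public Entry filter(Entry entry) {
        List<Cell> cells = entry.getEdit().getCells();
        // Allowed: mutate the list in place.
        // Not allowed: mutate the cells themselves, since other components may
        // hold references to them.
        cells.removeIf(cell -> CellUtil.matchingFamily(cell, family));
        return entry;
      }
    }

Returning the entry with an emptied cell list is equivalent, for replication purposes, to returning null: either way the entry is skipped.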
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index d207d775701..50aaf95e29b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -120,7 +120,7 @@ public class ReplicationSourceShipper extends Thread {
/**
* Do the shipping logic
*/
- protected void shipEdits(WALEntryBatch entryBatch) {
+ protected final void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
currentPath = entryBatch.getLastWalPath();
@@ -253,7 +253,7 @@ public class ReplicationSourceShipper extends Thread {
return 0;
}
- protected boolean isActive() {
+ protected final boolean isActive() {
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index fe87aec3175..ad3baafa15f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -71,6 +71,13 @@ public class ReplicationSourceWALReader extends Thread {
private final int maxRetriesMultiplier;
private final boolean eofAutoRecovery;
+ // Used to store the first cell of an entry before filtering. If serial replication is enabled,
+ // we may find out only after filtering that an entry cannot be pushed yet, and when we retry,
+ // the cells may already be gone since the entry has been filtered, especially for region event
+ // wal entries. This field is also used to determine whether we can skip filtering on a retry.
+ private Cell firstCellInEntryBeforeFiltering;
+
//Indicates whether this particular worker is running
private boolean isReaderRunning = true;
@@ -162,37 +169,52 @@ public class ReplicationSourceWALReader extends Thread {
}
}
+ private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
+ throws IOException {
+ entryStream.next();
+ firstCellInEntryBeforeFiltering = null;
+ batch.setLastWalPosition(entryStream.getPosition());
+ }
+
private WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
if (!entryStream.hasNext()) {
return null;
}
+ long positionBefore = entryStream.getPosition();
WALEntryBatch batch =
new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
do {
Entry entry = entryStream.peek();
- batch.setLastWalPosition(entryStream.getPosition());
boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
- // Used to locate the region record in meta table. In WAL we only have the table name and
- // encoded region name which can not be mapping to region name without scanning all the
- // records for a table, so we need a start key, just like what we have done at client side
- // when locating a region. For the markers, we will use the start key of the region as the row
- // key for the edit. And we need to do this before filtering since all the cells may be
- // filtered out, especially that for the markers.
- Cell firstCellInEdit = null;
+ boolean doFiltering = true;
if (hasSerialReplicationScope) {
- assert !entry.getEdit().isEmpty() : "should not write empty edits";
- firstCellInEdit = entry.getEdit().getCells().get(0);
+ if (firstCellInEntryBeforeFiltering == null) {
+ assert !entry.getEdit().isEmpty() : "should not write empty edits";
+ // Used to locate the region record in the meta table. In the WAL we only have the table
+ // name and the encoded region name, which cannot be mapped to a region name without
+ // scanning all the records for the table, so we need a start key, just as we do at the
+ // client side when locating a region. For the markers, we will use the start key of the
+ // region as the row key for the edit. And we need to do this before filtering, since all
+ // the cells may be filtered out, especially those for the markers.
+ firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
+ } else {
+ // a non-null value means the entry has already been filtered on a previous attempt.
+ doFiltering = false;
+ }
+ }
+
+ if (doFiltering) {
+ entry = filterEntry(entry);
}
- entry = filterEntry(entry);
if (entry != null) {
if (hasSerialReplicationScope) {
- if (!serialReplicationChecker.canPush(entry, firstCellInEdit)) {
- if (batch.getNbEntries() > 0) {
+ if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+ if (batch.getLastWalPosition() > positionBefore) {
// we have something that can push, break
break;
} else {
- serialReplicationChecker.waitUntilCanPush(entry, firstCellInEdit);
+ serialReplicationChecker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
}
}
// arrive here means we can push the entry, record the last sequence id
@@ -200,7 +222,7 @@ public class ReplicationSourceWALReader extends Thread {
entry.getKey().getSequenceId());
}
// actually remove the entry.
- entryStream.next();
+ removeEntryFromStream(entryStream, batch);
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
@@ -215,7 +237,7 @@ public class ReplicationSourceWALReader extends Thread {
}
} else {
// actually remove the entry.
- entryStream.next();
+ removeEntryFromStream(entryStream, batch);
}
} while (entryStream.hasNext());
return batch;
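To make the hazard concrete, the following is a hypothetical, heavily simplified model of the read-retry flow above. The PeekableStream, Filter and Checker types are stand-ins, not HBase APIs, and the real loop also handles batching and sequence-id bookkeeping; the sketch only shows why the first cell must be captured before the first filter pass and kept across retries.

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    class SerialReadSketch {

      interface PeekableStream {
        Entry peek() throws IOException;
        Entry next() throws IOException;
      }

      interface Filter {
        Entry filter(Entry entry); // may empty the entry's cell list in place
      }

      interface Checker {
        boolean canPush(Entry entry, Cell firstCell);
      }

      // Survives across attempts, like firstCellInEntryBeforeFiltering.
      private Cell firstCellBeforeFiltering;

      Entry tryReadOne(PeekableStream stream, Filter filter, Checker checker)
          throws IOException {
        Entry entry = stream.peek(); // look at the entry without consuming it
        if (firstCellBeforeFiltering == null) { // first attempt for this entry
          // Capture the first cell BEFORE filtering; the filter may empty the
          // cell list in place, so a later retry would find nothing to capture.
          firstCellBeforeFiltering = entry.getEdit().getCells().get(0);
          entry = filter.filter(entry);
        } // else: already filtered on a previous attempt, do not filter again
        if (entry == null) {
          stream.next(); // fully filtered out: consume and skip
          firstCellBeforeFiltering = null;
          return null;
        }
        if (!checker.canPush(entry, firstCellBeforeFiltering)) {
          return null; // caller retries later; the captured cell is kept
        }
        stream.next(); // now actually consume the entry
        firstCellBeforeFiltering = null; // reset for the next entry
        return entry;
      }
    }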
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index bcab9b4b339..c639a483361 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -56,8 +56,12 @@ class WALEntryStream implements Closeable {
private Path currentPath;
// cache of next entry for hasNext()
private Entry currentEntry;
+ // position of the current entry. Now that we support peek, the upper layer may return before
+ // actually reading the current entry, so it is not safe to expose the reader position below
+ // through getPosition.
+ private long currentPositionOfEntry = 0;
// position after reading current entry
- private long currentPosition = 0;
+ private long currentPositionOfReader = 0;
private final PriorityBlockingQueue<Path> logQueue;
private final FileSystem fs;
private final Configuration conf;
@@ -82,7 +86,7 @@ class WALEntryStream implements Closeable {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
- this.currentPosition = startPosition;
+ this.currentPositionOfEntry = startPosition;
this.walFileLengthProvider = walFileLengthProvider;
this.serverName = serverName;
this.metrics = metrics;
@@ -110,6 +114,7 @@ class WALEntryStream implements Closeable {
*/
public Entry next() throws IOException {
Entry save = peek();
+ currentPositionOfEntry = currentPositionOfReader;
currentEntry = null;
return save;
}
@@ -126,7 +131,7 @@ class WALEntryStream implements Closeable {
* @return the position of the last Entry returned by next()
*/
public long getPosition() {
- return currentPosition;
+ return currentPositionOfEntry;
}
/**
@@ -140,7 +145,7 @@ class WALEntryStream implements Closeable {
StringBuilder sb = new StringBuilder();
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
- .append(currentPosition).append("\n");
+ .append(currentPositionOfEntry).append("\n");
} else {
sb.append("no replication ongoing, waiting for new log");
}
@@ -159,7 +164,7 @@ class WALEntryStream implements Closeable {
}
private void setPosition(long position) {
- currentPosition = position;
+ currentPositionOfEntry = position;
}
private void setCurrentPath(Path path) {
@@ -168,19 +173,19 @@ class WALEntryStream implements Closeable {
private void tryAdvanceEntry() throws IOException {
if (checkReader()) {
- boolean beingWritten = readNextEntryAndSetPosition();
+ boolean beingWritten = readNextEntryAndRecordReaderPosition();
if (currentEntry == null && !beingWritten) {
// no more entries in this log file, and the file is already closed, i.e, rolled
// Before dequeueing, we should always get one more attempt at reading.
// This is in case more entries came in after we opened the reader, and the log is rolled
// while we were reading. See HBASE-6758
resetReader();
- readNextEntryAndSetPosition();
+ readNextEntryAndRecordReaderPosition();
if (currentEntry == null) {
if (checkAllBytesParsed()) { // now we're certain we're done with this log file
dequeueCurrentLog();
if (openNextLog()) {
- readNextEntryAndSetPosition();
+ readNextEntryAndRecordReaderPosition();
}
}
}
@@ -201,45 +206,49 @@ class WALEntryStream implements Closeable {
try {
stat = fs.getFileStatus(this.currentPath);
} catch (IOException exception) {
- LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
- + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat());
+ LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
+ currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
metrics.incrUnknownFileLengthForClosedWAL();
}
+ // Here we use currentPositionOfReader instead of currentPositionOfEntry. We only call this
+ // method when currentEntry is null, so usually the two are the same, with two exceptions. One
+ // is when the file contains nothing but a header: currentPositionOfEntry will stay 0 since we
+ // never get a chance to update it. The other is when we reach the end of the file:
+ // currentPositionOfEntry will point to the tail of the last valid entry, while
+ // currentPositionOfReader will usually point to the end of the file.
if (stat != null) {
if (trailerSize < 0) {
- if (currentPosition < stat.getLen()) {
- final long skippedBytes = stat.getLen() - currentPosition;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reached the end of WAL file '" + currentPath
- + "'. It was not closed cleanly, so we did not parse " + skippedBytes
- + " bytes of data. This is normally ok.");
- }
+ if (currentPositionOfReader < stat.getLen()) {
+ final long skippedBytes = stat.getLen() - currentPositionOfReader;
+ LOG.debug(
+ "Reached the end of WAL file '{}'. It was not closed cleanly," +
+ " so we did not parse {} bytes of data. This is normally ok.",
+ currentPath, skippedBytes);
metrics.incrUncleanlyClosedWALs();
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
}
- } else if (currentPosition + trailerSize < stat.getLen()) {
- LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
- + ", which is too far away from reported file length " + stat.getLen()
- + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
+ } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
+ LOG.warn(
+ "Processing end of WAL file '{}'. At position {}, which is too far away from" +
+ " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
+ currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
setPosition(0);
resetReader();
metrics.incrRestartedWALReading();
- metrics.incrRepeatedFileBytes(currentPosition);
+ metrics.incrRepeatedFileBytes(currentPositionOfReader);
return false;
}
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
- + (stat == null ? "N/A" : stat.getLen()));
+ LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
+ (stat == null ? "N/A" : stat.getLen()));
}
metrics.incrCompletedWAL();
return true;
}
private void dequeueCurrentLog() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reached the end of log " + currentPath);
- }
+ LOG.debug("Reached the end of log {}", currentPath);
closeReader();
logQueue.remove();
setPosition(0);
@@ -249,7 +258,7 @@ class WALEntryStream implements Closeable {
/**
* Returns whether the file is opened for writing.
*/
- private boolean readNextEntryAndSetPosition() throws IOException {
+ private boolean readNextEntryAndRecordReaderPosition() throws IOException {
Entry readEntry = reader.next();
long readerPos = reader.getPosition();
OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
@@ -265,10 +274,10 @@ class WALEntryStream implements Closeable {
}
if (readEntry != null) {
metrics.incrLogEditsRead();
- metrics.incrLogReadInBytes(readerPos - currentPosition);
+ metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
}
currentEntry = readEntry; // could be null
- setPosition(readerPos);
+ this.currentPositionOfReader = readerPos;
return fileLength.isPresent();
}
@@ -401,8 +410,8 @@ class WALEntryStream implements Closeable {
}
private void seek() throws IOException {
- if (currentPosition != 0) {
- reader.seek(currentPosition);
+ if (currentPositionOfEntry != 0) {
+ reader.seek(currentPositionOfEntry);
}
}
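Here is a self-contained toy, assuming nothing beyond the JDK, that models the split between the two position fields introduced above: the reader position advances on peek() (read-ahead), while the position visible through getPosition() only advances on next(), so a caller that peeks and bails out can resume without skipping the peeked element. WALEntryStream itself tracks byte offsets in a WAL file rather than list indices.

    import java.util.List;

    class PeekPositionSketch<T> {

      private final List<T> data;
      private int readerPos = 0; // like currentPositionOfReader
      private int entryPos = 0;  // like currentPositionOfEntry
      private T current;         // cached element, like currentEntry

      PeekPositionSketch(List<T> data) {
        this.data = data;
      }

      T peek() {
        if (current == null && readerPos < data.size()) {
          current = data.get(readerPos);
          readerPos++; // read-ahead: reader moves, visible position does not
        }
        return current;
      }

      T next() {
        T save = peek();
        entryPos = readerPos; // commit: visible position catches up
        current = null;
        return save;
      }

      int getPosition() {
        return entryPos; // safe resume point even after a bare peek()
      }
    }

For example, with data ["a", "b"], peek() returns "a" while getPosition() still reports 0, so resuming from 0 re-reads "a"; only after next() does getPosition() move to 1.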
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index dfa78e7bb92..bf6c0c8fe99 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -18,11 +18,18 @@
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -40,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -124,6 +132,8 @@ public class TestSerialReplication {
public static void setUpBeforeClass() throws Exception {
UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
UTIL.startMiniCluster(3);
+ // disable balancer
+ UTIL.getAdmin().balancerSwitch(false, true);
LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
FS = UTIL.getTestFileSystem();
FS.mkdirs(LOG_DIR);
@@ -141,7 +151,6 @@ public class TestSerialReplication {
@Before
public void setUp() throws IOException, StreamLacksCapabilityException {
- UTIL.ensureSomeRegionServersAvailable(3);
logPath = new Path(LOG_DIR, name.getMethodName());
WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
// add in disable state, so later when enabling it all sources will start push together.
@@ -152,14 +161,74 @@ public class TestSerialReplication {
}
@After
- public void tearDown() throws IOException {
+ public void tearDown() throws Exception {
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+ for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
+ t.getRegionServer().getWalRoller().requestRollAll();
+ }
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
+ .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Log roll has not finished yet";
+ }
+ });
+ for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
+ t.getRegionServer().getWalRoller().requestRollAll();
+ }
if (WRITER != null) {
WRITER.close();
WRITER = null;
}
}
+ private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
+ UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
+ Bytes.toBytes(rs.getServerName().getServerName()));
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return rs.getRegion(region.getEncodedName()) != null;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return region + " is still not on " + rs;
+ }
+ });
+ }
+
+ private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception {
+ UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
+ int count = 0;
+ while (reader.next() != null) {
+ count++;
+ }
+ return count >= expectedEntries;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Not enough entries replicated";
+ }
+ });
+ }
+
@Test
public void testRegionMove() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
@@ -174,46 +243,13 @@ public class TestSerialReplication {
}
RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
- UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
- Bytes.toBytes(rs.getServerName().getServerName()));
- UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- return !rs.getRegions(tableName).isEmpty();
- }
-
- @Override
- public String explainFailure() throws Exception {
- return region + " is still not on " + rs;
- }
- });
+ moveRegion(region, rs);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 100; i < 200; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
- UTIL.getAdmin().enableReplicationPeer(PEER_ID);
- UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
- int count = 0;
- while (reader.next() != null) {
- count++;
- }
- return count >= 200;
- } catch (IOException e) {
- return false;
- }
- }
-
- @Override
- public String explainFailure() throws Exception {
- return "Not enough entries replicated";
- }
- });
+ enablePeerAndWaitUntilReplicationDone(200);
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
long seqId = -1L;
@@ -231,4 +267,122 @@ public class TestSerialReplication {
assertEquals(200, count);
}
}
+
+ @Test
+ public void testRegionSplit() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+ UTIL.waitTableAvailable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ UTIL.flush(tableName);
+ RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+ UTIL.getAdmin().splitRegionAsync(region.getEncodedNameAsBytes(), Bytes.toBytes(50)).get(30,
+ TimeUnit.SECONDS);
+ UTIL.waitUntilNoRegionsInTransition(30000);
+ List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
+ assertEquals(2, regions.size());
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ enablePeerAndWaitUntilReplicationDone(200);
+ Map<String, Long> regionsToSeqId = new HashMap<>();
+ regionsToSeqId.put(region.getEncodedName(), -1L);
+ regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
+ try (WAL.Reader reader =
+ WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ int count = 0;
+ for (Entry entry;;) {
+ entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
+ Long seqId = regionsToSeqId.get(encodedName);
+ assertNotNull(
+ "Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, seqId);
+ assertTrue("Sequence id goes backwards from " + seqId + " to " +
+ entry.getKey().getSequenceId() + " for " + encodedName,
+ entry.getKey().getSequenceId() >= seqId.longValue());
+ if (count < 100) {
+ assertEquals(encodedName + " is pushed before parent " + region.getEncodedName(),
+ region.getEncodedName(), encodedName);
+ } else {
+ assertNotEquals(region.getEncodedName(), encodedName);
+ }
+ count++;
+ }
+ assertEquals(200, count);
+ }
+ }
+
+ @Test
+ public void testRegionMerge() throws Exception {
+ byte[] splitKey = Bytes.toBytes(50);
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(tableName)
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
+ .setScope(HConstants.REPLICATION_SCOPE_SERIAL).build())
+ .build(),
+ new byte[][] { splitKey });
+ UTIL.waitTableAvailable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
+ UTIL.getAdmin()
+ .mergeRegionsAsync(
+ regions.stream().map(RegionInfo::getEncodedNameAsBytes).toArray(byte[][]::new), false)
+ .get(30, TimeUnit.SECONDS);
+ UTIL.waitUntilNoRegionsInTransition(30000);
+ List<RegionInfo> regionsAfterMerge = UTIL.getAdmin().getRegions(tableName);
+ assertEquals(1, regionsAfterMerge.size());
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ enablePeerAndWaitUntilReplicationDone(200);
+ Map<String, Long> regionsToSeqId = new HashMap<>();
+ RegionInfo region = regionsAfterMerge.get(0);
+ regionsToSeqId.put(region.getEncodedName(), -1L);
+ regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
+ try (WAL.Reader reader =
+ WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ int count = 0;
+ for (Entry entry;;) {
+ entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
+ Long seqId = regionsToSeqId.get(encodedName);
+ assertNotNull(
+ "Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, seqId);
+ assertTrue("Sequence id goes backwards from " + seqId + " to " +
+ entry.getKey().getSequenceId() + " for " + encodedName,
+ entry.getKey().getSequenceId() >= seqId.longValue());
+ if (count < 100) {
+ assertNotEquals(
+ encodedName + " is pushed before parents " +
+ regions.stream().map(RegionInfo::getEncodedName).collect(Collectors.joining(" and ")),
+ region.getEncodedName(), encodedName);
+ } else {
+ assertEquals(region.getEncodedName(), encodedName);
+ }
+ count++;
+ }
+ assertEquals(200, count);
+ }
+ }
}