diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 109f2d00f54..2a88b568004 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.hbase;
 
 import edu.umd.cs.findbugs.annotations.NonNull;
 import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +37,6 @@ import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.client.Connection;
@@ -150,11 +151,13 @@ public class MetaTableAccessor {
       META_REGION_PREFIX, 0, len);
   }
 
-  private static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
+  @VisibleForTesting
+  public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
 
-  private static final String REPLICATION_PARENT_SEPARATOR = "|";
+  private static final byte ESCAPE_BYTE = (byte) 0xFF;
+
+  private static final byte SEPARATED_BYTE = 0x00;
 
-  private static final String REPLICATION_PARENT_SEPARATOR_REGEX = "\\|";
   /**
    * Lists all of the table regions currently in META.
    * Deprecated, keep there until some test use this.
@@ -1921,10 +1924,51 @@ public class MetaTableAccessor {
         .build());
   }
 
+  private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
+    for (byte b : regionName) {
+      if (b == ESCAPE_BYTE) {
+        out.write(ESCAPE_BYTE);
+      }
+      out.write(b);
+    }
+  }
+
+  @VisibleForTesting
+  public static byte[] getParentsBytes(List<RegionInfo> parents) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Iterator<RegionInfo> iter = parents.iterator();
+    writeRegionName(bos, iter.next().getRegionName());
+    while (iter.hasNext()) {
+      bos.write(ESCAPE_BYTE);
+      bos.write(SEPARATED_BYTE);
+      writeRegionName(bos, iter.next().getRegionName());
+    }
+    return bos.toByteArray();
+  }
+
+  private static List<byte[]> parseParentsBytes(byte[] bytes) {
+    List<byte[]> parents = new ArrayList<>();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    for (int i = 0; i < bytes.length; i++) {
+      if (bytes[i] == ESCAPE_BYTE) {
+        i++;
+        if (bytes[i] == SEPARATED_BYTE) {
+          parents.add(bos.toByteArray());
+          bos.reset();
+          continue;
+        }
+        // fall through to append the byte
+      }
+      bos.write(bytes[i]);
+    }
+    if (bos.size() > 0) {
+      parents.add(bos.toByteArray());
+    }
+    return parents;
+  }
+
   private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
-    byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
-      .map(RegionInfo::getRegionNameAsString).collect(Collectors
-        .collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes));
+    byte[] value = getParentsBytes(parents);
     put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
       .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
@@ -1995,6 +2039,14 @@ public class MetaTableAccessor {
     public List<byte[]> getParentRegionNames() {
       return parentRegionNames;
     }
+
+    @Override
+    public String toString() {
+      return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) + ", state=" +
+        state + ", parentRegionNames=" +
+        parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", ")) +
+        "]";
+    }
   }
 
   private static long getReplicationBarrier(Cell c) {
@@ -2014,10 +2066,7 @@ public class MetaTableAccessor {
     byte[] parentRegionsBytes =
       result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
     List<byte[]> parentRegionNames =
-      parentRegionsBytes != null
-        ? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
-          .map(Bytes::toBytes).collect(Collectors.toList())
-        : Collections.emptyList();
+      parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList();
     return new ReplicationBarrierResult(barriers, state, parentRegionNames);
   }
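
Note on the encoding introduced above: a region name embeds the region's start key, which can contain arbitrary bytes (including 0xFF and the old "|" separator), so parent names are now stored with every 0xFF doubled and names joined by the two-byte sequence 0xFF 0x00. The following is a self-contained sketch of that round trip using raw byte arrays in place of RegionInfo; the class and method names here are illustrative, not part of the HBase API.

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParentNameEncodingSketch {

  private static final byte ESCAPE_BYTE = (byte) 0xFF;
  private static final byte SEPARATED_BYTE = 0x00;

  // Encode: every 0xFF inside a name is doubled, names are joined with 0xFF 0x00.
  static byte[] encode(List<byte[]> names) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int n = 0; n < names.size(); n++) {
      if (n > 0) {
        out.write(ESCAPE_BYTE);
        out.write(SEPARATED_BYTE);
      }
      for (byte b : names.get(n)) {
        if (b == ESCAPE_BYTE) {
          out.write(ESCAPE_BYTE); // escape by doubling
        }
        out.write(b);
      }
    }
    return out.toByteArray();
  }

  // Decode: 0xFF 0x00 ends a name, 0xFF 0xFF is a literal 0xFF.
  static List<byte[]> decode(byte[] bytes) {
    List<byte[]> names = new ArrayList<>();
    ByteArrayOutputStream cur = new ByteArrayOutputStream();
    for (int i = 0; i < bytes.length; i++) {
      if (bytes[i] == ESCAPE_BYTE) {
        i++;
        if (bytes[i] == SEPARATED_BYTE) {
          names.add(cur.toByteArray());
          cur.reset();
          continue;
        }
        // escaped byte, fall through and append it
      }
      cur.write(bytes[i]);
    }
    if (cur.size() > 0) {
      names.add(cur.toByteArray());
    }
    return names;
  }

  public static void main(String[] args) {
    // A "region name" that contains both the escape byte and the separator byte.
    byte[] a = new byte[] { 'a', (byte) 0xFF, 0x00, 'b' };
    byte[] b = new byte[] { 'c', 'd' };
    List<byte[]> decoded = decode(encode(Arrays.asList(a, b)));
    System.out.println(decoded.size() == 2
      && Arrays.equals(decoded.get(0), a)
      && Arrays.equals(decoded.get(1), b)); // prints true
  }
}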
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
index 95f3868031d..9276359172d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
@@ -108,6 +110,8 @@ import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
 @InterfaceAudience.Private
 class SerialReplicationChecker {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationChecker.class);
+
   public static final String REPLICATION_SERIALLY_WAITING_KEY =
     "hbase.serial.replication.waiting.ms";
   public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
@@ -182,9 +186,11 @@ class SerialReplicationChecker {
     long seqId = entry.getKey().getSequenceId();
     ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
       entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
+    LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
     long[] barriers = barrierResult.getBarriers();
     int index = Arrays.binarySearch(barriers, seqId);
     if (index == -1) {
+      LOG.debug("{} is before the first barrier, pass", entry);
       // This means we are in the range before the first record openSeqNum, this usually because the
       // wal is written before we enable serial replication for this table, just return true since
       // we can not guarantee the order.
@@ -203,22 +209,29 @@
       // we are in the first range, check whether we have parents
       for (byte[] regionName : barrierResult.getParentRegionNames()) {
         if (!isParentFinished(regionName)) {
+          LOG.debug("Parent {} has not been finished yet for entry {}, give up",
+            Bytes.toStringBinary(regionName), entry);
           return false;
         }
       }
       if (isLastRangeAndOpening(barrierResult, index)) {
+        LOG.debug("{} is in the last range and the region is opening, give up", entry);
        return false;
      }
+      LOG.debug("{} is in the first range, pass", entry);
       recordCanPush(encodedNameAsString, seqId, barriers, 1);
       return true;
     }
     // check whether the previous range is finished
     if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
+      LOG.debug("Previous range for {} has not been finished yet, give up", entry);
       return false;
     }
     if (isLastRangeAndOpening(barrierResult, index)) {
+      LOG.debug("{} is in the last range and the region is opening, give up", entry);
       return false;
     }
+    LOG.debug("The previous range for {} has been finished, pass", entry);
     recordCanPush(encodedNameAsString, seqId, barriers, index);
     return true;
   }
@@ -229,8 +242,11 @@
     Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
     if (canReplicateUnderSeqId != null) {
       if (seqId < canReplicateUnderSeqId.longValue()) {
+        LOG.trace("{} is before the end barrier {}, pass", entry, canReplicateUnderSeqId);
         return true;
       }
+      LOG.debug("{} is beyond the previous end barrier {}, remove from cache", entry,
+        canReplicateUnderSeqId);
       // we are already beyond the last safe point, remove
       canPushUnder.invalidate(encodedNameAsString);
     }
@@ -239,6 +255,7 @@
     // has been moved to another RS and then back, so we need to check the barrier.
     MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
     if (seqId == previousPushedSeqId.longValue() + 1) {
+      LOG.trace("The sequence id for {} is continuous, pass", entry);
       previousPushedSeqId.increment();
       return true;
     }
@@ -249,6 +266,7 @@
       throws IOException, InterruptedException {
     byte[] row = CellUtil.cloneRow(firstCellInEdit);
     while (!canPush(entry, row)) {
+      LOG.debug("Can not push {}, wait", entry);
       Thread.sleep(waitTimeMs);
     }
   }
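
Aside on the range check above: canPush keys off Arrays.binarySearch over the sorted barrier array, where -1 means the sequence id sorts before the first barrier, other negative values encode which range the id falls into, and a non-negative index means the id equals a barrier (so the entry starts the next range, which is what testCanPushEqualsToBarrier below exercises). A small standalone illustration of those JDK return values, with made-up barrier numbers:

import java.util.Arrays;

public class BarrierSearchSketch {

  public static void main(String[] args) {
    // Open sequence numbers recorded as replication barriers, kept sorted.
    long[] barriers = { 10, 100 };

    // Before the first barrier: binarySearch returns -(insertionPoint) - 1 = -1.
    System.out.println(Arrays.binarySearch(barriers, 5));   // -1

    // Between barriers 10 and 100: insertion point is 1, so the result is -2,
    // i.e. the entry falls into the first range.
    System.out.println(Arrays.binarySearch(barriers, 20));  // -2

    // Equal to a barrier: a non-negative index, meaning the entry is the first
    // record of the range that starts at that barrier.
    System.out.println(Arrays.binarySearch(barriers, 100)); // 1

    // Beyond the last barrier: insertion point is 2, so the result is -3,
    // i.e. the entry is in the last, still open, range.
    System.out.println(Arrays.binarySearch(barriers, 300)); // -3
  }
}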
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index c8387c5c41b..58e954388a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -19,10 +19,16 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -30,8 +36,10 @@ import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -54,6 +62,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
@@ -72,6 +82,8 @@ public class TestSerialReplicationChecker {
 
   private static String WAL_FILE_NAME = "test.wal";
 
+  private Connection conn;
+
   private SerialReplicationChecker checker;
 
   @Rule
@@ -98,8 +110,18 @@ public class TestSerialReplicationChecker {
     ReplicationSource source = mock(ReplicationSource.class);
     when(source.getPeerId()).thenReturn(PEER_ID);
     when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+    conn = mock(Connection.class);
+    when(conn.isClosed()).thenReturn(false);
+    doAnswer(new Answer<Table>() {
+
+      @Override
+      public Table answer(InvocationOnMock invocation) throws Throwable {
+        return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
+      }
+
+    }).when(conn).getTable(any(TableName.class));
     Server server = mock(Server.class);
-    when(server.getConnection()).thenReturn(UTIL.getConnection());
+    when(server.getConnection()).thenReturn(conn);
     when(source.getServer()).thenReturn(server);
     checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
     tableName = TableName.valueOf(name.getMethodName());
@@ -129,8 +151,10 @@ public class TestSerialReplicationChecker {
   private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
       throws IOException {
     Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
-    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
-      Bytes.toBytes(state.name()));
+    if (state != null) {
+      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+        Bytes.toBytes(state.name()));
+    }
     for (int i = 0; i < barriers.length; i++) {
       put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
         put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
@@ -154,6 +178,15 @@ public class TestSerialReplicationChecker {
       PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
   }
 
+  private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
+    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
+      MetaTableAccessor.REPLICATION_PARENT_QUALIFIER, MetaTableAccessor.getParentsBytes(parents));
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
   @Test
   public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
     RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
@@ -173,4 +206,98 @@ public class TestSerialReplicationChecker {
     setState(region, RegionState.State.OPENING);
     assertFalse(checker.canPush(createEntry(region, 104), cell));
   }
+
+  @Test
+  public void testCanPushUnder() throws IOException, ReplicationException {
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
+    updatePushedSeqId(region, 9);
+    Cell cell = createCell(region);
+    assertTrue(checker.canPush(createEntry(region, 20), cell));
+    verify(conn, times(1)).getTable(any(TableName.class));
+    // not continuous
+    for (int i = 22; i < 100; i += 2) {
+      assertTrue(checker.canPush(createEntry(region, i), cell));
+    }
+    // verify that we do not go to meta table
+    verify(conn, times(1)).getTable(any(TableName.class));
+  }
+
+  @Test
+  public void testCanPushIfContinuous() throws IOException, ReplicationException {
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addStateAndBarrier(region, RegionState.State.OPEN, 10);
+    updatePushedSeqId(region, 9);
+    Cell cell = createCell(region);
+    assertTrue(checker.canPush(createEntry(region, 20), cell));
+    verify(conn, times(1)).getTable(any(TableName.class));
+    // continuous
+    for (int i = 21; i < 100; i++) {
+      assertTrue(checker.canPush(createEntry(region, i), cell));
+    }
+    // verify that we do not go to meta table
+    verify(conn, times(1)).getTable(any(TableName.class));
+  }
+
+  @Test
+  public void testCanPushAfterMerge() throws IOException, ReplicationException {
+    // 0xFF is the escape byte when storing region name so let's make sure it can work.
+    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
+    RegionInfo regionA =
+      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build();
+    RegionInfo regionB =
+      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build();
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
+    addStateAndBarrier(regionA, null, 10, 100);
+    addStateAndBarrier(regionB, null, 20, 200);
+    addStateAndBarrier(region, RegionState.State.OPEN, 200);
+    addParents(region, Arrays.asList(regionA, regionB));
+    Cell cell = createCell(region);
+    // can not push since both parents have not been finished yet
+    assertFalse(checker.canPush(createEntry(region, 300), cell));
+    updatePushedSeqId(regionB, 199);
+    // can not push since regionA has not been finished yet
+    assertFalse(checker.canPush(createEntry(region, 300), cell));
+    updatePushedSeqId(regionA, 99);
+    // can push since all parents have been finished
+    assertTrue(checker.canPush(createEntry(region, 300), cell));
+  }
+
+  @Test
+  public void testCanPushAfterSplit() throws IOException, ReplicationException {
+    // 0xFF is the escape byte when storing region name so let's make sure it can work.
+    byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
+    RegionInfo regionA =
+      RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build();
+    RegionInfo regionB =
+      RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build();
+    addStateAndBarrier(region, null, 10, 100);
+    addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200);
+    addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300);
+    addParents(regionA, Arrays.asList(region));
+    addParents(regionB, Arrays.asList(region));
+    Cell cellA = createCell(regionA);
+    Cell cellB = createCell(regionB);
+    // can not push since parent has not been finished yet
+    assertFalse(checker.canPush(createEntry(regionA, 150), cellA));
+    assertFalse(checker.canPush(createEntry(regionB, 200), cellB));
+    updatePushedSeqId(region, 99);
+    // can push since parent has been finished
+    assertTrue(checker.canPush(createEntry(regionA, 150), cellA));
+    assertTrue(checker.canPush(createEntry(regionB, 200), cellB));
+  }
+
+  @Test
+  public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
+    // For binary search, equals to an element will result to a positive value, let's test whether
+    // it works.
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
+    Cell cell = createCell(region);
+    assertTrue(checker.canPush(createEntry(region, 10), cell));
+    assertFalse(checker.canPush(createEntry(region, 100), cell));
+    updatePushedSeqId(region, 99);
+    assertTrue(checker.canPush(createEntry(region, 100), cell));
+  }
 }
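
Note on testCanPushUnder and testCanPushIfContinuous: they assert exactly one getTable call to show that, after a single meta lookup, later entries are admitted from the checker's local bookkeeping alone, either because the sequence id stays below the cached end barrier or because it continues the last pushed id. The sketch below is a deliberately simplified, standalone illustration of that idea using plain maps; it is not the checker's actual code path (which uses Guava caches, MutableLong counters, and updates the pushed sequence id elsewhere), and all names in it are illustrative.

import java.util.HashMap;
import java.util.Map;

public class CanPushBookkeepingSketch {

  // encoded region name -> exclusive upper bound we may push below without checking meta
  private final Map<String, Long> canPushUnder = new HashMap<>();
  // encoded region name -> last sequence id we have pushed
  private final Map<String, Long> lastPushed = new HashMap<>();

  // Record a decision made after consulting the barriers in meta.
  void recordCanPush(String region, long seqId, long endBarrier) {
    canPushUnder.put(region, endBarrier);
    lastPushed.put(region, seqId);
  }

  // Cheap local check; returns false when meta would have to be consulted again.
  boolean canPushWithoutMeta(String region, long seqId) {
    Long under = canPushUnder.get(region);
    if (under != null) {
      if (seqId < under) {
        lastPushed.put(region, seqId);
        return true; // still below the cached end barrier
      }
      canPushUnder.remove(region); // beyond the safe point, the cached bound is stale
    }
    Long last = lastPushed.get(region);
    if (last != null && seqId == last + 1) {
      lastPushed.put(region, seqId); // continuous with what we already pushed
      return true;
    }
    return false; // caller falls back to reading the barriers from meta
  }

  public static void main(String[] args) {
    CanPushBookkeepingSketch sketch = new CanPushBookkeepingSketch();
    // Pretend meta told us the current range for "region-a" ends at barrier 100.
    sketch.recordCanPush("region-a", 20, 100);
    System.out.println(sketch.canPushWithoutMeta("region-a", 22));  // true, below 100
    System.out.println(sketch.canPushWithoutMeta("region-a", 23));  // true, below 100
    System.out.println(sketch.canPushWithoutMeta("region-a", 150)); // false, must re-check meta
  }
}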