HBASE-20129 Add UT for serial replication checker

This commit is contained in:
zhangduo 2018-03-06 08:40:31 +08:00
parent f89a1f7d7a
commit 1384da7137
3 changed files with 208 additions and 14 deletions

View File

@ -19,12 +19,14 @@ package org.apache.hadoop.hbase;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -35,7 +37,6 @@ import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.client.Connection;
@ -150,11 +151,13 @@ public class MetaTableAccessor {
META_REGION_PREFIX, 0, len);
}
private static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
@VisibleForTesting
public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
private static final String REPLICATION_PARENT_SEPARATOR = "|";
private static final byte ESCAPE_BYTE = (byte) 0xFF;
private static final byte SEPARATED_BYTE = 0x00;
private static final String REPLICATION_PARENT_SEPARATOR_REGEX = "\\|";
/**
* Lists all of the table regions currently in META.
* Deprecated, keep there until some test use this.
@ -1924,10 +1927,51 @@ public class MetaTableAccessor {
.build());
}
private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
for (byte b : regionName) {
if (b == ESCAPE_BYTE) {
out.write(ESCAPE_BYTE);
}
out.write(b);
}
}
@VisibleForTesting
public static byte[] getParentsBytes(List<RegionInfo> parents) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Iterator<RegionInfo> iter = parents.iterator();
writeRegionName(bos, iter.next().getRegionName());
while (iter.hasNext()) {
bos.write(ESCAPE_BYTE);
bos.write(SEPARATED_BYTE);
writeRegionName(bos, iter.next().getRegionName());
}
return bos.toByteArray();
}
private static List<byte[]> parseParentsBytes(byte[] bytes) {
List<byte[]> parents = new ArrayList<>();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
for (int i = 0; i < bytes.length; i++) {
if (bytes[i] == ESCAPE_BYTE) {
i++;
if (bytes[i] == SEPARATED_BYTE) {
parents.add(bos.toByteArray());
bos.reset();
continue;
}
// fall through to append the byte
}
bos.write(bytes[i]);
}
if (bos.size() > 0) {
parents.add(bos.toByteArray());
}
return parents;
}
private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
.map(RegionInfo::getRegionNameAsString).collect(Collectors
.collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes));
byte[] value = getParentsBytes(parents);
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
.setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
@ -1998,6 +2042,14 @@ public class MetaTableAccessor {
public List<byte[]> getParentRegionNames() {
return parentRegionNames;
}
@Override
public String toString() {
return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) + ", state=" +
state + ", parentRegionNames=" +
parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", ")) +
"]";
}
}
private static long getReplicationBarrier(Cell c) {
@ -2017,10 +2069,7 @@ public class MetaTableAccessor {
byte[] parentRegionsBytes =
result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
List<byte[]> parentRegionNames =
parentRegionsBytes != null
? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
.map(Bytes::toBytes).collect(Collectors.toList())
: Collections.emptyList();
parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList();
return new ReplicationBarrierResult(barriers, state, parentRegionNames);
}

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
@ -108,6 +110,8 @@ import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
@InterfaceAudience.Private
class SerialReplicationChecker {
private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationChecker.class);
public static final String REPLICATION_SERIALLY_WAITING_KEY =
"hbase.serial.replication.waiting.ms";
public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
@ -182,9 +186,11 @@ class SerialReplicationChecker {
long seqId = entry.getKey().getSequenceId();
ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
long[] barriers = barrierResult.getBarriers();
int index = Arrays.binarySearch(barriers, seqId);
if (index == -1) {
LOG.debug("{} is before the first barrier, pass", entry);
// This means we are in the range before the first record openSeqNum, this usually because the
// wal is written before we enable serial replication for this table, just return true since
// we can not guarantee the order.
@ -203,22 +209,29 @@ class SerialReplicationChecker {
// we are in the first range, check whether we have parents
for (byte[] regionName : barrierResult.getParentRegionNames()) {
if (!isParentFinished(regionName)) {
LOG.debug("Parent {} has not been finished yet for entry {}, give up",
Bytes.toStringBinary(regionName), entry);
return false;
}
}
if (isLastRangeAndOpening(barrierResult, index)) {
LOG.debug("{} is in the last range and the region is opening, give up", entry);
return false;
}
LOG.debug("{} is in the first range, pass", entry);
recordCanPush(encodedNameAsString, seqId, barriers, 1);
return true;
}
// check whether the previous range is finished
if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
LOG.debug("Previous range for {} has not been finished yet, give up", entry);
return false;
}
if (isLastRangeAndOpening(barrierResult, index)) {
LOG.debug("{} is in the last range and the region is opening, give up", entry);
return false;
}
LOG.debug("The previous range for {} has been finished, pass", entry);
recordCanPush(encodedNameAsString, seqId, barriers, index);
return true;
}
@ -229,8 +242,11 @@ class SerialReplicationChecker {
Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
if (canReplicateUnderSeqId != null) {
if (seqId < canReplicateUnderSeqId.longValue()) {
LOG.trace("{} is before the end barrier {}, pass", entry, canReplicateUnderSeqId);
return true;
}
LOG.debug("{} is beyond the previous end barrier {}, remove from cache", entry,
canReplicateUnderSeqId);
// we are already beyond the last safe point, remove
canPushUnder.invalidate(encodedNameAsString);
}
@ -239,6 +255,7 @@ class SerialReplicationChecker {
// has been moved to another RS and then back, so we need to check the barrier.
MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
if (seqId == previousPushedSeqId.longValue() + 1) {
LOG.trace("The sequence id for {} is continuous, pass");
previousPushedSeqId.increment();
return true;
}
@ -249,6 +266,7 @@ class SerialReplicationChecker {
throws IOException, InterruptedException {
byte[] row = CellUtil.cloneRow(firstCellInEdit);
while (!canPush(entry, row)) {
LOG.debug("Can not push{}, wait", entry);
Thread.sleep(waitTimeMs);
}
}

View File

@ -19,10 +19,16 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
@ -30,8 +36,10 @@ import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@ -54,6 +62,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@ -72,6 +82,8 @@ public class TestSerialReplicationChecker {
private static String WAL_FILE_NAME = "test.wal";
private Connection conn;
private SerialReplicationChecker checker;
@Rule
@ -98,8 +110,18 @@ public class TestSerialReplicationChecker {
ReplicationSource source = mock(ReplicationSource.class);
when(source.getPeerId()).thenReturn(PEER_ID);
when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
conn = mock(Connection.class);
when(conn.isClosed()).thenReturn(false);
doAnswer(new Answer<Table>() {
@Override
public Table answer(InvocationOnMock invocation) throws Throwable {
return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
}
}).when(conn).getTable(any(TableName.class));
Server server = mock(Server.class);
when(server.getConnection()).thenReturn(UTIL.getConnection());
when(server.getConnection()).thenReturn(conn);
when(source.getServer()).thenReturn(server);
checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
tableName = TableName.valueOf(name.getMethodName());
@ -129,8 +151,10 @@ public class TestSerialReplicationChecker {
private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
throws IOException {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
Bytes.toBytes(state.name()));
if (state != null) {
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
Bytes.toBytes(state.name()));
}
for (int i = 0; i < barriers.length; i++) {
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
@ -154,6 +178,15 @@ public class TestSerialReplicationChecker {
PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
}
private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
MetaTableAccessor.REPLICATION_PARENT_QUALIFIER, MetaTableAccessor.getParentsBytes(parents));
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put);
}
}
@Test
public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
@ -173,4 +206,98 @@ public class TestSerialReplicationChecker {
setState(region, RegionState.State.OPENING);
assertFalse(checker.canPush(createEntry(region, 104), cell));
}
@Test
public void testCanPushUnder() throws IOException, ReplicationException {
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
updatePushedSeqId(region, 9);
Cell cell = createCell(region);
assertTrue(checker.canPush(createEntry(region, 20), cell));
verify(conn, times(1)).getTable(any(TableName.class));
// not continuous
for (int i = 22; i < 100; i += 2) {
assertTrue(checker.canPush(createEntry(region, i), cell));
}
// verify that we do not go to meta table
verify(conn, times(1)).getTable(any(TableName.class));
}
@Test
public void testCanPushIfContinuous() throws IOException, ReplicationException {
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
addStateAndBarrier(region, RegionState.State.OPEN, 10);
updatePushedSeqId(region, 9);
Cell cell = createCell(region);
assertTrue(checker.canPush(createEntry(region, 20), cell));
verify(conn, times(1)).getTable(any(TableName.class));
// continuous
for (int i = 21; i < 100; i++) {
assertTrue(checker.canPush(createEntry(region, i), cell));
}
// verify that we do not go to meta table
verify(conn, times(1)).getTable(any(TableName.class));
}
@Test
public void testCanPushAfterMerge() throws IOException, ReplicationException {
// 0xFF is the escape byte when storing region name so let's make sure it can work.
byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
RegionInfo regionA =
RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build();
RegionInfo regionB =
RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build();
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
addStateAndBarrier(regionA, null, 10, 100);
addStateAndBarrier(regionB, null, 20, 200);
addStateAndBarrier(region, RegionState.State.OPEN, 200);
addParents(region, Arrays.asList(regionA, regionB));
Cell cell = createCell(region);
// can not push since both parents have not been finished yet
assertFalse(checker.canPush(createEntry(region, 300), cell));
updatePushedSeqId(regionB, 199);
// can not push since regionA has not been finished yet
assertFalse(checker.canPush(createEntry(region, 300), cell));
updatePushedSeqId(regionA, 99);
// can push since all parents have been finished
assertTrue(checker.canPush(createEntry(region, 300), cell));
}
@Test
public void testCanPushAfterSplit() throws IOException, ReplicationException {
// 0xFF is the escape byte when storing region name so let's make sure it can work.
byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
RegionInfo regionA =
RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build();
RegionInfo regionB =
RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build();
addStateAndBarrier(region, null, 10, 100);
addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200);
addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300);
addParents(regionA, Arrays.asList(region));
addParents(regionB, Arrays.asList(region));
Cell cellA = createCell(regionA);
Cell cellB = createCell(regionB);
// can not push since parent has not been finished yet
assertFalse(checker.canPush(createEntry(regionA, 150), cellA));
assertFalse(checker.canPush(createEntry(regionB, 200), cellB));
updatePushedSeqId(region, 99);
// can push since parent has been finished
assertTrue(checker.canPush(createEntry(regionA, 150), cellA));
assertTrue(checker.canPush(createEntry(regionB, 200), cellB));
}
@Test
public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
// For binary search, equals to an element will result to a positive value, let's test whether
// it works.
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
Cell cell = createCell(region);
assertTrue(checker.canPush(createEntry(region, 10), cell));
assertFalse(checker.canPush(createEntry(region, 100), cell));
updatePushedSeqId(region, 99);
assertTrue(checker.canPush(createEntry(region, 100), cell));
}
}