HBASE-20129 Add UT for serial replication checker
This commit is contained in:
parent
f29bf1d778
commit
8b61a061d3
|
@ -19,12 +19,14 @@ package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import edu.umd.cs.findbugs.annotations.NonNull;
|
import edu.umd.cs.findbugs.annotations.NonNull;
|
||||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -35,7 +37,6 @@ import java.util.TreeMap;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell.Type;
|
import org.apache.hadoop.hbase.Cell.Type;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
@ -150,11 +151,13 @@ public class MetaTableAccessor {
|
||||||
META_REGION_PREFIX, 0, len);
|
META_REGION_PREFIX, 0, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
|
@VisibleForTesting
|
||||||
|
public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
|
||||||
|
|
||||||
private static final String REPLICATION_PARENT_SEPARATOR = "|";
|
private static final byte ESCAPE_BYTE = (byte) 0xFF;
|
||||||
|
|
||||||
|
private static final byte SEPARATED_BYTE = 0x00;
|
||||||
|
|
||||||
private static final String REPLICATION_PARENT_SEPARATOR_REGEX = "\\|";
|
|
||||||
/**
|
/**
|
||||||
* Lists all of the table regions currently in META.
|
* Lists all of the table regions currently in META.
|
||||||
* Deprecated, keep there until some test use this.
|
* Deprecated, keep there until some test use this.
|
||||||
|
@ -1921,10 +1924,51 @@ public class MetaTableAccessor {
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
|
||||||
|
for (byte b : regionName) {
|
||||||
|
if (b == ESCAPE_BYTE) {
|
||||||
|
out.write(ESCAPE_BYTE);
|
||||||
|
}
|
||||||
|
out.write(b);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static byte[] getParentsBytes(List<RegionInfo> parents) {
|
||||||
|
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
Iterator<RegionInfo> iter = parents.iterator();
|
||||||
|
writeRegionName(bos, iter.next().getRegionName());
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
bos.write(ESCAPE_BYTE);
|
||||||
|
bos.write(SEPARATED_BYTE);
|
||||||
|
writeRegionName(bos, iter.next().getRegionName());
|
||||||
|
}
|
||||||
|
return bos.toByteArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<byte[]> parseParentsBytes(byte[] bytes) {
|
||||||
|
List<byte[]> parents = new ArrayList<>();
|
||||||
|
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
for (int i = 0; i < bytes.length; i++) {
|
||||||
|
if (bytes[i] == ESCAPE_BYTE) {
|
||||||
|
i++;
|
||||||
|
if (bytes[i] == SEPARATED_BYTE) {
|
||||||
|
parents.add(bos.toByteArray());
|
||||||
|
bos.reset();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// fall through to append the byte
|
||||||
|
}
|
||||||
|
bos.write(bytes[i]);
|
||||||
|
}
|
||||||
|
if (bos.size() > 0) {
|
||||||
|
parents.add(bos.toByteArray());
|
||||||
|
}
|
||||||
|
return parents;
|
||||||
|
}
|
||||||
|
|
||||||
private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
|
private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
|
||||||
byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
|
byte[] value = getParentsBytes(parents);
|
||||||
.map(RegionInfo::getRegionNameAsString).collect(Collectors
|
|
||||||
.collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes));
|
|
||||||
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
|
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
|
||||||
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
|
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
|
||||||
.setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
|
.setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
|
||||||
|
@ -1995,6 +2039,14 @@ public class MetaTableAccessor {
|
||||||
public List<byte[]> getParentRegionNames() {
|
public List<byte[]> getParentRegionNames() {
|
||||||
return parentRegionNames;
|
return parentRegionNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) + ", state=" +
|
||||||
|
state + ", parentRegionNames=" +
|
||||||
|
parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", ")) +
|
||||||
|
"]";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long getReplicationBarrier(Cell c) {
|
private static long getReplicationBarrier(Cell c) {
|
||||||
|
@ -2014,10 +2066,7 @@ public class MetaTableAccessor {
|
||||||
byte[] parentRegionsBytes =
|
byte[] parentRegionsBytes =
|
||||||
result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
|
result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
|
||||||
List<byte[]> parentRegionNames =
|
List<byte[]> parentRegionNames =
|
||||||
parentRegionsBytes != null
|
parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList();
|
||||||
? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
|
|
||||||
.map(Bytes::toBytes).collect(Collectors.toList())
|
|
||||||
: Collections.emptyList();
|
|
||||||
return new ReplicationBarrierResult(barriers, state, parentRegionNames);
|
return new ReplicationBarrierResult(barriers, state, parentRegionNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
|
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||||
|
@ -108,6 +110,8 @@ import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class SerialReplicationChecker {
|
class SerialReplicationChecker {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationChecker.class);
|
||||||
|
|
||||||
public static final String REPLICATION_SERIALLY_WAITING_KEY =
|
public static final String REPLICATION_SERIALLY_WAITING_KEY =
|
||||||
"hbase.serial.replication.waiting.ms";
|
"hbase.serial.replication.waiting.ms";
|
||||||
public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
|
public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
|
||||||
|
@ -182,9 +186,11 @@ class SerialReplicationChecker {
|
||||||
long seqId = entry.getKey().getSequenceId();
|
long seqId = entry.getKey().getSequenceId();
|
||||||
ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
|
ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
|
||||||
entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
|
entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
|
||||||
|
LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
|
||||||
long[] barriers = barrierResult.getBarriers();
|
long[] barriers = barrierResult.getBarriers();
|
||||||
int index = Arrays.binarySearch(barriers, seqId);
|
int index = Arrays.binarySearch(barriers, seqId);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
|
LOG.debug("{} is before the first barrier, pass", entry);
|
||||||
// This means we are in the range before the first record openSeqNum, this usually because the
|
// This means we are in the range before the first record openSeqNum, this usually because the
|
||||||
// wal is written before we enable serial replication for this table, just return true since
|
// wal is written before we enable serial replication for this table, just return true since
|
||||||
// we can not guarantee the order.
|
// we can not guarantee the order.
|
||||||
|
@ -203,22 +209,29 @@ class SerialReplicationChecker {
|
||||||
// we are in the first range, check whether we have parents
|
// we are in the first range, check whether we have parents
|
||||||
for (byte[] regionName : barrierResult.getParentRegionNames()) {
|
for (byte[] regionName : barrierResult.getParentRegionNames()) {
|
||||||
if (!isParentFinished(regionName)) {
|
if (!isParentFinished(regionName)) {
|
||||||
|
LOG.debug("Parent {} has not been finished yet for entry {}, give up",
|
||||||
|
Bytes.toStringBinary(regionName), entry);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isLastRangeAndOpening(barrierResult, index)) {
|
if (isLastRangeAndOpening(barrierResult, index)) {
|
||||||
|
LOG.debug("{} is in the last range and the region is opening, give up", entry);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
LOG.debug("{} is in the first range, pass", entry);
|
||||||
recordCanPush(encodedNameAsString, seqId, barriers, 1);
|
recordCanPush(encodedNameAsString, seqId, barriers, 1);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
// check whether the previous range is finished
|
// check whether the previous range is finished
|
||||||
if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
|
if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
|
||||||
|
LOG.debug("Previous range for {} has not been finished yet, give up", entry);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (isLastRangeAndOpening(barrierResult, index)) {
|
if (isLastRangeAndOpening(barrierResult, index)) {
|
||||||
|
LOG.debug("{} is in the last range and the region is opening, give up", entry);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
LOG.debug("The previous range for {} has been finished, pass", entry);
|
||||||
recordCanPush(encodedNameAsString, seqId, barriers, index);
|
recordCanPush(encodedNameAsString, seqId, barriers, index);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -229,8 +242,11 @@ class SerialReplicationChecker {
|
||||||
Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
|
Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
|
||||||
if (canReplicateUnderSeqId != null) {
|
if (canReplicateUnderSeqId != null) {
|
||||||
if (seqId < canReplicateUnderSeqId.longValue()) {
|
if (seqId < canReplicateUnderSeqId.longValue()) {
|
||||||
|
LOG.trace("{} is before the end barrier {}, pass", entry, canReplicateUnderSeqId);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
LOG.debug("{} is beyond the previous end barrier {}, remove from cache", entry,
|
||||||
|
canReplicateUnderSeqId);
|
||||||
// we are already beyond the last safe point, remove
|
// we are already beyond the last safe point, remove
|
||||||
canPushUnder.invalidate(encodedNameAsString);
|
canPushUnder.invalidate(encodedNameAsString);
|
||||||
}
|
}
|
||||||
|
@ -239,6 +255,7 @@ class SerialReplicationChecker {
|
||||||
// has been moved to another RS and then back, so we need to check the barrier.
|
// has been moved to another RS and then back, so we need to check the barrier.
|
||||||
MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
|
MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
|
||||||
if (seqId == previousPushedSeqId.longValue() + 1) {
|
if (seqId == previousPushedSeqId.longValue() + 1) {
|
||||||
|
LOG.trace("The sequence id for {} is continuous, pass");
|
||||||
previousPushedSeqId.increment();
|
previousPushedSeqId.increment();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -249,6 +266,7 @@ class SerialReplicationChecker {
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
byte[] row = CellUtil.cloneRow(firstCellInEdit);
|
byte[] row = CellUtil.cloneRow(firstCellInEdit);
|
||||||
while (!canPush(entry, row)) {
|
while (!canPush(entry, row)) {
|
||||||
|
LOG.debug("Can not push{}, wait", entry);
|
||||||
Thread.sleep(waitTimeMs);
|
Thread.sleep(waitTimeMs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,10 +19,16 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.Cell.Type;
|
import org.apache.hadoop.hbase.Cell.Type;
|
||||||
import org.apache.hadoop.hbase.CellBuilderFactory;
|
import org.apache.hadoop.hbase.CellBuilderFactory;
|
||||||
|
@ -30,8 +36,10 @@ import org.apache.hadoop.hbase.CellBuilderType;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
@ -54,6 +62,8 @@ import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
|
@ -72,6 +82,8 @@ public class TestSerialReplicationChecker {
|
||||||
|
|
||||||
private static String WAL_FILE_NAME = "test.wal";
|
private static String WAL_FILE_NAME = "test.wal";
|
||||||
|
|
||||||
|
private Connection conn;
|
||||||
|
|
||||||
private SerialReplicationChecker checker;
|
private SerialReplicationChecker checker;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
|
@ -98,8 +110,18 @@ public class TestSerialReplicationChecker {
|
||||||
ReplicationSource source = mock(ReplicationSource.class);
|
ReplicationSource source = mock(ReplicationSource.class);
|
||||||
when(source.getPeerId()).thenReturn(PEER_ID);
|
when(source.getPeerId()).thenReturn(PEER_ID);
|
||||||
when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
|
when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
|
||||||
|
conn = mock(Connection.class);
|
||||||
|
when(conn.isClosed()).thenReturn(false);
|
||||||
|
doAnswer(new Answer<Table>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Table answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
}).when(conn).getTable(any(TableName.class));
|
||||||
Server server = mock(Server.class);
|
Server server = mock(Server.class);
|
||||||
when(server.getConnection()).thenReturn(UTIL.getConnection());
|
when(server.getConnection()).thenReturn(conn);
|
||||||
when(source.getServer()).thenReturn(server);
|
when(source.getServer()).thenReturn(server);
|
||||||
checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
|
checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
|
||||||
tableName = TableName.valueOf(name.getMethodName());
|
tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
@ -129,8 +151,10 @@ public class TestSerialReplicationChecker {
|
||||||
private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
|
private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
|
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
|
||||||
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
|
if (state != null) {
|
||||||
Bytes.toBytes(state.name()));
|
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
|
||||||
|
Bytes.toBytes(state.name()));
|
||||||
|
}
|
||||||
for (int i = 0; i < barriers.length; i++) {
|
for (int i = 0; i < barriers.length; i++) {
|
||||||
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
|
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
|
||||||
put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
|
put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
|
||||||
|
@ -154,6 +178,15 @@ public class TestSerialReplicationChecker {
|
||||||
PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
|
PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
|
||||||
|
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
|
||||||
|
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
|
||||||
|
MetaTableAccessor.REPLICATION_PARENT_QUALIFIER, MetaTableAccessor.getParentsBytes(parents));
|
||||||
|
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
|
||||||
|
table.put(put);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
|
public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
|
||||||
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
|
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
|
||||||
|
@ -173,4 +206,98 @@ public class TestSerialReplicationChecker {
|
||||||
setState(region, RegionState.State.OPENING);
|
setState(region, RegionState.State.OPENING);
|
||||||
assertFalse(checker.canPush(createEntry(region, 104), cell));
|
assertFalse(checker.canPush(createEntry(region, 104), cell));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCanPushUnder() throws IOException, ReplicationException {
|
||||||
|
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
|
||||||
|
addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
|
||||||
|
updatePushedSeqId(region, 9);
|
||||||
|
Cell cell = createCell(region);
|
||||||
|
assertTrue(checker.canPush(createEntry(region, 20), cell));
|
||||||
|
verify(conn, times(1)).getTable(any(TableName.class));
|
||||||
|
// not continuous
|
||||||
|
for (int i = 22; i < 100; i += 2) {
|
||||||
|
assertTrue(checker.canPush(createEntry(region, i), cell));
|
||||||
|
}
|
||||||
|
// verify that we do not go to meta table
|
||||||
|
verify(conn, times(1)).getTable(any(TableName.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCanPushIfContinuous() throws IOException, ReplicationException {
|
||||||
|
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
|
||||||
|
addStateAndBarrier(region, RegionState.State.OPEN, 10);
|
||||||
|
updatePushedSeqId(region, 9);
|
||||||
|
Cell cell = createCell(region);
|
||||||
|
assertTrue(checker.canPush(createEntry(region, 20), cell));
|
||||||
|
verify(conn, times(1)).getTable(any(TableName.class));
|
||||||
|
// continuous
|
||||||
|
for (int i = 21; i < 100; i++) {
|
||||||
|
assertTrue(checker.canPush(createEntry(region, i), cell));
|
||||||
|
}
|
||||||
|
// verify that we do not go to meta table
|
||||||
|
verify(conn, times(1)).getTable(any(TableName.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCanPushAfterMerge() throws IOException, ReplicationException {
|
||||||
|
// 0xFF is the escape byte when storing region name so let's make sure it can work.
|
||||||
|
byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
|
||||||
|
RegionInfo regionA =
|
||||||
|
RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build();
|
||||||
|
RegionInfo regionB =
|
||||||
|
RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build();
|
||||||
|
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
|
||||||
|
addStateAndBarrier(regionA, null, 10, 100);
|
||||||
|
addStateAndBarrier(regionB, null, 20, 200);
|
||||||
|
addStateAndBarrier(region, RegionState.State.OPEN, 200);
|
||||||
|
addParents(region, Arrays.asList(regionA, regionB));
|
||||||
|
Cell cell = createCell(region);
|
||||||
|
// can not push since both parents have not been finished yet
|
||||||
|
assertFalse(checker.canPush(createEntry(region, 300), cell));
|
||||||
|
updatePushedSeqId(regionB, 199);
|
||||||
|
// can not push since regionA has not been finished yet
|
||||||
|
assertFalse(checker.canPush(createEntry(region, 300), cell));
|
||||||
|
updatePushedSeqId(regionA, 99);
|
||||||
|
// can push since all parents have been finished
|
||||||
|
assertTrue(checker.canPush(createEntry(region, 300), cell));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCanPushAfterSplit() throws IOException, ReplicationException {
|
||||||
|
// 0xFF is the escape byte when storing region name so let's make sure it can work.
|
||||||
|
byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
|
||||||
|
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
|
||||||
|
RegionInfo regionA =
|
||||||
|
RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build();
|
||||||
|
RegionInfo regionB =
|
||||||
|
RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build();
|
||||||
|
addStateAndBarrier(region, null, 10, 100);
|
||||||
|
addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200);
|
||||||
|
addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300);
|
||||||
|
addParents(regionA, Arrays.asList(region));
|
||||||
|
addParents(regionB, Arrays.asList(region));
|
||||||
|
Cell cellA = createCell(regionA);
|
||||||
|
Cell cellB = createCell(regionB);
|
||||||
|
// can not push since parent has not been finished yet
|
||||||
|
assertFalse(checker.canPush(createEntry(regionA, 150), cellA));
|
||||||
|
assertFalse(checker.canPush(createEntry(regionB, 200), cellB));
|
||||||
|
updatePushedSeqId(region, 99);
|
||||||
|
// can push since parent has been finished
|
||||||
|
assertTrue(checker.canPush(createEntry(regionA, 150), cellA));
|
||||||
|
assertTrue(checker.canPush(createEntry(regionB, 200), cellB));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
|
||||||
|
// For binary search, equals to an element will result to a positive value, let's test whether
|
||||||
|
// it works.
|
||||||
|
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
|
||||||
|
addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
|
||||||
|
Cell cell = createCell(region);
|
||||||
|
assertTrue(checker.canPush(createEntry(region, 10), cell));
|
||||||
|
assertFalse(checker.canPush(createEntry(region, 100), cell));
|
||||||
|
updatePushedSeqId(region, 99);
|
||||||
|
assertTrue(checker.canPush(createEntry(region, 100), cell));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue