HBASE-20476 Open sequence number could go backwards in AssignProcedure
This commit is contained in:
parent
ce08826f0d
commit
e8603e1d7c
|
@ -285,8 +285,9 @@ public class AssignProcedure extends RegionTransitionProcedure {
|
||||||
if (openSeqNum < regionNode.getOpenSeqNum()) {
|
if (openSeqNum < regionNode.getOpenSeqNum()) {
|
||||||
LOG.warn("Skipping update of open seqnum with " + openSeqNum +
|
LOG.warn("Skipping update of open seqnum with " + openSeqNum +
|
||||||
" because current seqnum=" + regionNode.getOpenSeqNum());
|
" because current seqnum=" + regionNode.getOpenSeqNum());
|
||||||
}
|
} else {
|
||||||
regionNode.setOpenSeqNum(openSeqNum);
|
regionNode.setOpenSeqNum(openSeqNum);
|
||||||
|
}
|
||||||
// Leave the state here as OPENING for now. We set it to OPEN in
|
// Leave the state here as OPENING for now. We set it to OPEN in
|
||||||
// REGION_TRANSITION_FINISH section where we do a bunch of checks.
|
// REGION_TRANSITION_FINISH section where we do a bunch of checks.
|
||||||
// regionNode.setState(RegionState.State.OPEN, RegionState.State.OPENING);
|
// regionNode.setState(RegionState.State.OPEN, RegionState.State.OPENING);
|
||||||
|
|
|
@ -166,6 +166,7 @@ public class RegionStateStore {
|
||||||
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
|
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
|
||||||
hasGlobalReplicationScope(regionInfo.getTable())) {
|
hasGlobalReplicationScope(regionInfo.getTable())) {
|
||||||
MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
|
MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
|
||||||
|
info.append(", repBarrier=").append(openSeqNum);
|
||||||
}
|
}
|
||||||
info.append(", openSeqNum=").append(openSeqNum);
|
info.append(", openSeqNum=").append(openSeqNum);
|
||||||
info.append(", regionLocation=").append(regionLocation);
|
info.append(", regionLocation=").append(regionLocation);
|
||||||
|
|
|
@ -230,10 +230,12 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
|
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
|
||||||
Connection conn = env.getMasterServices().getConnection();
|
Connection conn = env.getMasterServices().getConnection();
|
||||||
if (!needSetLastPushedSequenceId(tsm, tableName)) {
|
if (!needSetLastPushedSequenceId(tsm, tableName)) {
|
||||||
|
LOG.debug("Skip settting last pushed sequence id for {}", tableName);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (Pair<String, Long> name2Barrier : MetaTableAccessor
|
for (Pair<String, Long> name2Barrier : MetaTableAccessor
|
||||||
.getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
|
.getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
|
||||||
|
LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
|
||||||
addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
|
addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
|
||||||
queueStorage);
|
queueStorage);
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,6 +119,7 @@ public class SerialReplicationTestBase {
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
|
UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
|
||||||
UTIL.getConfiguration().setLong("replication.sleep.before.failover", 1000);
|
UTIL.getConfiguration().setLong("replication.sleep.before.failover", 1000);
|
||||||
|
UTIL.getConfiguration().setLong("hbase.serial.replication.waiting.ms", 100);
|
||||||
UTIL.startMiniCluster(3);
|
UTIL.startMiniCluster(3);
|
||||||
// disable balancer
|
// disable balancer
|
||||||
UTIL.getAdmin().balancerSwitch(false, true);
|
UTIL.getAdmin().balancerSwitch(false, true);
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class TestReplicationBase {
|
||||||
protected static final int NB_ROWS_IN_BIG_BATCH =
|
protected static final int NB_ROWS_IN_BIG_BATCH =
|
||||||
NB_ROWS_IN_BATCH * 10;
|
NB_ROWS_IN_BATCH * 10;
|
||||||
protected static final long SLEEP_TIME = 500;
|
protected static final long SLEEP_TIME = 500;
|
||||||
protected static final int NB_RETRIES = 10;
|
protected static final int NB_RETRIES = 50;
|
||||||
|
|
||||||
protected static final TableName tableName = TableName.valueOf("test");
|
protected static final TableName tableName = TableName.valueOf("test");
|
||||||
protected static final byte[] famName = Bytes.toBytes("f");
|
protected static final byte[] famName = Bytes.toBytes("f");
|
||||||
|
@ -195,6 +195,7 @@ public class TestReplicationBase {
|
||||||
conf1.setInt("replication.source.maxretriesmultiplier", 10);
|
conf1.setInt("replication.source.maxretriesmultiplier", 10);
|
||||||
conf1.setFloat("replication.source.ratio", 1.0f);
|
conf1.setFloat("replication.source.ratio", 1.0f);
|
||||||
conf1.setBoolean("replication.source.eof.autorecovery", true);
|
conf1.setBoolean("replication.source.eof.autorecovery", true);
|
||||||
|
conf1.setLong("hbase.serial.replication.waiting.ms", 100);
|
||||||
|
|
||||||
// Parameter config
|
// Parameter config
|
||||||
conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, seperateOldWALs);
|
conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, seperateOldWALs);
|
||||||
|
|
Loading…
Reference in New Issue