HBASE-27213 Add support for claim queue operation (#4708)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
parent
e2d8db6dfc
commit
f81bdebedb
|
@ -515,6 +515,7 @@ message UpdatePeerConfigStateData {
|
||||||
|
|
||||||
message RemovePeerStateData {
|
message RemovePeerStateData {
|
||||||
optional ReplicationPeer peer_config = 1;
|
optional ReplicationPeer peer_config = 1;
|
||||||
|
repeated int64 ongoing_assign_replication_queues_proc_ids = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message EnablePeerStateData {
|
message EnablePeerStateData {
|
||||||
|
@ -714,9 +715,8 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
|
||||||
}
|
}
|
||||||
|
|
||||||
enum AssignReplicationQueuesState {
|
enum AssignReplicationQueuesState {
|
||||||
ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1;
|
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
|
||||||
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2;
|
ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
|
||||||
ASSIGN_REPLICATION_QUEUES_CLAIM = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message AssignReplicationQueuesStateData {
|
message AssignReplicationQueuesStateData {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
|
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
|
||||||
|
@ -102,8 +103,12 @@ public class AssignReplicationQueuesProcedure
|
||||||
}
|
}
|
||||||
|
|
||||||
private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
|
private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
|
||||||
|
Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
|
||||||
|
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
|
||||||
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
|
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
|
||||||
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer);
|
// filter out replication queue for deleted peers
|
||||||
|
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer).stream()
|
||||||
|
.filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList());
|
||||||
if (queueIds.isEmpty()) {
|
if (queueIds.isEmpty()) {
|
||||||
LOG.debug("Finish claiming replication queues for {}", crashedServer);
|
LOG.debug("Finish claiming replication queues for {}", crashedServer);
|
||||||
// we are done
|
// we are done
|
||||||
|
@ -130,10 +135,6 @@ public class AssignReplicationQueuesProcedure
|
||||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||||
try {
|
try {
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case ASSIGN_REPLICATION_QUEUES_PRE_CHECK:
|
|
||||||
// TODO: reserved for implementing the fencing logic with Add/Remove/UpdatePeerProcedure
|
|
||||||
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES);
|
|
||||||
return Flow.HAS_MORE_STATE;
|
|
||||||
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
|
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
|
||||||
addMissingQueues(env);
|
addMissingQueues(env);
|
||||||
retryCounter = null;
|
retryCounter = null;
|
||||||
|
@ -183,7 +184,7 @@ public class AssignReplicationQueuesProcedure
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected AssignReplicationQueuesState getInitialState() {
|
protected AssignReplicationQueuesState getInitialState() {
|
||||||
return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_PRE_CHECK;
|
return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -74,7 +74,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
||||||
* update the peer storage.
|
* update the peer storage.
|
||||||
*/
|
*/
|
||||||
protected abstract void postPeerModification(MasterProcedureEnv env)
|
protected abstract void postPeerModification(MasterProcedureEnv env)
|
||||||
throws IOException, ReplicationException;
|
throws IOException, ReplicationException, ProcedureSuspendedException;
|
||||||
|
|
||||||
protected void releaseLatch(MasterProcedureEnv env) {
|
protected void releaseLatch(MasterProcedureEnv env) {
|
||||||
ProcedurePrepareLatch.releaseLatch(latch, this);
|
ProcedurePrepareLatch.releaseLatch(latch, this);
|
||||||
|
|
|
@ -18,10 +18,17 @@
|
||||||
package org.apache.hadoop.hbase.master.replication;
|
package org.apache.hadoop.hbase.master.replication;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -40,6 +47,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
|
||||||
|
|
||||||
private ReplicationPeerConfig peerConfig;
|
private ReplicationPeerConfig peerConfig;
|
||||||
|
|
||||||
|
private List<Long> ongoingAssignReplicationQueuesProcIds = Collections.emptyList();
|
||||||
|
|
||||||
public RemovePeerProcedure() {
|
public RemovePeerProcedure() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,15 +73,43 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
|
||||||
@Override
|
@Override
|
||||||
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||||
env.getReplicationPeerManager().removePeer(peerId);
|
env.getReplicationPeerManager().removePeer(peerId);
|
||||||
|
// record ongoing AssignReplicationQueuesProcedures after we update the peer storage
|
||||||
|
ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor()
|
||||||
|
.getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure)
|
||||||
|
.filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
|
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
|
||||||
env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
|
env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env)
|
||||||
|
throws ProcedureSuspendedException {
|
||||||
|
if (ongoingAssignReplicationQueuesProcIds.isEmpty()) {
|
||||||
|
LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on",
|
||||||
|
peerId);
|
||||||
|
}
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||||
|
env.getMasterServices().getMasterProcedureExecutor();
|
||||||
|
long[] unfinishedProcIds =
|
||||||
|
ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure)
|
||||||
|
.filter(p -> p != null && !p.isFinished()).mapToLong(Procedure::getProcId).toArray();
|
||||||
|
if (unfinishedProcIds.length == 0) {
|
||||||
|
LOG.info(
|
||||||
|
"All assign replication queues procedures are finished when removing peer {}, move on",
|
||||||
|
peerId);
|
||||||
|
} else {
|
||||||
|
throw suspend(env.getMasterConfiguration(), backoff -> LOG.info(
|
||||||
|
"There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs",
|
||||||
|
unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void postPeerModification(MasterProcedureEnv env)
|
protected void postPeerModification(MasterProcedureEnv env)
|
||||||
throws IOException, ReplicationException {
|
throws IOException, ReplicationException, ProcedureSuspendedException {
|
||||||
|
checkAssignReplicationQueuesFinished(env);
|
||||||
|
|
||||||
if (peerConfig.isSyncReplication()) {
|
if (peerConfig.isSyncReplication()) {
|
||||||
removeRemoteWALs(env);
|
removeRemoteWALs(env);
|
||||||
}
|
}
|
||||||
|
@ -94,6 +131,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
|
||||||
if (peerConfig != null) {
|
if (peerConfig != null) {
|
||||||
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
|
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
|
||||||
}
|
}
|
||||||
|
builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds);
|
||||||
serializer.serialize(builder.build());
|
serializer.serialize(builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,5 +142,6 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
|
||||||
if (data.hasPeerConfig()) {
|
if (data.hasPeerConfig()) {
|
||||||
this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
|
this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
|
||||||
}
|
}
|
||||||
|
ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -237,7 +237,7 @@ public class ReplicationSourceManager {
|
||||||
*/
|
*/
|
||||||
void init() throws IOException {
|
void init() throws IOException {
|
||||||
for (String id : this.replicationPeers.getAllPeerIds()) {
|
for (String id : this.replicationPeers.getAllPeerIds()) {
|
||||||
addSource(id);
|
addSource(id, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,7 +257,7 @@ public class ReplicationSourceManager {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
if (added) {
|
if (added) {
|
||||||
addSource(peerId);
|
addSource(peerId, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -323,11 +323,16 @@ public class ReplicationSourceManager {
|
||||||
/**
|
/**
|
||||||
* Add a normal source for the given peer on this region server. Meanwhile, add new replication
|
* Add a normal source for the given peer on this region server. Meanwhile, add new replication
|
||||||
* queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
|
* queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
|
||||||
* group and do replication
|
* group and do replication.
|
||||||
|
* <p/>
|
||||||
|
* We add a {@code init} parameter to indicate whether this is part of the initialization process.
|
||||||
|
* If so, we should skip adding the replication queues as this may introduce dead lock on region
|
||||||
|
* server start up and hbase:replication table online.
|
||||||
* @param peerId the id of the replication peer
|
* @param peerId the id of the replication peer
|
||||||
|
* @param init whether this call is part of the initialization process
|
||||||
* @return the source that was created
|
* @return the source that was created
|
||||||
*/
|
*/
|
||||||
void addSource(String peerId) throws IOException {
|
void addSource(String peerId, boolean init) throws IOException {
|
||||||
ReplicationPeer peer = replicationPeers.getPeer(peerId);
|
ReplicationPeer peer = replicationPeers.getPeer(peerId);
|
||||||
if (
|
if (
|
||||||
ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
|
ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
|
||||||
|
@ -352,11 +357,16 @@ public class ReplicationSourceManager {
|
||||||
NavigableSet<String> wals = new TreeSet<>();
|
NavigableSet<String> wals = new TreeSet<>();
|
||||||
wals.add(walPath.getName());
|
wals.add(walPath.getName());
|
||||||
walsByGroup.put(walPrefixAndPath.getKey(), wals);
|
walsByGroup.put(walPrefixAndPath.getKey(), wals);
|
||||||
// Abort RS and throw exception to make add peer failed
|
if (!init) {
|
||||||
// TODO: can record the length of the current wal file so we could replicate less data
|
// Abort RS and throw exception to make add peer failed
|
||||||
abortAndThrowIOExceptionWhenFail(
|
// Ideally we'd better use the current file size as offset so we can skip replicating
|
||||||
() -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
|
// the data before adding replication peer, but the problem is that the file may not end
|
||||||
new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
|
// at a valid entry's ending, and the current WAL Reader implementation can not deal
|
||||||
|
// with reading from the middle of a WAL entry. Can improve later.
|
||||||
|
abortAndThrowIOExceptionWhenFail(
|
||||||
|
() -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
|
||||||
|
new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
|
||||||
|
}
|
||||||
src.enqueueLog(walPath);
|
src.enqueueLog(walPath);
|
||||||
LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
|
LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
|
||||||
}
|
}
|
||||||
|
@ -795,9 +805,15 @@ public class ReplicationSourceManager {
|
||||||
* @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
|
* @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
|
||||||
*/
|
*/
|
||||||
private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
|
private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
|
||||||
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
|
// skip replicating meta wals
|
||||||
|
if (AbstractFSWALProvider.isMetaFile(wal)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
// if no offset or the offset is just a place marker, replicate
|
||||||
|
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// otherwise, compare the timestamp
|
||||||
long walTs = AbstractFSWALProvider.getTimestamp(wal);
|
long walTs = AbstractFSWALProvider.getTimestamp(wal);
|
||||||
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
|
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
|
||||||
if (walTs < startWalTs) {
|
if (walTs < startWalTs) {
|
||||||
|
@ -892,7 +908,6 @@ public class ReplicationSourceManager {
|
||||||
LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
|
LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
|
||||||
groupOffset);
|
groupOffset);
|
||||||
}
|
}
|
||||||
walFilesPQ.add(file);
|
|
||||||
}
|
}
|
||||||
// the method is a bit long, so assign it to null here to avoid later we reuse it again by
|
// the method is a bit long, so assign it to null here to avoid later we reuse it again by
|
||||||
// mistake, we should use the sorted walFilesPQ instead
|
// mistake, we should use the sorted walFilesPQ instead
|
||||||
|
|
|
@ -156,7 +156,7 @@ public class TestClaimReplicationQueue extends TestReplicationBase {
|
||||||
hbaseAdmin.enableReplicationPeer(PEER_ID3);
|
hbaseAdmin.enableReplicationPeer(PEER_ID3);
|
||||||
|
|
||||||
EMPTY = false;
|
EMPTY = false;
|
||||||
// wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
|
// wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP
|
||||||
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
|
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
|
||||||
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
|
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,180 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.*;
|
||||||
|
import static org.hamcrest.Matchers.*;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionServerList;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||||
|
import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure;
|
||||||
|
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure we will wait until all the SCPs finished in RemovePeerProcedure.
|
||||||
|
* <p/>
|
||||||
|
* See HBASE-27109 for more details.
|
||||||
|
*/
|
||||||
|
@Category({ ReplicationTests.class, LargeTests.class })
|
||||||
|
public class TestRemovePeerProcedureWaitForSCP extends TestReplicationBase {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestRemovePeerProcedureWaitForSCP.class);
|
||||||
|
|
||||||
|
private static final TableName tableName3 = TableName.valueOf("test3");
|
||||||
|
|
||||||
|
private static final String PEER_ID3 = "3";
|
||||||
|
|
||||||
|
private static Table table3;
|
||||||
|
|
||||||
|
private static volatile boolean EMPTY = false;
|
||||||
|
|
||||||
|
public static final class ServerManagerForTest extends ServerManager {
|
||||||
|
|
||||||
|
public ServerManagerForTest(MasterServices master, RegionServerList storage) {
|
||||||
|
super(master, storage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ServerName> getOnlineServersList() {
|
||||||
|
// return no region server to make the procedure hang
|
||||||
|
if (EMPTY) {
|
||||||
|
for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
|
||||||
|
if (e.getClassName().equals(AssignReplicationQueuesProcedure.class.getName())) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return super.getOnlineServersList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class HMasterForTest extends HMaster {
|
||||||
|
|
||||||
|
public HMasterForTest(Configuration conf) throws IOException {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
|
||||||
|
throws IOException {
|
||||||
|
setupClusterConnection();
|
||||||
|
return new ServerManagerForTest(master, storage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
|
||||||
|
TestReplicationBase.setUpBeforeClass();
|
||||||
|
createTable(tableName3);
|
||||||
|
table3 = connection1.getTable(tableName3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUpBase() throws Exception {
|
||||||
|
super.setUpBase();
|
||||||
|
// set up two replication peers and only 1 rs to test claim replication queue with multiple
|
||||||
|
// round
|
||||||
|
addPeer(PEER_ID3, tableName3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void tearDownBase() throws Exception {
|
||||||
|
super.tearDownBase();
|
||||||
|
removePeer(PEER_ID3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
Closeables.close(table3, true);
|
||||||
|
TestReplicationBase.tearDownAfterClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWait() throws Exception {
|
||||||
|
// disable the peers
|
||||||
|
hbaseAdmin.disableReplicationPeer(PEER_ID2);
|
||||||
|
hbaseAdmin.disableReplicationPeer(PEER_ID3);
|
||||||
|
|
||||||
|
// put some data
|
||||||
|
UTIL1.loadTable(htable1, famName);
|
||||||
|
UTIL1.loadTable(table3, famName);
|
||||||
|
|
||||||
|
EMPTY = true;
|
||||||
|
UTIL1.getMiniHBaseCluster().stopRegionServer(0).join();
|
||||||
|
UTIL1.getMiniHBaseCluster().startRegionServer();
|
||||||
|
|
||||||
|
// since there is no active region server to get the replication queue, the procedure should be
|
||||||
|
// in WAITING_TIMEOUT state for most time to retry
|
||||||
|
HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
|
||||||
|
UTIL1.waitFor(30000,
|
||||||
|
() -> master.getProcedures().stream()
|
||||||
|
.filter(p -> p instanceof AssignReplicationQueuesProcedure)
|
||||||
|
.anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT));
|
||||||
|
|
||||||
|
// call remove replication peer, and make sure it will be stuck in the POST_PEER_MODIFICATION
|
||||||
|
// state.
|
||||||
|
hbaseAdmin.removeReplicationPeerAsync(PEER_ID3);
|
||||||
|
UTIL1.waitFor(30000,
|
||||||
|
() -> master.getProcedures().stream().filter(p -> p instanceof RemovePeerProcedure)
|
||||||
|
.anyMatch(p -> ((RemovePeerProcedure) p).getCurrentStateId()
|
||||||
|
== PeerModificationState.POST_PEER_MODIFICATION_VALUE));
|
||||||
|
Thread.sleep(5000);
|
||||||
|
assertEquals(PeerModificationState.POST_PEER_MODIFICATION_VALUE,
|
||||||
|
((RemovePeerProcedure) master.getProcedures().stream()
|
||||||
|
.filter(p -> p instanceof RemovePeerProcedure).findFirst().get()).getCurrentStateId());
|
||||||
|
EMPTY = false;
|
||||||
|
// wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP
|
||||||
|
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
|
||||||
|
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
|
||||||
|
// the RemovePeerProcedure should have also finished
|
||||||
|
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
|
||||||
|
.filter(p -> p instanceof RemovePeerProcedure).allMatch(Procedure::isSuccess));
|
||||||
|
// make sure there is no remaining replication queues for PEER_ID3
|
||||||
|
assertThat(master.getReplicationPeerManager().getQueueStorage().listAllQueueIds(PEER_ID3),
|
||||||
|
empty());
|
||||||
|
}
|
||||||
|
}
|
|
@ -32,12 +32,9 @@ import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
// revisit later when we reviewing the implementation for serial replication
|
|
||||||
@Ignore
|
|
||||||
@Category({ ReplicationTests.class, MediumTests.class })
|
@Category({ ReplicationTests.class, MediumTests.class })
|
||||||
public class TestSerialReplicationFailover extends SerialReplicationTestBase {
|
public class TestSerialReplicationFailover extends SerialReplicationTestBase {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue