HBASE-26481 Consider rolling upgrading from old region replication framework (#3880)
Signed-off-by: Xin Sun <ddupgs@gmail.com> Reviewed-by: GeorryHuang <huangzhuoyue@apache.org>
This commit is contained in:
parent
9f6dfe6d3f
commit
fa37aed8f6
|
@ -147,6 +147,16 @@ public abstract class AsyncRpcRetryingCaller<T> {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sub classes can override this method to change the error type, to control the retry logic.
|
||||||
|
// For example, during rolling upgrading, if we call this newly added method, we will get a
|
||||||
|
// UnsupportedOperationException(wrapped by a DNRIOE), and sometimes we may want to fallback to
|
||||||
|
// use the old method first, so the sub class could change the exception type to something not a
|
||||||
|
// DNRIOE, so we will schedule a retry, and the next time the sub class could use old method to
|
||||||
|
// make the rpc call.
|
||||||
|
protected Throwable preProcessError(Throwable error) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
protected final void onError(Throwable t, Supplier<String> errMsg,
|
protected final void onError(Throwable t, Supplier<String> errMsg,
|
||||||
Consumer<Throwable> updateCachedLocation) {
|
Consumer<Throwable> updateCachedLocation) {
|
||||||
if (future.isDone()) {
|
if (future.isDone()) {
|
||||||
|
@ -156,7 +166,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
|
||||||
LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
|
LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Throwable error = translateException(t);
|
Throwable error = preProcessError(translateException(t));
|
||||||
// We use this retrying caller to open a scanner, as it is idempotent, but we may throw
|
// We use this retrying caller to open a scanner, as it is idempotent, but we may throw
|
||||||
// ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
|
// ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
|
||||||
// also fetch data when opening a scanner. The intention here is that if we hit a
|
// also fetch data when opening a scanner. The intention here is that if we hit a
|
||||||
|
|
|
@ -56,6 +56,9 @@ public final class ReplicationUtils {
|
||||||
// since some FileSystem implementation may not support atomic rename.
|
// since some FileSystem implementation may not support atomic rename.
|
||||||
public static final String RENAME_WAL_SUFFIX = ".ren";
|
public static final String RENAME_WAL_SUFFIX = ".ren";
|
||||||
|
|
||||||
|
public static final String LEGACY_REGION_REPLICATION_ENDPOINT_NAME =
|
||||||
|
"org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";
|
||||||
|
|
||||||
private ReplicationUtils() {
|
private ReplicationUtils() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
@ -43,6 +44,11 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
|
||||||
|
|
||||||
private final Entry[] entries;
|
private final Entry[] entries;
|
||||||
|
|
||||||
|
// whether to use replay instead of replicateToReplica, during rolling upgrading if the target
|
||||||
|
// region server has not been upgraded then it will not have the replicateToReplica method, so we
|
||||||
|
// could use replay method first, though it is not perfect.
|
||||||
|
private boolean useReplay;
|
||||||
|
|
||||||
public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
|
public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
|
||||||
AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
|
AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
|
||||||
RegionInfo replica, List<Entry> entries) {
|
RegionInfo replica, List<Entry> entries) {
|
||||||
|
@ -53,6 +59,27 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
|
||||||
this.entries = entries.toArray(new Entry[0]);
|
this.entries = entries.toArray(new Entry[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Throwable preProcessError(Throwable error) {
|
||||||
|
if (error instanceof DoNotRetryIOException &&
|
||||||
|
error.getCause() instanceof UnsupportedOperationException) {
|
||||||
|
// fallback to use replay, and also return the cause to let the upper retry
|
||||||
|
useReplay = true;
|
||||||
|
return error.getCause();
|
||||||
|
}
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onComplete(HRegionLocation loc) {
|
||||||
|
if (controller.failed()) {
|
||||||
|
onError(controller.getFailed(),
|
||||||
|
() -> "Call to " + loc.getServerName() + " for " + replica + " failed",
|
||||||
|
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
|
||||||
|
} else {
|
||||||
|
future.complete(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void call(HRegionLocation loc) {
|
private void call(HRegionLocation loc) {
|
||||||
AdminService.Interface stub;
|
AdminService.Interface stub;
|
||||||
try {
|
try {
|
||||||
|
@ -67,15 +94,11 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
|
||||||
.buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
|
.buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
|
||||||
resetCallTimeout();
|
resetCallTimeout();
|
||||||
controller.setCellScanner(pair.getSecond());
|
controller.setCellScanner(pair.getSecond());
|
||||||
stub.replicateToReplica(controller, pair.getFirst(), r -> {
|
if (useReplay) {
|
||||||
if (controller.failed()) {
|
stub.replay(controller, pair.getFirst(), r -> onComplete(loc));
|
||||||
onError(controller.getFailed(),
|
|
||||||
() -> "Call to " + loc.getServerName() + " for " + replica + " failed",
|
|
||||||
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
|
|
||||||
} else {
|
} else {
|
||||||
future.complete(null);
|
stub.replicateToReplica(controller, pair.getFirst(), r -> onComplete(loc));
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
|
@ -69,6 +71,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationPeerManager {
|
public class ReplicationPeerManager {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);
|
||||||
|
|
||||||
private final ReplicationPeerStorage peerStorage;
|
private final ReplicationPeerStorage peerStorage;
|
||||||
|
|
||||||
private final ReplicationQueueStorage queueStorage;
|
private final ReplicationQueueStorage queueStorage;
|
||||||
|
@ -546,7 +550,13 @@ public class ReplicationPeerManager {
|
||||||
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
|
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
|
||||||
for (String peerId : peerStorage.listPeerIds()) {
|
for (String peerId : peerStorage.listPeerIds()) {
|
||||||
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
|
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
|
||||||
|
if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
|
||||||
|
.equals(peerConfig.getReplicationEndpointImpl())) {
|
||||||
|
// we do not use this endpoint for region replication any more, see HBASE-26233
|
||||||
|
LOG.warn("Legacy region replication peer found, removing: {}", peerConfig);
|
||||||
|
peerStorage.removePeer(peerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
|
peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
|
||||||
peerStorage.updatePeerConfig(peerId, peerConfig);
|
peerStorage.updatePeerConfig(peerId, peerConfig);
|
||||||
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
||||||
|
|
|
@ -342,8 +342,14 @@ public class ReplicationSourceManager {
|
||||||
* @param peerId the id of the replication peer
|
* @param peerId the id of the replication peer
|
||||||
* @return the source that was created
|
* @return the source that was created
|
||||||
*/
|
*/
|
||||||
ReplicationSourceInterface addSource(String peerId) throws IOException {
|
void addSource(String peerId) throws IOException {
|
||||||
ReplicationPeer peer = replicationPeers.getPeer(peerId);
|
ReplicationPeer peer = replicationPeers.getPeer(peerId);
|
||||||
|
if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
|
||||||
|
.equals(peer.getPeerConfig().getReplicationEndpointImpl())) {
|
||||||
|
// we do not use this endpoint for region replication any more, see HBASE-26233
|
||||||
|
LOG.warn("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
|
||||||
|
return;
|
||||||
|
}
|
||||||
ReplicationSourceInterface src = createSource(peerId, peer);
|
ReplicationSourceInterface src = createSource(peerId, peer);
|
||||||
// synchronized on latestPaths to avoid missing the new log
|
// synchronized on latestPaths to avoid missing the new log
|
||||||
synchronized (this.latestPaths) {
|
synchronized (this.latestPaths) {
|
||||||
|
@ -370,7 +376,6 @@ public class ReplicationSourceManager {
|
||||||
syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
|
syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
|
||||||
}
|
}
|
||||||
src.startup();
|
src.startup();
|
||||||
return src;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,129 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure we could fallback to use replay method if replicateToReplica method is not present,
|
||||||
|
* i.e, we are connecting an old region server.
|
||||||
|
*/
|
||||||
|
@Category({ RegionServerTests.class, SmallTests.class })
|
||||||
|
public class TestFallbackToUseReplay {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestFallbackToUseReplay.class);
|
||||||
|
|
||||||
|
private static Configuration CONF = HBaseConfiguration.create();
|
||||||
|
|
||||||
|
private static AsyncClusterConnectionImpl CONN;
|
||||||
|
|
||||||
|
private static AsyncRegionReplicationRetryingCaller CALLER;
|
||||||
|
|
||||||
|
private static RegionInfo REPLICA =
|
||||||
|
RegionInfoBuilder.newBuilder(TableName.valueOf("test")).setReplicaId(1).build();
|
||||||
|
|
||||||
|
private static AtomicBoolean REPLAY_CALLED = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws IOException {
|
||||||
|
CONF.setInt(AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
|
||||||
|
AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
|
||||||
|
when(locator.getRegionLocation(any(), any(), anyInt(), any(), anyLong()))
|
||||||
|
.thenReturn(CompletableFuture.completedFuture(new HRegionLocation(REPLICA,
|
||||||
|
ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()))));
|
||||||
|
AdminService.Interface stub = mock(AdminService.Interface.class);
|
||||||
|
// fail the call to replicateToReplica
|
||||||
|
doAnswer(i -> {
|
||||||
|
HBaseRpcController controller = i.getArgument(0, HBaseRpcController.class);
|
||||||
|
controller.setFailed(new DoNotRetryIOException(new UnsupportedOperationException()));
|
||||||
|
RpcCallback<?> done = i.getArgument(2, RpcCallback.class);
|
||||||
|
done.run(null);
|
||||||
|
return null;
|
||||||
|
}).when(stub).replicateToReplica(any(), any(), any());
|
||||||
|
doAnswer(i -> {
|
||||||
|
REPLAY_CALLED.set(true);
|
||||||
|
RpcCallback<?> done = i.getArgument(2, RpcCallback.class);
|
||||||
|
done.run(null);
|
||||||
|
return null;
|
||||||
|
}).when(stub).replay(any(), any(), any());
|
||||||
|
CONN = new AsyncClusterConnectionImpl(CONF, mock(ConnectionRegistry.class), "test", null,
|
||||||
|
User.getCurrent()) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
AsyncRegionLocator getLocator() {
|
||||||
|
return locator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
Interface getAdminStub(ServerName serverName) throws IOException {
|
||||||
|
return stub;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
CALLER = new AsyncRegionReplicationRetryingCaller(AsyncClusterConnectionImpl.RETRY_TIMER, CONN,
|
||||||
|
10, TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10), REPLICA,
|
||||||
|
Collections.emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws IOException {
|
||||||
|
Closeables.close(CONN, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFallback() {
|
||||||
|
CALLER.call().join();
|
||||||
|
assertTrue(REPLAY_CALLED.get());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,85 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.regionreplication;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
|
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure we could start the cluster with RegionReplicaReplicationEndpoint configured.
|
||||||
|
*/
|
||||||
|
@Category({ RegionServerTests.class, MediumTests.class })
|
||||||
|
public class TestStartupWithLegacyRegionReplicationEndpoint {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestStartupWithLegacyRegionReplicationEndpoint.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws IOException {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
|
||||||
|
.setClusterKey("127.0.0.1:2181:/hbase")
|
||||||
|
.setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build();
|
||||||
|
// can not use Admin.addPeer as it will fail with ClassNotFound
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().addPeer("legacy", peerConfig,
|
||||||
|
true);
|
||||||
|
UTIL.getMiniHBaseCluster().stopRegionServer(0);
|
||||||
|
RegionServerThread rst = UTIL.getMiniHBaseCluster().startRegionServer();
|
||||||
|
// we should still have this peer
|
||||||
|
assertNotNull(UTIL.getAdmin().getReplicationPeerConfig("legacy"));
|
||||||
|
// but at RS side, we should not have this peer loaded as replication source
|
||||||
|
assertTrue(rst.getRegionServer().getReplicationSourceService().getReplicationManager()
|
||||||
|
.getSources().isEmpty());
|
||||||
|
UTIL.shutdownMiniHBaseCluster();
|
||||||
|
UTIL.restartHBaseCluster(1);
|
||||||
|
// now we should have removed the peer
|
||||||
|
assertThrows(ReplicationPeerNotFoundException.class,
|
||||||
|
() -> UTIL.getAdmin().getReplicationPeerConfig("legacy"));
|
||||||
|
// at rs side, we should not have the peer this time, not only for not having replication source
|
||||||
|
assertTrue(UTIL.getMiniHBaseCluster().getRegionServer(0).getReplicationSourceService()
|
||||||
|
.getReplicationManager().getReplicationPeers().getAllPeerIds().isEmpty());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue