HBASE-27806 Support dynamic reinitializing replication peer storage (#5195)
Signed-off-by: Liangjun He <heliangjun@apache.org>
(cherry picked from commit 18ae733b15
)
This commit is contained in:
parent
e9ffc1b07e
commit
fe2992a312
|
@ -23,12 +23,13 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationPeerImpl implements ReplicationPeer {
|
public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver {
|
||||||
|
|
||||||
private final Configuration conf;
|
private volatile Configuration conf;
|
||||||
|
|
||||||
private final String id;
|
private final String id;
|
||||||
|
|
||||||
|
@ -122,4 +123,9 @@ public class ReplicationPeerImpl implements ReplicationPeer {
|
||||||
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
|
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
|
||||||
this.peerConfigListeners.add(listener);
|
this.peerConfigListeners.add(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,25 +24,38 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This provides an class for maintaining a set of peer clusters. These peers are remote slave
|
* This provides an class for maintaining a set of peer clusters. These peers are remote slave
|
||||||
* clusters that data is replicated to.
|
* clusters that data is replicated to.
|
||||||
|
* <p>
|
||||||
|
* We implement {@link ConfigurationObserver} mainly for recreating the
|
||||||
|
* {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
|
||||||
|
* restarting the region server.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationPeers {
|
public class ReplicationPeers implements ConfigurationObserver {
|
||||||
|
|
||||||
private final Configuration conf;
|
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
|
||||||
|
|
||||||
|
private volatile Configuration conf;
|
||||||
|
|
||||||
// Map of peer clusters keyed by their id
|
// Map of peer clusters keyed by their id
|
||||||
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
|
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
|
||||||
private final ReplicationPeerStorage peerStorage;
|
private final FileSystem fs;
|
||||||
|
private final ZKWatcher zookeeper;
|
||||||
|
private volatile ReplicationPeerStorage peerStorage;
|
||||||
|
|
||||||
ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
|
ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
this.fs = fs;
|
||||||
|
this.zookeeper = zookeeper;
|
||||||
this.peerCache = new ConcurrentHashMap<>();
|
this.peerCache = new ConcurrentHashMap<>();
|
||||||
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
|
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
|
||||||
}
|
}
|
||||||
|
@ -134,4 +147,18 @@ public class ReplicationPeers {
|
||||||
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
|
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
|
||||||
peerId, enabled, peerConfig);
|
peerId, enabled, peerConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
|
||||||
|
for (ReplicationPeerImpl peer : peerCache.values()) {
|
||||||
|
try {
|
||||||
|
peer.onConfigurationChange(
|
||||||
|
ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
|
||||||
|
} catch (ReplicationException e) {
|
||||||
|
LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,6 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.TableState;
|
import org.apache.hadoop.hbase.client.TableState;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
|
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
|
||||||
|
@ -770,6 +769,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
|
|
||||||
this.replicationPeerManager =
|
this.replicationPeerManager =
|
||||||
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
|
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
|
||||||
|
this.configurationManager.registerObserver(replicationPeerManager);
|
||||||
this.replicationPeerModificationStateStore =
|
this.replicationPeerModificationStateStore =
|
||||||
new ReplicationPeerModificationStateStore(masterRegion);
|
new ReplicationPeerModificationStateStore(masterRegion);
|
||||||
|
|
||||||
|
@ -4235,12 +4235,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
|
||||||
allowedOnPath = ".*/src/test/.*")
|
|
||||||
public ConfigurationManager getConfigurationManager() {
|
|
||||||
return configurationManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setQuotasObserver(Configuration conf) {
|
private void setQuotasObserver(Configuration conf) {
|
||||||
// Add the Observer to delete quotas on table deletion before starting all CPs by
|
// Add the Observer to delete quotas on table deletion before starting all CPs by
|
||||||
// default with quota support, avoiding if user specifically asks to not load this Observer.
|
// default with quota support, avoiding if user specifically asks to not load this Observer.
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||||
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||||
|
@ -59,11 +60,14 @@ import org.apache.zookeeper.KeeperException;
|
||||||
* Manages and performs all replication admin operations.
|
* Manages and performs all replication admin operations.
|
||||||
* <p>
|
* <p>
|
||||||
* Used to add/remove a replication peer.
|
* Used to add/remove a replication peer.
|
||||||
|
* <p>
|
||||||
|
* Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
|
||||||
|
* supporting migrating across different replication peer storages without restarting master.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ReplicationPeerManager {
|
public class ReplicationPeerManager implements ConfigurationObserver {
|
||||||
|
|
||||||
private final ReplicationPeerStorage peerStorage;
|
private volatile ReplicationPeerStorage peerStorage;
|
||||||
|
|
||||||
private final ReplicationQueueStorage queueStorage;
|
private final ReplicationQueueStorage queueStorage;
|
||||||
|
|
||||||
|
@ -71,10 +75,18 @@ public class ReplicationPeerManager {
|
||||||
|
|
||||||
private final String clusterId;
|
private final String clusterId;
|
||||||
|
|
||||||
private final Configuration conf;
|
private volatile Configuration conf;
|
||||||
|
|
||||||
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
|
// for dynamic recreating ReplicationPeerStorage.
|
||||||
ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
|
private final FileSystem fs;
|
||||||
|
|
||||||
|
private final ZKWatcher zk;
|
||||||
|
|
||||||
|
ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
|
||||||
|
ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
|
||||||
|
Configuration conf, String clusterId) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.zk = zk;
|
||||||
this.peerStorage = peerStorage;
|
this.peerStorage = peerStorage;
|
||||||
this.queueStorage = queueStorage;
|
this.queueStorage = queueStorage;
|
||||||
this.peers = peers;
|
this.peers = peers;
|
||||||
|
@ -426,7 +438,7 @@ public class ReplicationPeerManager {
|
||||||
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
boolean enabled = peerStorage.isPeerEnabled(peerId);
|
||||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
|
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
|
||||||
}
|
}
|
||||||
return new ReplicationPeerManager(peerStorage,
|
return new ReplicationPeerManager(fs, zk, peerStorage,
|
||||||
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
|
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -440,4 +452,10 @@ public class ReplicationPeerManager {
|
||||||
}
|
}
|
||||||
return s1.equals(s2);
|
return s1.equals(s2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarker
|
||||||
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
|
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
|
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
|
||||||
|
|
||||||
|
import com.google.errorprone.annotations.RestrictedApi;
|
||||||
import io.opentelemetry.api.trace.Span;
|
import io.opentelemetry.api.trace.Span;
|
||||||
import io.opentelemetry.api.trace.StatusCode;
|
import io.opentelemetry.api.trace.StatusCode;
|
||||||
import io.opentelemetry.context.Scope;
|
import io.opentelemetry.context.Scope;
|
||||||
|
@ -2366,6 +2367,17 @@ public class HRegionServer extends Thread
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerConfigurationObservers() {
|
private void registerConfigurationObservers() {
|
||||||
|
// Register Replication if possible, as now we support recreating replication peer storage, for
|
||||||
|
// migrating across different replication peer storages online
|
||||||
|
if (replicationSourceHandler instanceof ConfigurationObserver) {
|
||||||
|
configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
replicationSourceHandler != replicationSinkHandler
|
||||||
|
&& replicationSinkHandler instanceof ConfigurationObserver
|
||||||
|
) {
|
||||||
|
configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
|
||||||
|
}
|
||||||
// Registering the compactSplitThread object with the ConfigurationManager.
|
// Registering the compactSplitThread object with the ConfigurationManager.
|
||||||
configurationManager.registerObserver(this.compactSplitThread);
|
configurationManager.registerObserver(this.compactSplitThread);
|
||||||
configurationManager.registerObserver(this.rpcServices);
|
configurationManager.registerObserver(this.rpcServices);
|
||||||
|
@ -3821,8 +3833,9 @@ public class HRegionServer extends Thread
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns : Returns the ConfigurationManager object for testing purposes. */
|
/** Returns : Returns the ConfigurationManager object for testing purposes. */
|
||||||
@InterfaceAudience.Private
|
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||||
ConfigurationManager getConfigurationManager() {
|
allowedOnPath = ".*/src/test/.*")
|
||||||
|
public ConfigurationManager getConfigurationManager() {
|
||||||
return configurationManager;
|
return configurationManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||||
|
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
|
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
|
||||||
|
@ -56,15 +58,19 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
|
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
|
||||||
|
* <p>
|
||||||
|
* Implement {@link PropagatingConfigurationObserver} mainly for registering
|
||||||
|
* {@link ReplicationPeers}, so we can recreating the replication peer storage.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class Replication implements ReplicationSourceService, ReplicationSinkService {
|
public class Replication
|
||||||
|
implements ReplicationSourceService, ReplicationSinkService, PropagatingConfigurationObserver {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
|
||||||
private boolean isReplicationForBulkLoadDataEnabled;
|
private boolean isReplicationForBulkLoadDataEnabled;
|
||||||
private ReplicationSourceManager replicationManager;
|
private ReplicationSourceManager replicationManager;
|
||||||
private ReplicationQueueStorage queueStorage;
|
private ReplicationQueueStorage queueStorage;
|
||||||
private ReplicationPeers replicationPeers;
|
private ReplicationPeers replicationPeers;
|
||||||
private Configuration conf;
|
private volatile Configuration conf;
|
||||||
private ReplicationSink replicationSink;
|
private ReplicationSink replicationSink;
|
||||||
// Hosting server
|
// Hosting server
|
||||||
private Server server;
|
private Server server;
|
||||||
|
@ -262,4 +268,19 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
||||||
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
|
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
|
||||||
this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
|
this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerChildren(ConfigurationManager manager) {
|
||||||
|
manager.registerObserver(replicationPeers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deregisterChildren(ConfigurationManager manager) {
|
||||||
|
manager.deregisterObserver(replicationPeers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ ReplicationTests.class, LargeTests.class })
|
||||||
|
public class TestMigrateRepliationPeerStorageOnline {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestMigrateRepliationPeerStorageOnline.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
// use zookeeper first, and then migrate to filesystem
|
||||||
|
UTIL.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
|
||||||
|
ReplicationPeerStorageType.ZOOKEEPER.name());
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws IOException {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMigrate() throws Exception {
|
||||||
|
Admin admin = UTIL.getAdmin();
|
||||||
|
ReplicationPeerConfig rpc =
|
||||||
|
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
|
||||||
|
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
|
||||||
|
admin.addReplicationPeer("1", rpc);
|
||||||
|
|
||||||
|
// disable peer modification
|
||||||
|
admin.replicationPeerModificationSwitch(false, true);
|
||||||
|
|
||||||
|
// migrate replication peer data
|
||||||
|
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||||
|
assertEquals(0, ToolRunner.run(conf, new CopyReplicationPeers(conf),
|
||||||
|
new String[] { "zookeeper", "filesystem" }));
|
||||||
|
conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
|
||||||
|
ReplicationPeerStorageType.FILESYSTEM.name());
|
||||||
|
// confirm that we have copied the data
|
||||||
|
ReplicationPeerStorage fsPeerStorage = ReplicationStorageFactory
|
||||||
|
.getReplicationPeerStorage(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), conf);
|
||||||
|
assertNotNull(fsPeerStorage.getPeerConfig("1"));
|
||||||
|
|
||||||
|
for (MasterThread mt : UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||||
|
Configuration newConf = new Configuration(mt.getMaster().getConfiguration());
|
||||||
|
newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
|
||||||
|
ReplicationPeerStorageType.FILESYSTEM.name());
|
||||||
|
mt.getMaster().getConfigurationManager().notifyAllObservers(newConf);
|
||||||
|
}
|
||||||
|
for (RegionServerThread rt : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||||
|
Configuration newConf = new Configuration(rt.getRegionServer().getConfiguration());
|
||||||
|
newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
|
||||||
|
ReplicationPeerStorageType.FILESYSTEM.name());
|
||||||
|
rt.getRegionServer().getConfigurationManager().notifyAllObservers(newConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
admin.replicationPeerModificationSwitch(true);
|
||||||
|
admin.removeReplicationPeer("1");
|
||||||
|
|
||||||
|
// confirm that we will operation on the new peer storage
|
||||||
|
assertThat(fsPeerStorage.listPeerIds(), empty());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue