From fe2992a31251ec1490df72773ff1c75bcb398ede Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sun, 23 Apr 2023 22:25:31 +0800 Subject: [PATCH] HBASE-27806 Support dynamic reinitializing replication peer storage (#5195) Signed-off-by: Liangjun He (cherry picked from commit 18ae733b15ae2bc316f41af1a46e5619d2b35fe2) --- .../replication/ReplicationPeerImpl.java | 10 +- .../hbase/replication/ReplicationPeers.java | 33 +++++- .../apache/hadoop/hbase/master/HMaster.java | 8 +- .../replication/ReplicationPeerManager.java | 30 ++++- .../hbase/regionserver/HRegionServer.java | 17 ++- .../replication/regionserver/Replication.java | 25 ++++- ...estMigrateRepliationPeerStorageOnline.java | 104 ++++++++++++++++++ 7 files changed, 205 insertions(+), 22 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index 1bcc667fcce..2392d620597 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -23,12 +23,13 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class ReplicationPeerImpl implements ReplicationPeer { +public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver { - private final Configuration conf; + private volatile Configuration conf; private final String id; @@ -122,4 +123,9 @@ public class ReplicationPeerImpl implements ReplicationPeer { public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { this.peerConfigListeners.add(listener); } + + @Override + public void onConfigurationChange(Configuration conf) { + this.conf = conf; + } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 3a579298f45..7aa7f89ecf5 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -24,25 +24,38 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This provides an class for maintaining a set of peer clusters. These peers are remote slave * clusters that data is replicated to. + *

+ * We implement {@link ConfigurationObserver} mainly for recreating the + * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without + * restarting the region server. */ @InterfaceAudience.Private -public class ReplicationPeers { +public class ReplicationPeers implements ConfigurationObserver { - private final Configuration conf; + private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class); + + private volatile Configuration conf; // Map of peer clusters keyed by their id private final ConcurrentMap peerCache; - private final ReplicationPeerStorage peerStorage; + private final FileSystem fs; + private final ZKWatcher zookeeper; + private volatile ReplicationPeerStorage peerStorage; ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) { this.conf = conf; + this.fs = fs; + this.zookeeper = zookeeper; this.peerCache = new ConcurrentHashMap<>(); this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf); } @@ -134,4 +147,18 @@ public class ReplicationPeers { return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), peerId, enabled, peerConfig); } + + @Override + public void onConfigurationChange(Configuration conf) { + this.conf = conf; + this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf); + for (ReplicationPeerImpl peer : peerCache.values()) { + try { + peer.onConfigurationChange( + ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf)); + } catch (ReplicationException e) { + LOG.warn("failed to reload configuration for peer {}", peer.getId(), e); + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 4abd7df915c..b61737020da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -106,7 +106,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.MasterStoppedException; @@ -770,6 +769,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.replicationPeerManager = ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId); + this.configurationManager.registerObserver(replicationPeerManager); this.replicationPeerModificationStateStore = new ReplicationPeerModificationStateStore(masterRegion); @@ -4235,12 +4235,6 @@ public class HMaster extends HRegionServer implements MasterServices { } } - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - public ConfigurationManager getConfigurationManager() { - return configurationManager; - } - private void setQuotasObserver(Configuration conf) { // Add the Observer to delete quotas on table deletion before starting all CPs by // default with quota support, avoiding if user specifically asks to not load this Observer. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 423f6590dcc..b7c3e0b4984 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -59,11 +60,14 @@ import org.apache.zookeeper.KeeperException; * Manages and performs all replication admin operations. *

* Used to add/remove a replication peer. + *

+ * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for + * supporting migrating across different replication peer storages without restarting master. */ @InterfaceAudience.Private -public class ReplicationPeerManager { +public class ReplicationPeerManager implements ConfigurationObserver { - private final ReplicationPeerStorage peerStorage; + private volatile ReplicationPeerStorage peerStorage; private final ReplicationQueueStorage queueStorage; @@ -71,10 +75,18 @@ public class ReplicationPeerManager { private final String clusterId; - private final Configuration conf; + private volatile Configuration conf; - ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, - ConcurrentMap peers, Configuration conf, String clusterId) { + // for dynamic recreating ReplicationPeerStorage. + private final FileSystem fs; + + private final ZKWatcher zk; + + ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage, + ReplicationQueueStorage queueStorage, ConcurrentMap peers, + Configuration conf, String clusterId) { + this.fs = fs; + this.zk = zk; this.peerStorage = peerStorage; this.queueStorage = queueStorage; this.peers = peers; @@ -426,7 +438,7 @@ public class ReplicationPeerManager { boolean enabled = peerStorage.isPeerEnabled(peerId); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); } - return new ReplicationPeerManager(peerStorage, + return new ReplicationPeerManager(fs, zk, peerStorage, ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId); } @@ -440,4 +452,10 @@ public class ReplicationPeerManager { } return s1.equals(s2); } + + @Override + public void onConfigurationChange(Configuration conf) { + this.conf = conf; + this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a7ee222787f..7f65af9ee5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -34,6 +34,7 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarker import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; +import com.google.errorprone.annotations.RestrictedApi; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; @@ -2366,6 +2367,17 @@ public class HRegionServer extends Thread } private void registerConfigurationObservers() { + // Register Replication if possible, as now we support recreating replication peer storage, for + // migrating across different replication peer storages online + if (replicationSourceHandler instanceof ConfigurationObserver) { + configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler); + } + if ( + replicationSourceHandler != replicationSinkHandler + && replicationSinkHandler instanceof ConfigurationObserver + ) { + configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler); + } // Registering the compactSplitThread object with the ConfigurationManager. configurationManager.registerObserver(this.compactSplitThread); configurationManager.registerObserver(this.rpcServices); @@ -3821,8 +3833,9 @@ public class HRegionServer extends Thread } /** Returns : Returns the ConfigurationManager object for testing purposes. */ - @InterfaceAudience.Private - ConfigurationManager getConfigurationManager() { + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + public ConfigurationManager getConfigurationManager() { return configurationManager; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index c3c74c03fd6..de4f17a9311 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; @@ -56,15 +58,19 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; /** * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. + *

+ * Implement {@link PropagatingConfigurationObserver} mainly for registering + * {@link ReplicationPeers}, so we can recreating the replication peer storage. */ @InterfaceAudience.Private -public class Replication implements ReplicationSourceService, ReplicationSinkService { +public class Replication + implements ReplicationSourceService, ReplicationSinkService, PropagatingConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(Replication.class); private boolean isReplicationForBulkLoadDataEnabled; private ReplicationSourceManager replicationManager; private ReplicationQueueStorage queueStorage; private ReplicationPeers replicationPeers; - private Configuration conf; + private volatile Configuration conf; private ReplicationSink replicationSink; // Hosting server private Server server; @@ -262,4 +268,19 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics); } + + @Override + public void onConfigurationChange(Configuration conf) { + this.conf = conf; + } + + @Override + public void registerChildren(ConfigurationManager manager) { + manager.registerObserver(replicationPeers); + } + + @Override + public void deregisterChildren(ConfigurationManager manager) { + manager.deregisterObserver(replicationPeers); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java new file mode 100644 index 00000000000..7b0b10f2008 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestMigrateRepliationPeerStorageOnline { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMigrateRepliationPeerStorageOnline.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUp() throws Exception { + // use zookeeper first, and then migrate to filesystem + UTIL.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + ReplicationPeerStorageType.ZOOKEEPER.name()); + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testMigrate() throws Exception { + Admin admin = UTIL.getAdmin(); + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") + .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build(); + admin.addReplicationPeer("1", rpc); + + // disable peer modification + admin.replicationPeerModificationSwitch(false, true); + + // migrate replication peer data + Configuration conf = new Configuration(UTIL.getConfiguration()); + assertEquals(0, ToolRunner.run(conf, new CopyReplicationPeers(conf), + new String[] { "zookeeper", "filesystem" })); + conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + ReplicationPeerStorageType.FILESYSTEM.name()); + // confirm that we have copied the data + ReplicationPeerStorage fsPeerStorage = ReplicationStorageFactory + .getReplicationPeerStorage(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), conf); + assertNotNull(fsPeerStorage.getPeerConfig("1")); + + for (MasterThread mt : UTIL.getMiniHBaseCluster().getMasterThreads()) { + Configuration newConf = new Configuration(mt.getMaster().getConfiguration()); + newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + ReplicationPeerStorageType.FILESYSTEM.name()); + mt.getMaster().getConfigurationManager().notifyAllObservers(newConf); + } + for (RegionServerThread rt : UTIL.getMiniHBaseCluster().getRegionServerThreads()) { + Configuration newConf = new Configuration(rt.getRegionServer().getConfiguration()); + newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + ReplicationPeerStorageType.FILESYSTEM.name()); + rt.getRegionServer().getConfigurationManager().notifyAllObservers(newConf); + } + + admin.replicationPeerModificationSwitch(true); + admin.removeReplicationPeer("1"); + + // confirm that we will operation on the new peer storage + assertThat(fsPeerStorage.listPeerIds(), empty()); + } +}