diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index b80ee16301f..fdae2880976 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -27,8 +27,8 @@ import java.util.Set; import java.util.TreeMap; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; /** * A configuration for the replication peer cluster. diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index f0070f00ebd..fe45762212e 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mapreduce.replication; import java.io.IOException; import java.util.Arrays; import java.util.UUID; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -45,13 +44,14 @@ import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -66,6 +66,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -333,19 +334,24 @@ public class VerifyReplication extends Configured implements Tool { final Configuration conf, String peerId) throws IOException { ZKWatcher localZKW = null; try { - localZKW = new ZKWatcher(conf, "VerifyReplication", - new Abortable() { - @Override public void abort(String why, Throwable e) {} - @Override public boolean isAborted() {return false;} - }); + localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() { + @Override + public void abort(String why, Throwable e) { + } - ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf); - rp.init(); - - return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId)); + @Override + public boolean isAborted() { + return false; + } + }); + ReplicationPeerStorage 
storage = + ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf); + ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId); + return Pair.newPair(peerConfig, + ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf)); } catch (ReplicationException e) { - throw new IOException( - "An error occurred while trying to connect to the remove peer cluster", e); + throw new IOException("An error occurred while trying to connect to the remote peer cluster", + e); } finally { if (localZKW != null) { localZKW.close(); diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index e58482eeca1..422801b4939 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -39,20 +37,22 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private public class ReplicationPeers { - private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class); - private final Configuration conf; // Map of peer clusters keyed by their id private final ConcurrentMap<String, ReplicationPeerImpl> peerCache; private final ReplicationPeerStorage peerStorage; - protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) { + ReplicationPeers(ZKWatcher zookeeper, Configuration conf) { this.conf = conf; this.peerCache = new ConcurrentHashMap<>(); this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); } + public Configuration getConf() { + return conf; + } + public void init() throws ReplicationException { // Loading all existing peerIds into peer cache.
for (String peerId : this.peerStorage.listPeerIds()) { @@ -120,22 +120,13 @@ public class ReplicationPeers { return peerCache.keySet(); } - public ReplicationPeerConfig getPeerConfig(String peerId) { - ReplicationPeer replicationPeer = this.peerCache.get(peerId); - if (replicationPeer == null) { - throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached"); - } - return replicationPeer.getPeerConfig(); - } - - public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException { - ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); - + public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig, + Configuration baseConf) throws ReplicationException { Configuration otherConf; try { - otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); + otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey()); } catch (IOException e) { - throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e); + throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e); } if (!peerConfig.getConfiguration().isEmpty()) { @@ -179,8 +170,9 @@ public class ReplicationPeers { */ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { - ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId); + ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId); - return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf); + return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, conf), peerId, enabled, + peerConfig); } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 41f50d8763f..ee237f227ce 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; @@ -50,7 +49,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** * ZK based replication queue storage.
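Note on the API shift above: the instance methods ReplicationPeers.getPeerConfig(peerId) and getPeerClusterConfiguration(peerId) are gone; callers now read the ReplicationPeerConfig straight from ReplicationPeerStorage and derive the peer cluster's Configuration through the new static helper. A minimal caller-side sketch, assuming a ZKWatcher zk, a local Configuration conf, and a String peerId are in scope (the Abortable and try/finally cleanup from the VerifyReplication hunk are elided):

    ReplicationPeerStorage peerStorage =
        ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
    // Merge the peer's cluster key and per-peer overrides on top of the local conf.
    Configuration peerClusterConf =
        ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf);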
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java index d09a56be6da..2321e4fea22 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.replication; import java.io.ByteArrayOutputStream; import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 2589199d8cd..07c6c154c94 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -238,12 +237,6 @@ public abstract class TestReplicationStateBasic { } catch (ReplicationException e) { } - try { - assertNull(rp.getPeerClusterConfiguration("bogus")); - fail("Should have thrown an ReplicationException when passed a bogus peerId"); - } catch (ReplicationException e) { - } - assertNumberOfPeers(0); // Add some peers @@ -258,7 +251,8 @@ public abstract class TestReplicationStateBasic { fail("There are no connected peers, should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException e) { } - assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE))); + assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers + .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); rp.getPeerStorage().removePeer(ID_ONE); rp.removePeer(ID_ONE); assertNumberOfPeers(1); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index a4f9b3226c2..f0f77047ffd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -74,8 +73,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure { @Override protected void 
postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " peer " + peerId + - ", config " + peerConfig); + LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId, + peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java index 10e35a89f70..087157527e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -62,7 +61,7 @@ public class DisablePeerProcedure extends ModifyPeerProcedure { @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully disabled peer " + peerId); + LOG.info("Successfully disabled peer {}", peerId); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.postDisableReplicationPeer(peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java index f2a9f011173..890462ff5fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -62,7 +61,7 @@ public class EnablePeerProcedure extends ModifyPeerProcedure { @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully enabled peer " + peerId); + LOG.info("Successfully enabled peer {}", peerId); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.postEnableReplicationPeer(peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index a682606bad1..c225619e5f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -84,10 +83,13 @@ public abstract class 
ModifyPeerProcedure * Called before we finish the procedure. The implementation can do some logging work, and also * call the coprocessor hook if any. *

- * Notice that, since we have already done the actual work, throwing exception here will not fail - * this procedure, we will just ignore it and finish the procedure as suceeded. + * Notice that, since we have already done the actual work, throwing {@code IOException} here will + * not fail this procedure, we will just ignore it and finish the procedure as succeeded. If + * {@code ReplicationException} is thrown we will retry since this usually means we failed to + * update the peer storage. */ - protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException; + protected abstract void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException; private void releaseLatch() { ProcedurePrepareLatch.releaseLatch(latch, this); @@ -101,16 +103,14 @@ public abstract class ModifyPeerProcedure try { prePeerModification(env); } catch (IOException e) { - LOG.warn( - getClass().getName() + " failed to call CP hook or the pre check is failed for peer " + - peerId + ", mark the procedure as failure and give up", - e); + LOG.warn("{} failed to call pre CP hook or the pre check failed for peer {}, " + + "mark the procedure as failure and give up", getClass().getName(), peerId, e); setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); releaseLatch(); return Flow.NO_MORE_STATE; } catch (ReplicationException e) { - LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + - ", retry", e); + LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(), + peerId, e); throw new ProcedureYieldException(); } setNextState(PeerModificationState.UPDATE_PEER_STORAGE); @@ -119,8 +119,8 @@ public abstract class ModifyPeerProcedure try { updatePeerStorage(env); } catch (ReplicationException e) { - LOG.warn( - getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e); + LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId, + e); throw new ProcedureYieldException(); } setNextState(PeerModificationState.REFRESH_PEER_ON_RS); @@ -134,9 +134,13 @@ public abstract class ModifyPeerProcedure case POST_PEER_MODIFICATION: try { postPeerModification(env); + } catch (ReplicationException e) { + LOG.warn("{} failed to call postPeerModification for peer {}, retry", + getClass().getName(), peerId, e); + throw new ProcedureYieldException(); } catch (IOException e) { - LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + - ", ignore since the procedure has already done", e); + LOG.warn("{} failed to call post CP hook for peer {}, " + + "ignore since the procedure is already done", getClass().getName(), peerId, e); } releaseLatch(); return Flow.NO_MORE_STATE; @@ -175,7 +179,7 @@ public abstract class ModifyPeerProcedure throws IOException, InterruptedException { if (state == PeerModificationState.PRE_PEER_MODIFICATION) { // actually the peer related operations has no rollback, but if we haven't done any - // modifications on the peer storage, we can just return. + // modifications on the peer storage yet, we can just return.
return; } throw new UnsupportedOperationException(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index ba4285fd736..1253ef9bc51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; @@ -122,17 +121,15 @@ public class RefreshPeerProcedure extends Procedure private void complete(MasterProcedureEnv env, Throwable error) { if (event == null) { - LOG.warn("procedure event for " + getProcId() + - " is null, maybe the procedure is created when recovery", - new Exception()); + LOG.warn("procedure event for {} is null, maybe the procedure was created during recovery", + getProcId()); return; } if (error != null) { - LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed", - error); + LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error); this.succ = false; } else { - LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded"); + LOG.info("Refresh peer {} for {} on {} succeeded", peerId, type, targetServer); this.succ = true; } @@ -168,9 +165,9 @@ public class RefreshPeerProcedure extends Procedure dispatched = false; } if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) { - LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type + - " to " + targetServer + ", this usually because the server is already dead," + - " give up and mark the procedure as complete"); + LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " + + "this is usually because the server is already dead, " + + "give up and mark the procedure as complete", peerId, type, targetServer); return null; } dispatched = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 6e9c3840e8d..64faf2b0e00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -61,8 +60,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { } @Override - protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully removed peer " + peerId); + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId); + LOG.info("Successfully removed peer {}", peerId); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) {
cpHost.postRemoveReplicationPeer(peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index b6732d785ac..1414d225ba9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.regex.Pattern; import java.util.stream.Collectors; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -217,6 +216,36 @@ public class ReplicationPeerManager { return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); } + private void removeAllQueues0(String peerId) throws ReplicationException { + for (ServerName replicator : queueStorage.getListOfReplicators()) { + List<String> queueIds = queueStorage.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (queueInfo.getPeerId().equals(peerId)) { + queueStorage.removeQueue(replicator, queueId); + } + } + queueStorage.removeReplicatorIfQueueIsEmpty(replicator); + } + } + + public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { + // Here we need two passes to address the problem of claimQueue. A claimQueue may still be + // on-going when the refresh peer config procedure is done: if a RS which has already been + // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in + // the scan here, and if the RS which has claimed the queue crashes before creating the + // recovered source, then the queue will be left there until another RS detects the crash and + // helps remove the queue. + // A two pass scan can solve the problem. Anyway, the queue will not disappear during the + // claiming, it will be either under the old RS or under the new RS, and a queue can only be + // claimed once after the refresh peer procedure is done (as the next claimQueue will just + // delete it), so we can make sure that a two pass scan will finally find the queue and remove + // it, unless it has already been removed by others.
+ removeAllQueues0(peerId); + removeAllQueues0(peerId); + queueStorage.removePeerFromHFileRefs(peerId); + } + private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { checkClusterKey(peerConfig.getClusterKey()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java index a43532dd5f6..34974475ab0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; - import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -70,7 +69,7 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig); + LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java index e4be422388d..ac3e95a1435 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java @@ -28,7 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; + import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java index c3f33aa39d7..7ada24b575d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -20,11 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType; import org.apache.log4j.Logger; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + +import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ce9882a89a2..d4d837cc667 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -296,9 +295,8 @@ public class ReplicationSourceManager implements ReplicationListener { */ @VisibleForTesting ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { - ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id); ReplicationPeer peer = replicationPeers.getPeer(id); - ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer); + ReplicationSourceInterface src = getReplicationSource(id, peer); synchronized (this.walsById) { this.sources.add(src); Map<String, SortedSet<String>> walsByGroup = new HashMap<>(); @@ -493,8 +491,8 @@ public class ReplicationSourceManager implements ReplicationListener { * @param peerId the id of the peer cluster * @return the created source */ - private ReplicationSourceInterface getReplicationSource(String peerId, - ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException { + private ReplicationSourceInterface getReplicationSource(String peerId, ReplicationPeer peer) + throws IOException { RegionServerCoprocessorHost rsServerHost = null; TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { @@ -506,24 +504,24 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationEndpoint replicationEndpoint = null; try { - String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); + String replicationEndpointImpl = peer.getPeerConfig().getReplicationEndpointImpl(); if (replicationEndpointImpl == null) { // Default to HBase inter-cluster replication endpoint replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); } replicationEndpoint = Class.forName(replicationEndpointImpl) .asSubclass(ReplicationEndpoint.class).newInstance(); - if(rsServerHost != null) { - ReplicationEndpoint newReplicationEndPoint = rsServerHost - .postCreateReplicationEndPoint(replicationEndpoint); - if(newReplicationEndPoint != null) { + if (rsServerHost != null) { + ReplicationEndpoint newReplicationEndPoint = + rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); + if (newReplicationEndPoint != null) { // Override the newly created endpoint from the hook with configured end point replicationEndpoint = newReplicationEndPoint; } } } catch (Exception e) { - LOG.warn("Passed replication endpoint implementation throws
errors" - + " while initializing ReplicationSource for peer: " + peerId, e); + LOG.warn("Passed replication endpoint implementation throws errors" + + " while initializing ReplicationSource for peer: " + peerId, e); throw new IOException(e); } @@ -533,8 +531,8 @@ public class ReplicationSourceManager implements ReplicationListener { replicationEndpoint, walFileLengthProvider, metrics); // init replication endpoint - replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), - fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server)); + replicationEndpoint.init(new ReplicationEndpoint.Context(conf, peer.getConfiguration(), fs, + peerId, clusterId, peer, metrics, tableDescriptors, server)); return src; } @@ -730,16 +728,6 @@ public class ReplicationSourceManager implements ReplicationListener { abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); continue; } - - ReplicationPeerConfig peerConfig = null; - try { - peerConfig = replicationPeers.getPeerConfig(actualPeerId); - } catch (Exception e) { - LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS - + ", failed to read peer config", e); - abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); - continue; - } if (server instanceof ReplicationSyncUp.DummyServer && peer.getPeerState().equals(PeerState.DISABLED)) { LOG.warn("Peer {} is disbaled. ReplicationSyncUp tool will skip " @@ -761,7 +749,7 @@ public class ReplicationSourceManager implements ReplicationListener { } // enqueue sources - ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer); + ReplicationSourceInterface src = getReplicationSource(peerId, peer); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // see removePeer synchronized (oldsources) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java index b09a8a7112e..1300376b977 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java @@ -15,19 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.client.replication; import java.io.IOException; - import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.TestReplicationBase; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -39,6 +35,9 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + @Category({ MediumTests.class, ClientTests.class }) public class TestReplicationAdminUsingProcedure extends TestReplicationBase {
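Net effect of the remove-peer changes above: the peer znode is deleted in UPDATE_PEER_STORAGE, every region server drops its sources in REFRESH_PEER_ON_RS, and only then does RemovePeerProcedure.postPeerModification purge the peer's queues and HFile refs. A condensed sketch of that cleanup, using only the ReplicationQueueStorage calls shown in the ReplicationPeerManager hunk (queueStorage and peerId assumed in scope); the two identical passes close the claimQueue race described in the comment there:

    // Pass twice: a queue claimed away from an already-scanned RS during the
    // first pass is guaranteed to be caught by the second one.
    for (int pass = 0; pass < 2; pass++) {
      for (ServerName replicator : queueStorage.getListOfReplicators()) {
        for (String queueId : queueStorage.getAllQueues(replicator)) {
          // Queue ids embed the owning peer id.
          if (new ReplicationQueueInfo(queueId).getPeerId().equals(peerId)) {
            queueStorage.removeQueue(replicator, queueId);
          }
        }
        queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
      }
    }
    queueStorage.removePeerFromHFileRefs(peerId);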