From 5e410d814004288fe25320d8caf31880903d3e05 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 18 Dec 2017 15:22:36 +0800 Subject: [PATCH] HBASE-19524 Master side changes for moving peer modification from zk watcher to procedure --- .../procedure2/RemoteProcedureDispatcher.java | 3 +- .../src/main/protobuf/MasterProcedure.proto | 21 +++- .../main/protobuf/RegionServerStatus.proto | 3 +- .../src/main/protobuf/Replication.proto | 5 + .../replication/ReplicationPeersZKImpl.java | 4 +- .../apache/hadoop/hbase/master/HMaster.java | 93 ++++++++--------- .../hbase/master/MasterRpcServices.java | 4 +- .../hadoop/hbase/master/MasterServices.java | 26 ++--- .../assignment/RegionTransitionProcedure.java | 13 +-- .../master/procedure/MasterProcedureEnv.java | 5 + .../procedure/ProcedurePrepareLatch.java | 2 +- .../master/replication/AddPeerProcedure.java | 97 ++++++++++++++++++ .../replication/DisablePeerProcedure.java | 70 +++++++++++++ .../replication/EnablePeerProcedure.java | 69 +++++++++++++ .../replication/ModifyPeerProcedure.java | 99 ++++++++++++++++--- .../replication/RefreshPeerProcedure.java | 28 ++++-- .../replication/RemovePeerProcedure.java | 69 +++++++++++++ .../replication/ReplicationManager.java | 76 +++++++------- .../UpdatePeerConfigProcedure.java | 92 +++++++++++++++++ .../hbase/regionserver/HRegionServer.java | 5 +- .../regionserver}/RefreshPeerCallable.java | 7 +- .../hbase/master/MockNoopMasterServices.java | 23 +++-- .../replication/DummyModifyPeerProcedure.java | 13 ++- 23 files changed, 669 insertions(+), 158 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/{master/replication => replication/regionserver}/RefreshPeerCallable.java (90%) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 7e3dde6f87e..fb852c3e883 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -247,9 +247,8 @@ public abstract class RemoteProcedureDispatcher initialized = new ProcedureEvent<>("master initialized"); // flag set after master services are started, // initialization may have not completed yet. volatile boolean serviceStarted = false; // flag set after we complete assignMeta. 
- private final ProcedureEvent serverCrashProcessingEnabled = - new ProcedureEvent("server crash processing"); + private final ProcedureEvent serverCrashProcessingEnabled = + new ProcedureEvent<>("server crash processing"); // Maximum time we should run balancer for private final int maxBlancingTime; @@ -2319,11 +2326,8 @@ public class HMaster extends HRegionServer implements MasterServices { return true; } Pair pair = - new Pair(MetaTableAccessor.getRegionInfo(data), + new Pair<>(MetaTableAccessor.getRegionInfo(data), MetaTableAccessor.getServerName(data,0)); - if (pair == null) { - return false; - } if (!pair.getFirst().getTable().equals(tableName)) { return false; } @@ -2792,7 +2796,7 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ProcedureEvent getInitializedEvent() { + public ProcedureEvent getInitializedEvent() { return initialized; } @@ -2811,7 +2815,7 @@ public class HMaster extends HRegionServer implements MasterServices { procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b); } - public ProcedureEvent getServerCrashProcessingEnabledEvent() { + public ProcedureEvent getServerCrashProcessingEnabledEvent() { return serverCrashProcessingEnabled; } @@ -3377,54 +3381,36 @@ public class HMaster extends HRegionServer implements MasterServices { return favoredNodesManager; } + private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException { + long procId = procedureExecutor.submitProcedure(procedure); + procedure.getLatch().await(); + return procId; + } + @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preAddReplicationPeer(peerId, peerConfig); - } - LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" - + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")); - this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled); - if (cpHost != null) { - cpHost.postAddReplicationPeer(peerId, peerConfig); - } + LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" + + peerConfig + ", state=" + (enabled ? 
"ENABLED" : "DISABLED")); + return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled)); } @Override - public void removeReplicationPeer(String peerId) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preRemoveReplicationPeer(peerId); - } + public long removeReplicationPeer(String peerId) throws ReplicationException, IOException { LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId); - this.replicationManager.removeReplicationPeer(peerId); - if (cpHost != null) { - cpHost.postRemoveReplicationPeer(peerId); - } + return executePeerProcedure(new RemovePeerProcedure(peerId)); } @Override - public void enableReplicationPeer(String peerId) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preEnableReplicationPeer(peerId); - } + public long enableReplicationPeer(String peerId) throws ReplicationException, IOException { LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId); - this.replicationManager.enableReplicationPeer(peerId); - if (cpHost != null) { - cpHost.postEnableReplicationPeer(peerId); - } + return executePeerProcedure(new EnablePeerProcedure(peerId)); } @Override - public void disableReplicationPeer(String peerId) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preDisableReplicationPeer(peerId); - } + public long disableReplicationPeer(String peerId) throws ReplicationException, IOException { LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId); - this.replicationManager.disableReplicationPeer(peerId); - if (cpHost != null) { - cpHost.postDisableReplicationPeer(peerId); - } + return executePeerProcedure(new DisablePeerProcedure(peerId)); } @Override @@ -3443,17 +3429,11 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) + public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException { - if (cpHost != null) { - cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); - } - LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId - + ", config=" + peerConfig); - this.replicationManager.updatePeerConfig(peerId, peerConfig); - if (cpHost != null) { - cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig); - } + LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId + + ", config=" + peerConfig); + return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig)); } @Override @@ -3605,13 +3585,18 @@ public class HMaster extends HRegionServer implements MasterServices { } } - public void remoteProcedureFailed(long procId, String error) { + public void remoteProcedureFailed(long procId, RemoteProcedureException error) { RemoteProcedure procedure = getRemoteProcedure(procId); if (procedure != null) { procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error); } } + @Override + public ReplicationManager getReplicationManager() { + return replicationManager; + } + /** * This method modifies the master's configuration in order to inject replication-related features */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index cbfc9e809dc..7bd355ada61 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; @@ -2238,7 +2239,8 @@ public class MasterRpcServices extends RSRpcServices if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) { master.remoteProcedureCompleted(request.getProcId()); } else { - master.remoteProcedureFailed(request.getProcId(), request.getError()); + master.remoteProcedureFailed(request.getProcId(), + RemoteProcedureException.fromProto(request.getError())); } return ReportProcedureDoneResponse.getDefaultInstance(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index c9472568fc4..b0bf9ca61fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,10 +17,11 @@ */ package org.apache.hadoop.hbase.master; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.replication.ReplicationManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -52,8 +53,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Service; - /** * A curated subset of services provided by {@link HMaster}. * For use internally only. Passed to Managers, Services and Chores so can pass less-than-a @@ -136,7 +135,7 @@ public interface MasterServices extends Server { * @return Tripped when Master has finished initialization. 
*/ @VisibleForTesting - public ProcedureEvent getInitializedEvent(); + public ProcedureEvent getInitializedEvent(); /** * @return Master's instance of {@link MetricsMaster} @@ -430,26 +429,26 @@ public interface MasterServices extends Server { * @param peerConfig configuration for the replication slave cluster * @param enabled peer state, true if ENABLED and false if DISABLED */ - void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException; /** * Removes a peer and stops the replication * @param peerId a short name that identifies the peer */ - void removeReplicationPeer(String peerId) throws ReplicationException, IOException; + long removeReplicationPeer(String peerId) throws ReplicationException, IOException; /** * Restart the replication stream to the specified peer * @param peerId a short name that identifies the peer */ - void enableReplicationPeer(String peerId) throws ReplicationException, IOException; + long enableReplicationPeer(String peerId) throws ReplicationException, IOException; /** * Stop the replication stream to the specified peer * @param peerId a short name that identifies the peer */ - void disableReplicationPeer(String peerId) throws ReplicationException, IOException; + long disableReplicationPeer(String peerId) throws ReplicationException, IOException; /** * Returns the configured ReplicationPeerConfig for the specified peer @@ -459,12 +458,17 @@ public interface MasterServices extends Server { ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException, IOException; + /** + * Returns the {@link ReplicationManager}. + */ + ReplicationManager getReplicationManager(); + /** * Update the peerConfig for the specified peer * @param peerId a short name that identifies the peer * @param peerConfig new config for the peer */ - void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) + long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index 76d2875693e..a0e58f3ff57 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -16,12 +16,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.master.assignment; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -33,13 +31,16 @@ import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + /** * Base class for the Assign and Unassign Procedure. * @@ -416,7 +417,7 @@ public abstract class RegionTransitionProcedure } @Override - public void remoteOperationFailed(MasterProcedureEnv env, String error) { + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { // should not be called for region operation until we modified the open/close region procedure throw new UnsupportedOperationException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 64d671b6751..19e6f9b9b5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.replication.ReplicationManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; @@ -137,6 +138,10 @@ public class MasterProcedureEnv implements ConfigurationObserver { return remoteDispatcher; } + public ReplicationManager getReplicationManager() { + return master.getReplicationManager(); + } + public boolean isRunning() { if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false; return master.getMasterProcedureExecutor().isRunning(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java index 283cde0a722..2011c0b9de9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java @@ -78,7 +78,7 @@ public abstract class ProcedurePrepareLatch { protected abstract void countDown(final Procedure proc); public abstract void await() throws IOException; - protected static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) { + public static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) { if (latch != null) { latch.countDown(proc); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java new file mode 100644 index 00000000000..c3862d8c06f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData; + +/** + * The procedure for adding a new replication peer. 
+ */ +@InterfaceAudience.Private +public class AddPeerProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(AddPeerProcedure.class); + + private ReplicationPeerConfig peerConfig; + + private boolean enabled; + + public AddPeerProcedure() { + } + + public AddPeerProcedure(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) { + super(peerId); + this.peerConfig = peerConfig; + this.enabled = enabled; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ADD; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preAddReplicationPeer(peerId, peerConfig); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " peer " + peerId + + ", config " + peerConfig); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig); + } + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(AddPeerStateData.newBuilder() + .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + AddPeerStateData data = serializer.deserialize(AddPeerStateData.class); + peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); + enabled = data.getEnabled(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java new file mode 100644 index 00000000000..0b32db9cb1a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The procedure for disabling a replication peer. + */ +@InterfaceAudience.Private +public class DisablePeerProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(DisablePeerProcedure.class); + + public DisablePeerProcedure() { + } + + public DisablePeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.DISABLE; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preDisableReplicationPeer(peerId); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) + throws IllegalArgumentException, Exception { + env.getReplicationManager().disableReplicationPeer(peerId); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully disabled peer " + peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postDisableReplicationPeer(peerId); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java new file mode 100644 index 00000000000..92ba000944e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The procedure for enabling a replication peer. 
+ */ +@InterfaceAudience.Private +public class EnablePeerProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(EnablePeerProcedure.class); + + public EnablePeerProcedure() { + } + + public EnablePeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ENABLE; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preEnableReplicationPeer(peerId); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { + env.getReplicationManager().enableReplicationPeer(peerId); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully enabled peer " + peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postEnableReplicationPeer(peerId); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index fca05a7e012..7076babd6f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -21,15 +21,22 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyPeerStateData; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +/** + * The base class for all replication peer related procedure. + */ @InterfaceAudience.Private public abstract class ModifyPeerProcedure extends StateMachineProcedure @@ -39,11 +46,21 @@ public abstract class ModifyPeerProcedure protected String peerId; + // used to keep compatible with old client where we can only returns after updateStorage. + protected ProcedurePrepareLatch latch; + protected ModifyPeerProcedure() { } protected ModifyPeerProcedure(String peerId) { this.peerId = peerId; + // TODO: temporarily set a 4.0 here to always wait for the procedure exection completed. Change + // to 3.0 or 2.0 after the client modification is done. + this.latch = ProcedurePrepareLatch.createLatch(4, 0); + } + + public ProcedurePrepareLatch getLatch() { + return latch; } @Override @@ -52,28 +69,58 @@ public abstract class ModifyPeerProcedure } /** - * Return {@code false} means that the operation is invalid and we should give up, otherwise - * {@code true}. - *

- * You need to call {@link #setFailure(String, Throwable)} to give the detail failure information.
+ * Called before we start the actual processing. If an exception is thrown then we will give up
+ * and mark the procedure as failed directly.
*/
- protected abstract boolean updatePeerStorage() throws IOException;
+ protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException;
- protected void postPeerModification() {
+ /**
+ * We will give up and mark the procedure as failed if an {@link IllegalArgumentException} is
+ * thrown; for any other type of Exception we will retry.
+ */
+ protected abstract void updatePeerStorage(MasterProcedureEnv env)
+ throws IllegalArgumentException, Exception;
+
+ /**
+ * Called before we finish the procedure. The implementation can do some logging work, and also
+ * call the coprocessor hook if any.
+ * <p>

+ * Notice that, since we have already done the actual work, throwing exception here will not fail + * this procedure, we will just ignore it and finish the procedure as suceeded. + */ + protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException; + + private void releaseLatch() { + ProcedurePrepareLatch.releaseLatch(latch, this); } @Override protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { switch (state) { + case PRE_PEER_MODIFICATION: + try { + prePeerModification(env); + } catch (IOException e) { + LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + + ", mark the procedure as failure and give up", e); + setFailure("prePeerModification", e); + releaseLatch(); + return Flow.NO_MORE_STATE; + } + setNextState(PeerModificationState.UPDATE_PEER_STORAGE); + return Flow.HAS_MORE_STATE; case UPDATE_PEER_STORAGE: try { - if (!updatePeerStorage()) { - assert isFailed() : "setFailure is not called"; - return Flow.NO_MORE_STATE; - } - } catch (IOException e) { - LOG.warn("update peer storage failed, retry", e); + updatePeerStorage(env); + } catch (IllegalArgumentException e) { + setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", + new DoNotRetryIOException(e)); + releaseLatch(); + return Flow.NO_MORE_STATE; + } catch (Exception e) { + LOG.warn( + getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e); throw new ProcedureYieldException(); } setNextState(PeerModificationState.REFRESH_PEER_ON_RS); @@ -85,7 +132,13 @@ public abstract class ModifyPeerProcedure setNextState(PeerModificationState.POST_PEER_MODIFICATION); return Flow.HAS_MORE_STATE; case POST_PEER_MODIFICATION: - postPeerModification(); + try { + postPeerModification(env); + } catch (IOException e) { + LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + + ", ignore since the procedure has already done", e); + } + releaseLatch(); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); @@ -107,6 +160,12 @@ public abstract class ModifyPeerProcedure @Override protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) throws IOException, InterruptedException { + if (state == PeerModificationState.PRE_PEER_MODIFICATION || + state == PeerModificationState.UPDATE_PEER_STORAGE) { + // actually the peer related operations has no rollback, but if we haven't done any + // modifications on the peer storage, we can just return. 
+ return; + } throw new UnsupportedOperationException(); } @@ -122,6 +181,18 @@ public abstract class ModifyPeerProcedure @Override protected PeerModificationState getInitialState() { - return PeerModificationState.UPDATE_PEER_STORAGE; + return PeerModificationState.PRE_PEER_MODIFICATION; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(ModifyPeerStateData.newBuilder().setPeerId(peerId).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + peerId = serializer.deserialize(ModifyPeerStateData.class).getPeerId(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 18da487adf7..ddc2401eb42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -118,15 +120,22 @@ public class RefreshPeerProcedure extends Procedure .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); } - private void complete(MasterProcedureEnv env, boolean succ) { + private void complete(MasterProcedureEnv env, Throwable error) { if (event == null) { LOG.warn("procedure event for " + getProcId() + - " is null, maybe the procedure is created when recovery", new Exception()); + " is null, maybe the procedure is created when recovery", + new Exception()); return; } - LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + - (succ ? 
" suceeded" : " failed")); - this.succ = succ; + if (error != null) { + LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed", + error); + this.succ = false; + } else { + LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded"); + this.succ = true; + } + event.wake(env.getProcedureScheduler()); event = null; } @@ -134,17 +143,18 @@ public class RefreshPeerProcedure extends Procedure @Override public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) { - complete(env, false); + complete(env, exception); } @Override public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { - complete(env, true); + complete(env, null); } @Override - public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) { - complete(env, false); + public synchronized void remoteOperationFailed(MasterProcedureEnv env, + RemoteProcedureException error) { + complete(env, error); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java new file mode 100644 index 00000000000..3daad6d5ab2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The procedure for removing a replication peer. 
+ */ +@InterfaceAudience.Private +public class RemovePeerProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(RemovePeerProcedure.class); + + public RemovePeerProcedure() { + } + + public RemovePeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.REMOVE; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preRemoveReplicationPeer(peerId); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { + env.getReplicationManager().removeReplicationPeer(peerId); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully removed peer " + peerId); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postRemoveReplicationPeer(peerId); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java index f36b2e27113..b6f8784bfd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java @@ -27,10 +27,8 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; @@ -39,24 +37,21 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; /** * Manages and performs all replication admin operations. + *

* Used to add/remove a replication peer. */ @InterfaceAudience.Private public class ReplicationManager { - - private final Configuration conf; - private final ZKWatcher zkw; private final ReplicationQueuesClient replicationQueuesClient; private final ReplicationPeers replicationPeers; public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable) throws IOException { - this.conf = conf; - this.zkw = zkw; try { this.replicationQueuesClient = ReplicationFactory .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw)); @@ -70,7 +65,7 @@ public class ReplicationManager { } public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException, IOException { + throws ReplicationException { checkPeerConfig(peerConfig); replicationPeers.registerPeer(peerId, peerConfig, enabled); replicationPeers.peerConnected(peerId); @@ -89,8 +84,8 @@ public class ReplicationManager { this.replicationPeers.disablePeer(peerId); } - public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException, - ReplicationPeerNotFoundException { + public ReplicationPeerConfig getPeerConfig(String peerId) + throws ReplicationException, ReplicationPeerNotFoundException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId); if (peerConfig == null) { throw new ReplicationPeerNotFoundException(peerId); @@ -110,9 +105,9 @@ public class ReplicationManager { List peerIds = replicationPeers.getAllPeerIds(); for (String peerId : peerIds) { if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) { - peers.add(new ReplicationPeerDescription(peerId, replicationPeers - .getStatusOfPeerFromBackingStore(peerId), replicationPeers - .getReplicationPeerConfig(peerId))); + peers.add(new ReplicationPeerDescription(peerId, + replicationPeers.getStatusOfPeerFromBackingStore(peerId), + replicationPeers.getReplicationPeerConfig(peerId))); } } return peers; @@ -126,13 +121,12 @@ public class ReplicationManager { * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. 
*/ - private void checkPeerConfig(ReplicationPeerConfig peerConfig) - throws ReplicationException, IOException { + private void checkPeerConfig(ReplicationPeerConfig peerConfig) { if (peerConfig.replicateAllUserTables()) { - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) - || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - throw new ReplicationException("Need clean namespaces or table-cfs config firstly" - + " when replicate_all flag is true"); + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || + (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " + + "when you want replicate all cluster"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), peerConfig.getExcludeTableCFsMap()); @@ -141,7 +135,7 @@ public class ReplicationManager { && !peerConfig.getExcludeNamespaces().isEmpty()) || (peerConfig.getExcludeTableCFsMap() != null && !peerConfig.getExcludeTableCFsMap().isEmpty())) { - throw new ReplicationException( + throw new IllegalArgumentException( "Need clean exclude-namespaces or exclude-table-cfs config firstly" + " when replicate_all flag is false"); } @@ -154,20 +148,24 @@ public class ReplicationManager { /** * Set a namespace in the peer config means that all tables in this namespace will be replicated * to the peer cluster. - * 1. If peer config already has a namespace, then not allow set any table of this namespace - * to the peer config. - * 2. If peer config already has a table, then not allow set this table's namespace to the peer - * config. - * + *

<ol>
+ * <li>If peer config already has a namespace, then not allow set any table of this namespace to
+ * the peer config.</li>
+ * <li>If peer config already has a table, then not allow set this table's namespace to the peer
+ * config.</li>
+ * </ol>
+ * <p>

* Set a exclude namespace in the peer config means that all tables in this namespace can't be * replicated to the peer cluster. - * 1. If peer config already has a exclude namespace, then not allow set any exclude table of - * this namespace to the peer config. - * 2. If peer config already has a exclude table, then not allow set this table's namespace - * as a exclude namespace. + *

<ol>
+ * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
+ * this namespace to the peer config.</li>
+ * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
+ * exclude namespace.</li>
+ * </ol>
*/ private void checkNamespacesAndTableCfsConfigConflict(Set namespaces, - Map> tableCfs) throws ReplicationException { + Map> tableCfs) { if (namespaces == null || namespaces.isEmpty()) { return; } @@ -177,24 +175,22 @@ public class ReplicationManager { for (Map.Entry> entry : tableCfs.entrySet()) { TableName table = entry.getKey(); if (namespaces.contains(table.getNamespaceAsString())) { - throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces " + throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces " + table.getNamespaceAsString() + " in peer config"); } } } - private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) - throws IOException { - String filterCSV = peerConfig.getConfiguration(). - get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); - if (filterCSV != null && !filterCSV.isEmpty()){ - String [] filters = filterCSV.split(","); + private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) { + String filterCSV = peerConfig.getConfiguration() + .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterCSV != null && !filterCSV.isEmpty()) { + String[] filters = filterCSV.split(","); for (String filter : filters) { try { - Class clazz = Class.forName(filter); - Object o = clazz.newInstance(); + Class.forName(filter).newInstance(); } catch (Exception e) { - throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + + throw new IllegalArgumentException("Configured WALEntryFilter " + filter + " could not be created. Failing add/update " + "peer operation.", e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java new file mode 100644 index 00000000000..435eefc4c14 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData; + +/** + * The procedure for updating the config for a replication peer. + */ +@InterfaceAudience.Private +public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { + + private static final Log LOG = LogFactory.getLog(UpdatePeerConfigProcedure.class); + + private ReplicationPeerConfig peerConfig; + + public UpdatePeerConfigProcedure() { + } + + public UpdatePeerConfigProcedure(String peerId, ReplicationPeerConfig peerConfig) { + super(peerId); + this.peerConfig = peerConfig; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.UPDATE_CONFIG; + } + + @Override + protected void prePeerModification(MasterProcedureEnv env) throws IOException { + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); + } + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) + throws IllegalArgumentException, Exception { + env.getReplicationManager().updatePeerConfig(peerId, peerConfig); + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) throws IOException { + LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig); + MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig); + } + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(UpdatePeerConfigStateData.newBuilder() + .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + peerConfig = ReplicationPeerConfigUtil + .convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 862a84bf34e..c95ac37f67a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -144,6 +144,7 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JvmPauseMonitor; import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; @@ -3684,7 +3685,7 @@ public class HRegionServer extends HasThread implements ReportProcedureDoneRequest.newBuilder().setProcId(procId); if (error != null) { builder.setStatus(ReportProcedureDoneRequest.Status.ERROR) - .setError(Throwables.getStackTraceAsString(error)); + .setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), error)); } else { builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java similarity index 90% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java index 4e091078bb2..a47a4838262 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hbase.master.replication; +package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.executor.EventType; @@ -45,7 +45,10 @@ public class RefreshPeerCallable implements RSProcedureCallable { if (initError != null) { throw initError; } - rs.getFileSystem().create(new Path("/" + peerId + "/" + rs.getServerName().toString())).close(); + Path dir = new Path("/" + peerId); + if (rs.getFileSystem().exists(dir)) { + rs.getFileSystem().create(new Path(dir, rs.getServerName().toString())).close(); + } return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index be91aa0fdde..e88710efc2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.replication.ReplicationManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -368,7 +369,6 @@ public class MockNoopMasterServices implements MasterServices { @Override public ClusterConnection getClusterConnection() { - // TODO Auto-generated method stub return null; } @@ -398,20 +398,24 @@ public class MockNoopMasterServices implements MasterServices { } @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { + return 0; } @Override - public void removeReplicationPeer(String peerId) throws ReplicationException { + public long removeReplicationPeer(String peerId) throws ReplicationException { + return 0; } @Override - public void enableReplicationPeer(String peerId) throws ReplicationException, IOException { + public long enableReplicationPeer(String peerId) throws ReplicationException, IOException { + return 0; } @Override - public void disableReplicationPeer(String peerId) throws ReplicationException, IOException { + public long disableReplicationPeer(String peerId) throws ReplicationException, IOException { + return 0; } @Override @@ -421,8 +425,9 @@ public class MockNoopMasterServices implements MasterServices { } @Override - public void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) + public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException { + return 0; } @Override @@ -457,7 +462,6 @@ public class MockNoopMasterServices implements MasterServices { @Override public ProcedureEvent getInitializedEvent() { - // TODO Auto-generated method stub return null; } @@ -470,4 +474,9 @@ public class MockNoopMasterServices implements MasterServices { public Connection createConnection(Configuration conf) throws IOException { return null; } + + @Override + public ReplicationManager getReplicationManager() { + return null; + } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java index 44343d71227..ed7c6fa383b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.master.replication; -import java.io.IOException; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; public class DummyModifyPeerProcedure extends ModifyPeerProcedure { @@ -34,8 +34,15 @@ public class DummyModifyPeerProcedure extends ModifyPeerProcedure { } @Override - protected boolean updatePeerStorage() throws IOException { - return true; + protected void prePeerModification(MasterProcedureEnv env) { + } + + @Override + protected void updatePeerStorage(MasterProcedureEnv env) { + } + + @Override + protected void postPeerModification(MasterProcedureEnv env) { } }
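A minimal usage sketch of the flow introduced above, assuming a handle on the master's ProcedureExecutor: a concrete ModifyPeerProcedure subclass is submitted and the caller blocks on the prepare latch, mirroring HMaster.executePeerProcedure in this patch. The wrapper class and the procExec parameter below are illustrative only and are not part of the patch.

import java.io.IOException;

import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;

public class PeerProcedureUsageSketch {

  // Mirrors HMaster.executePeerProcedure: submit the procedure, then block on the
  // prepare latch so existing clients still observe a synchronous call.
  static long enablePeer(ProcedureExecutor<MasterProcedureEnv> procExec, String peerId)
      throws IOException {
    ModifyPeerProcedure proc = new EnablePeerProcedure(peerId);
    // The state machine walks PRE_PEER_MODIFICATION -> UPDATE_PEER_STORAGE ->
    // REFRESH_PEER_ON_RS -> POST_PEER_MODIFICATION; the latch is released when the
    // procedure reaches POST_PEER_MODIFICATION, or when it fails before the peer
    // storage has been touched.
    long procId = procExec.submitProcedure(proc);
    proc.getLatch().await();
    return procId;
  }
}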