HBASE-19524 Master side changes for moving peer modification from zk watcher to procedure

This commit is contained in:
zhangduo 2017-12-18 15:22:36 +08:00
parent f17198ff19
commit 7f4bd0d371
23 changed files with 673 additions and 162 deletions

View File

@ -247,9 +247,8 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
/**
* Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
* method.
* @param error the error message
*/
void remoteOperationFailed(TEnv env, String error);
void remoteOperationFailed(TEnv env, RemoteProcedureException error);
}
/**

View File

@ -27,6 +27,7 @@ option optimize_for = SPEED;
import "HBase.proto";
import "RPC.proto";
import "Snapshot.proto";
import "Replication.proto";
// ============================================================================
// WARNING - Compatibility rules
@ -367,9 +368,10 @@ message GCMergedRegionsStateData {
}
enum PeerModificationState {
UPDATE_PEER_STORAGE = 1;
REFRESH_PEER_ON_RS = 2;
POST_PEER_MODIFICATION = 3;
PRE_PEER_MODIFICATION = 1;
UPDATE_PEER_STORAGE = 2;
REFRESH_PEER_ON_RS = 3;
POST_PEER_MODIFICATION = 4;
}
message PeerModificationStateData {
@ -395,3 +397,16 @@ message RefreshPeerParameter {
required PeerModificationType type = 2;
required ServerName target_server = 3;
}
message ModifyPeerStateData {
required string peer_id = 1;
}
message AddPeerStateData {
required ReplicationPeer peer_config = 1;
required bool enabled = 2;
}
message UpdatePeerConfigStateData {
required ReplicationPeer peer_config = 1;
}

View File

@ -28,6 +28,7 @@ option optimize_for = SPEED;
import "HBase.proto";
import "ClusterStatus.proto";
import "ErrorHandling.proto";
message RegionServerStartupRequest {
/** Port number this regionserver is up on */
@ -152,7 +153,7 @@ message ReportProcedureDoneRequest {
ERROR = 2;
}
required Status status = 2;
optional string error = 3;
optional ForeignExceptionMessage error = 3;
}
message ReportProcedureDoneResponse {

View File

@ -84,6 +84,7 @@ message AddReplicationPeerRequest {
}
message AddReplicationPeerResponse {
optional uint64 proc_id = 1;
}
message RemoveReplicationPeerRequest {
@ -91,6 +92,7 @@ message RemoveReplicationPeerRequest {
}
message RemoveReplicationPeerResponse {
optional uint64 proc_id = 1;
}
message EnableReplicationPeerRequest {
@ -98,6 +100,7 @@ message EnableReplicationPeerRequest {
}
message EnableReplicationPeerResponse {
optional uint64 proc_id = 1;
}
message DisableReplicationPeerRequest {
@ -105,6 +108,7 @@ message DisableReplicationPeerRequest {
}
message DisableReplicationPeerResponse {
optional uint64 proc_id = 1;
}
message GetReplicationPeerConfigRequest {
@ -122,6 +126,7 @@ message UpdateReplicationPeerConfigRequest {
}
message UpdateReplicationPeerConfigResponse {
optional uint64 proc_id = 1;
}
message ListReplicationPeersRequest {

View File

@ -535,7 +535,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (queueInfo.getPeerId().equals(peerId)) {
throw new ReplicationException("undeleted queue for peerId: " + peerId
throw new IllegalArgumentException("undeleted queue for peerId: " + peerId
+ ", replicator: " + replicator + ", queueId: " + queueId);
}
}
@ -543,7 +543,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
// Check for hfile-refs queue
if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
&& queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
throw new ReplicationException("Undeleted queue for peerId: " + peerId
throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
+ ", found in hfile-refs node path " + hfileRefsZNode);
}
} catch (KeeperException e) {

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Service;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
@ -49,10 +50,12 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterMetrics;
@ -127,7 +130,13 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
@ -140,6 +149,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver;
@ -168,7 +178,6 @@ import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdLock;
@ -329,15 +338,15 @@ public class HMaster extends HRegionServer implements MasterServices {
private volatile boolean activeMaster = false;
// flag set after we complete initialization once active
private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
// flag set after master services are started,
// initialization may have not completed yet.
volatile boolean serviceStarted = false;
// flag set after we complete assignMeta.
private final ProcedureEvent serverCrashProcessingEnabled =
new ProcedureEvent("server crash processing");
private final ProcedureEvent<?> serverCrashProcessingEnabled =
new ProcedureEvent<>("server crash processing");
// Maximum time we should run balancer for
private final int maxBlancingTime;
@ -1193,7 +1202,6 @@ public class HMaster extends HRegionServer implements MasterServices {
private void startProcedureExecutor() throws IOException {
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
final Path rootDir = FSUtils.getRootDir(conf);
procedureStore = new WALProcedureStore(conf,
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
@ -2313,11 +2321,8 @@ public class HMaster extends HRegionServer implements MasterServices {
return true;
}
Pair<RegionInfo, ServerName> pair =
new Pair(MetaTableAccessor.getRegionInfo(data),
new Pair<>(MetaTableAccessor.getRegionInfo(data),
MetaTableAccessor.getServerName(data,0));
if (pair == null) {
return false;
}
if (!pair.getFirst().getTable().equals(tableName)) {
return false;
}
@ -2743,7 +2748,7 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
public ProcedureEvent getInitializedEvent() {
public ProcedureEvent<?> getInitializedEvent() {
return initialized;
}
@ -2762,7 +2767,7 @@ public class HMaster extends HRegionServer implements MasterServices {
procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
}
public ProcedureEvent getServerCrashProcessingEnabledEvent() {
public ProcedureEvent<?> getServerCrashProcessingEnabledEvent() {
return serverCrashProcessingEnabled;
}
@ -3313,54 +3318,36 @@ public class HMaster extends HRegionServer implements MasterServices {
return favoredNodesManager;
}
private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException {
long procId = procedureExecutor.submitProcedure(procedure);
procedure.getLatch().await();
return procId;
}
@Override
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException, IOException {
if (cpHost != null) {
cpHost.preAddReplicationPeer(peerId, peerConfig);
}
LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config="
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled);
if (cpHost != null) {
cpHost.postAddReplicationPeer(peerId, peerConfig);
}
LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" +
peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled));
}
@Override
public void removeReplicationPeer(String peerId) throws ReplicationException, IOException {
if (cpHost != null) {
cpHost.preRemoveReplicationPeer(peerId);
}
public long removeReplicationPeer(String peerId) throws ReplicationException, IOException {
LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId);
this.replicationManager.removeReplicationPeer(peerId);
if (cpHost != null) {
cpHost.postRemoveReplicationPeer(peerId);
}
return executePeerProcedure(new RemovePeerProcedure(peerId));
}
@Override
public void enableReplicationPeer(String peerId) throws ReplicationException, IOException {
if (cpHost != null) {
cpHost.preEnableReplicationPeer(peerId);
}
public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId);
this.replicationManager.enableReplicationPeer(peerId);
if (cpHost != null) {
cpHost.postEnableReplicationPeer(peerId);
}
return executePeerProcedure(new EnablePeerProcedure(peerId));
}
@Override
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
if (cpHost != null) {
cpHost.preDisableReplicationPeer(peerId);
}
public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId);
this.replicationManager.disableReplicationPeer(peerId);
if (cpHost != null) {
cpHost.postDisableReplicationPeer(peerId);
}
return executePeerProcedure(new DisablePeerProcedure(peerId));
}
@Override
@ -3379,17 +3366,11 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
public void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException {
if (cpHost != null) {
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
}
LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId
+ ", config=" + peerConfig);
this.replicationManager.updatePeerConfig(peerId, peerConfig);
if (cpHost != null) {
cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);
}
LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId +
", config=" + peerConfig);
return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig));
}
@Override
@ -3542,10 +3523,15 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
public void remoteProcedureFailed(long procId, String error) {
public void remoteProcedureFailed(long procId, RemoteProcedureException error) {
RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
if (procedure != null) {
procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
}
}
@Override
public ReplicationManager getReplicationManager() {
return replicationManager;
}
}

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
@ -2256,7 +2257,8 @@ public class MasterRpcServices extends RSRpcServices
if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) {
master.remoteProcedureCompleted(request.getProcId());
} else {
master.remoteProcedureFailed(request.getProcId(), request.getError());
master.remoteProcedureFailed(request.getProcId(),
RemoteProcedureException.fromProto(request.getError()));
}
return ReportProcedureDoneResponse.getDefaultInstance();
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -18,10 +17,11 @@
*/
package org.apache.hadoop.hbase.master;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@ -52,8 +53,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Service;
/**
* A curated subset of services provided by {@link HMaster}.
* For use internally only. Passed to Managers, Services and Chores so can pass less-than-a
@ -136,7 +135,7 @@ public interface MasterServices extends Server {
* @return Tripped when Master has finished initialization.
*/
@VisibleForTesting
public ProcedureEvent getInitializedEvent();
public ProcedureEvent<?> getInitializedEvent();
/**
* @return Master's instance of {@link MetricsMaster}
@ -430,26 +429,26 @@ public interface MasterServices extends Server {
* @param peerConfig configuration for the replication slave cluster
* @param enabled peer state, true if ENABLED and false if DISABLED
*/
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException, IOException;
/**
* Removes a peer and stops the replication
* @param peerId a short name that identifies the peer
*/
void removeReplicationPeer(String peerId) throws ReplicationException, IOException;
long removeReplicationPeer(String peerId) throws ReplicationException, IOException;
/**
* Restart the replication stream to the specified peer
* @param peerId a short name that identifies the peer
*/
void enableReplicationPeer(String peerId) throws ReplicationException, IOException;
long enableReplicationPeer(String peerId) throws ReplicationException, IOException;
/**
* Stop the replication stream to the specified peer
* @param peerId a short name that identifies the peer
*/
void disableReplicationPeer(String peerId) throws ReplicationException, IOException;
long disableReplicationPeer(String peerId) throws ReplicationException, IOException;
/**
* Returns the configured ReplicationPeerConfig for the specified peer
@ -459,12 +458,17 @@ public interface MasterServices extends Server {
ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException,
IOException;
/**
* Returns the {@link ReplicationManager}.
*/
ReplicationManager getReplicationManager();
/**
* Update the peerConfig for the specified peer
* @param peerId a short name that identifies the peer
* @param peerConfig new config for the peer
*/
void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException;
/**

View File

@ -16,12 +16,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -33,13 +31,16 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Base class for the Assign and Unassign Procedure.
*
@ -415,7 +416,7 @@ public abstract class RegionTransitionProcedure
}
@Override
public void remoteOperationFailed(MasterProcedureEnv env, String error) {
public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
// should not be called for region operation until we modified the open/close region procedure
throw new UnsupportedOperationException();
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
@ -137,6 +138,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return remoteDispatcher;
}
public ReplicationManager getReplicationManager() {
return master.getReplicationManager();
}
public boolean isRunning() {
if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false;
return master.getMasterProcedureExecutor().isRunning();

View File

@ -64,7 +64,7 @@ public abstract class ProcedurePrepareLatch {
protected abstract void countDown(final Procedure proc);
public abstract void await() throws IOException;
protected static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) {
public static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) {
if (latch != null) {
latch.countDown(proc);
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData;
/**
* The procedure for adding a new replication peer.
*/
@InterfaceAudience.Private
public class AddPeerProcedure extends ModifyPeerProcedure {
private static final Log LOG = LogFactory.getLog(AddPeerProcedure.class);
private ReplicationPeerConfig peerConfig;
private boolean enabled;
public AddPeerProcedure() {
}
public AddPeerProcedure(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) {
super(peerId);
this.peerConfig = peerConfig;
this.enabled = enabled;
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.ADD;
}
@Override
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preAddReplicationPeer(peerId, peerConfig);
}
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled);
}
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " peer " + peerId +
", config " + peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig);
}
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(AddPeerStateData.newBuilder()
.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
AddPeerStateData data = serializer.deserialize(AddPeerStateData.class);
peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
enabled = data.getEnabled();
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The procedure for disabling a replication peer.
*/
@InterfaceAudience.Private
public class DisablePeerProcedure extends ModifyPeerProcedure {
private static final Log LOG = LogFactory.getLog(DisablePeerProcedure.class);
public DisablePeerProcedure() {
}
public DisablePeerProcedure(String peerId) {
super(peerId);
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.DISABLE;
}
@Override
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preDisableReplicationPeer(peerId);
}
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env)
throws IllegalArgumentException, Exception {
env.getReplicationManager().disableReplicationPeer(peerId);
}
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
LOG.info("Successfully disabled peer " + peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postDisableReplicationPeer(peerId);
}
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The procedure for enabling a replication peer.
*/
@InterfaceAudience.Private
public class EnablePeerProcedure extends ModifyPeerProcedure {
private static final Log LOG = LogFactory.getLog(EnablePeerProcedure.class);
public EnablePeerProcedure() {
}
public EnablePeerProcedure(String peerId) {
super(peerId);
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.ENABLE;
}
@Override
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preEnableReplicationPeer(peerId);
}
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws Exception {
env.getReplicationManager().enableReplicationPeer(peerId);
}
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
LOG.info("Successfully enabled peer " + peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postEnableReplicationPeer(peerId);
}
}
}

View File

@ -21,15 +21,22 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyPeerStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
/**
* The base class for all replication peer related procedure.
*/
@InterfaceAudience.Private
public abstract class ModifyPeerProcedure
extends StateMachineProcedure<MasterProcedureEnv, PeerModificationState>
@ -39,11 +46,21 @@ public abstract class ModifyPeerProcedure
protected String peerId;
// used to keep compatible with old client where we can only returns after updateStorage.
protected ProcedurePrepareLatch latch;
protected ModifyPeerProcedure() {
}
protected ModifyPeerProcedure(String peerId) {
this.peerId = peerId;
// TODO: temporarily set a 4.0 here to always wait for the procedure exection completed. Change
// to 3.0 or 2.0 after the client modification is done.
this.latch = ProcedurePrepareLatch.createLatch(4, 0);
}
public ProcedurePrepareLatch getLatch() {
return latch;
}
@Override
@ -52,28 +69,58 @@ public abstract class ModifyPeerProcedure
}
/**
* Return {@code false} means that the operation is invalid and we should give up, otherwise
* {@code true}.
* <p>
* You need to call {@link #setFailure(String, Throwable)} to give the detail failure information.
* Called before we start the actual processing. If an exception is thrown then we will give up
* and mark the procedure as failed directly.
*/
protected abstract boolean updatePeerStorage() throws IOException;
protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException;
protected void postPeerModification() {
/**
* We will give up and mark the procedure as failure if {@link IllegalArgumentException} is
* thrown, for other type of Exception we will retry.
*/
protected abstract void updatePeerStorage(MasterProcedureEnv env)
throws IllegalArgumentException, Exception;
/**
* Called before we finish the procedure. The implementation can do some logging work, and also
* call the coprocessor hook if any.
* <p>
* Notice that, since we have already done the actual work, throwing exception here will not fail
* this procedure, we will just ignore it and finish the procedure as suceeded.
*/
protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException;
private void releaseLatch() {
ProcedurePrepareLatch.releaseLatch(latch, this);
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
case UPDATE_PEER_STORAGE:
case PRE_PEER_MODIFICATION:
try {
if (!updatePeerStorage()) {
assert isFailed() : "setFailure is not called";
prePeerModification(env);
} catch (IOException e) {
LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
", mark the procedure as failure and give up", e);
setFailure("prePeerModification", e);
releaseLatch();
return Flow.NO_MORE_STATE;
}
} catch (IOException e) {
LOG.warn("update peer storage failed, retry", e);
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (IllegalArgumentException e) {
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer",
new DoNotRetryIOException(e));
releaseLatch();
return Flow.NO_MORE_STATE;
} catch (Exception e) {
LOG.warn(
getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
@ -85,7 +132,13 @@ public abstract class ModifyPeerProcedure
setNextState(PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case POST_PEER_MODIFICATION:
postPeerModification();
try {
postPeerModification(env);
} catch (IOException e) {
LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
", ignore since the procedure has already done", e);
}
releaseLatch();
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
@ -107,6 +160,12 @@ public abstract class ModifyPeerProcedure
@Override
protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
throws IOException, InterruptedException {
if (state == PeerModificationState.PRE_PEER_MODIFICATION ||
state == PeerModificationState.UPDATE_PEER_STORAGE) {
// actually the peer related operations has no rollback, but if we haven't done any
// modifications on the peer storage, we can just return.
return;
}
throw new UnsupportedOperationException();
}
@ -122,6 +181,18 @@ public abstract class ModifyPeerProcedure
@Override
protected PeerModificationState getInitialState() {
return PeerModificationState.UPDATE_PEER_STORAGE;
return PeerModificationState.PRE_PEER_MODIFICATION;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(ModifyPeerStateData.newBuilder().setPeerId(peerId).build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
peerId = serializer.deserialize(ModifyPeerStateData.class).getPeerId();
}
}

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@ -118,15 +120,22 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
}
private void complete(MasterProcedureEnv env, boolean succ) {
private void complete(MasterProcedureEnv env, Throwable error) {
if (event == null) {
LOG.warn("procedure event for " + getProcId() +
" is null, maybe the procedure is created when recovery", new Exception());
" is null, maybe the procedure is created when recovery",
new Exception());
return;
}
LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer +
(succ ? " suceeded" : " failed"));
this.succ = succ;
if (error != null) {
LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed",
error);
this.succ = false;
} else {
LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded");
this.succ = true;
}
event.wake(env.getProcedureScheduler());
event = null;
}
@ -134,17 +143,18 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
@Override
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote,
IOException exception) {
complete(env, false);
complete(env, exception);
}
@Override
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
complete(env, true);
complete(env, null);
}
@Override
public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) {
complete(env, false);
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
RemoteProcedureException error) {
complete(env, error);
}
@Override

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The procedure for removing a replication peer.
*/
@InterfaceAudience.Private
public class RemovePeerProcedure extends ModifyPeerProcedure {
private static final Log LOG = LogFactory.getLog(RemovePeerProcedure.class);
public RemovePeerProcedure() {
}
public RemovePeerProcedure(String peerId) {
super(peerId);
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.REMOVE;
}
@Override
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preRemoveReplicationPeer(peerId);
}
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws Exception {
env.getReplicationManager().removeReplicationPeer(peerId);
}
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
LOG.info("Successfully removed peer " + peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postRemoveReplicationPeer(peerId);
}
}
}

View File

@ -27,10 +27,8 @@ import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@ -39,24 +37,21 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Manages and performs all replication admin operations.
* <p>
* Used to add/remove a replication peer.
*/
@InterfaceAudience.Private
public class ReplicationManager {
private final Configuration conf;
private final ZKWatcher zkw;
private final ReplicationQueuesClient replicationQueuesClient;
private final ReplicationPeers replicationPeers;
public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable)
throws IOException {
this.conf = conf;
this.zkw = zkw;
try {
this.replicationQueuesClient = ReplicationFactory
.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
@ -70,7 +65,7 @@ public class ReplicationManager {
}
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException, IOException {
throws ReplicationException {
checkPeerConfig(peerConfig);
replicationPeers.registerPeer(peerId, peerConfig, enabled);
replicationPeers.peerConnected(peerId);
@ -89,8 +84,8 @@ public class ReplicationManager {
this.replicationPeers.disablePeer(peerId);
}
public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException,
ReplicationPeerNotFoundException {
public ReplicationPeerConfig getPeerConfig(String peerId)
throws ReplicationException, ReplicationPeerNotFoundException {
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId);
if (peerConfig == null) {
throw new ReplicationPeerNotFoundException(peerId);
@ -110,9 +105,9 @@ public class ReplicationManager {
List<String> peerIds = replicationPeers.getAllPeerIds();
for (String peerId : peerIds) {
if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) {
peers.add(new ReplicationPeerDescription(peerId, replicationPeers
.getStatusOfPeerFromBackingStore(peerId), replicationPeers
.getReplicationPeerConfig(peerId)));
peers.add(new ReplicationPeerDescription(peerId,
replicationPeers.getStatusOfPeerFromBackingStore(peerId),
replicationPeers.getReplicationPeerConfig(peerId)));
}
}
return peers;
@ -126,13 +121,12 @@ public class ReplicationManager {
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
*/
private void checkPeerConfig(ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException {
private void checkPeerConfig(ReplicationPeerConfig peerConfig) {
if (peerConfig.replicateAllUserTables()) {
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new ReplicationException("Need clean namespaces or table-cfs config firstly"
+ " when replicate_all flag is true");
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " +
"when you want replicate all cluster");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
peerConfig.getExcludeTableCFsMap());
@ -141,7 +135,7 @@ public class ReplicationManager {
&& !peerConfig.getExcludeNamespaces().isEmpty())
|| (peerConfig.getExcludeTableCFsMap() != null
&& !peerConfig.getExcludeTableCFsMap().isEmpty())) {
throw new ReplicationException(
throw new IllegalArgumentException(
"Need clean exclude-namespaces or exclude-table-cfs config firstly"
+ " when replicate_all flag is false");
}
@ -154,20 +148,24 @@ public class ReplicationManager {
/**
* Set a namespace in the peer config means that all tables in this namespace will be replicated
* to the peer cluster.
* 1. If peer config already has a namespace, then not allow set any table of this namespace
* to the peer config.
* 2. If peer config already has a table, then not allow set this table's namespace to the peer
* config.
*
* <ol>
* <li>If peer config already has a namespace, then not allow set any table of this namespace to
* the peer config.</li>
* <li>If peer config already has a table, then not allow set this table's namespace to the peer
* config.</li>
* </ol>
* <p>
* Set a exclude namespace in the peer config means that all tables in this namespace can't be
* replicated to the peer cluster.
* 1. If peer config already has a exclude namespace, then not allow set any exclude table of
* this namespace to the peer config.
* 2. If peer config already has a exclude table, then not allow set this table's namespace
* as a exclude namespace.
* <ol>
* <li>If peer config already has a exclude namespace, then not allow set any exclude table of
* this namespace to the peer config.</li>
* <li>If peer config already has a exclude table, then not allow set this table's namespace as a
* exclude namespace.</li>
* </ol>
*/
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
Map<TableName, ? extends Collection<String>> tableCfs) {
if (namespaces == null || namespaces.isEmpty()) {
return;
}
@ -177,24 +175,22 @@ public class ReplicationManager {
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
TableName table = entry.getKey();
if (namespaces.contains(table.getNamespaceAsString())) {
throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces "
throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces "
+ table.getNamespaceAsString() + " in peer config");
}
}
}
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
throws IOException {
String filterCSV = peerConfig.getConfiguration().
get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) {
String filterCSV = peerConfig.getConfiguration()
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
if (filterCSV != null && !filterCSV.isEmpty()) {
String[] filters = filterCSV.split(",");
for (String filter : filters) {
try {
Class clazz = Class.forName(filter);
Object o = clazz.newInstance();
Class.forName(filter).newInstance();
} catch (Exception e) {
throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
throw new IllegalArgumentException("Configured WALEntryFilter " + filter +
" could not be created. Failing add/update " + "peer operation.", e);
}
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData;
/**
* The procedure for updating the config for a replication peer.
*/
@InterfaceAudience.Private
public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
private static final Log LOG = LogFactory.getLog(UpdatePeerConfigProcedure.class);
private ReplicationPeerConfig peerConfig;
public UpdatePeerConfigProcedure() {
}
public UpdatePeerConfigProcedure(String peerId, ReplicationPeerConfig peerConfig) {
super(peerId);
this.peerConfig = peerConfig;
}
@Override
public PeerOperationType getPeerOperationType() {
return PeerOperationType.UPDATE_CONFIG;
}
@Override
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
}
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env)
throws IllegalArgumentException, Exception {
env.getReplicationManager().updatePeerConfig(peerId, peerConfig);
}
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);
}
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(UpdatePeerConfigStateData.newBuilder()
.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
peerConfig = ReplicationPeerConfigUtil
.convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig());
}
}

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -146,6 +146,7 @@ import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
@ -209,7 +210,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
@ -3741,7 +3741,7 @@ public class HRegionServer extends HasThread implements
ReportProcedureDoneRequest.newBuilder().setProcId(procId);
if (error != null) {
builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
.setError(Throwables.getStackTraceAsString(error));
.setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), error));
} else {
builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.executor.EventType;
@ -45,7 +45,10 @@ public class RefreshPeerCallable implements RSProcedureCallable {
if (initError != null) {
throw initError;
}
rs.getFileSystem().create(new Path("/" + peerId + "/" + rs.getServerName().toString())).close();
Path dir = new Path("/" + peerId);
if (rs.getFileSystem().exists(dir)) {
rs.getFileSystem().create(new Path(dir, rs.getServerName().toString())).close();
}
return null;
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@ -369,7 +370,6 @@ public class MockNoopMasterServices implements MasterServices, Server {
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@ -399,20 +399,24 @@ public class MockNoopMasterServices implements MasterServices, Server {
}
@Override
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
return 0;
}
@Override
public void removeReplicationPeer(String peerId) throws ReplicationException {
public long removeReplicationPeer(String peerId) throws ReplicationException {
return 0;
}
@Override
public void enableReplicationPeer(String peerId) throws ReplicationException, IOException {
public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
return 0;
}
@Override
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
return 0;
}
@Override
@ -422,8 +426,9 @@ public class MockNoopMasterServices implements MasterServices, Server {
}
@Override
public void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException {
return 0;
}
@Override
@ -458,7 +463,6 @@ public class MockNoopMasterServices implements MasterServices, Server {
@Override
public ProcedureEvent getInitializedEvent() {
// TODO Auto-generated method stub
return null;
}
@ -471,4 +475,9 @@ public class MockNoopMasterServices implements MasterServices, Server {
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
@Override
public ReplicationManager getReplicationManager() {
return null;
}
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
public class DummyModifyPeerProcedure extends ModifyPeerProcedure {
@ -34,8 +34,15 @@ public class DummyModifyPeerProcedure extends ModifyPeerProcedure {
}
@Override
protected boolean updatePeerStorage() throws IOException {
return true;
protected void prePeerModification(MasterProcedureEnv env) {
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) {
}
@Override
protected void postPeerModification(MasterProcedureEnv env) {
}
}