HBASE-19524 Master side changes for moving peer modification from zk watcher to procedure
This commit is contained in:
parent
95af14fea6
commit
5e410d8140
|
@ -247,9 +247,8 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
|||
/**
|
||||
* Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
|
||||
* method.
|
||||
* @param error the error message
|
||||
*/
|
||||
void remoteOperationFailed(TEnv env, String error);
|
||||
void remoteOperationFailed(TEnv env, RemoteProcedureException error);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,6 +27,7 @@ option optimize_for = SPEED;
|
|||
import "HBase.proto";
|
||||
import "RPC.proto";
|
||||
import "Snapshot.proto";
|
||||
import "Replication.proto";
|
||||
|
||||
// ============================================================================
|
||||
// WARNING - Compatibility rules
|
||||
|
@ -370,9 +371,10 @@ message GCMergedRegionsStateData {
|
|||
}
|
||||
|
||||
enum PeerModificationState {
|
||||
UPDATE_PEER_STORAGE = 1;
|
||||
REFRESH_PEER_ON_RS = 2;
|
||||
POST_PEER_MODIFICATION = 3;
|
||||
PRE_PEER_MODIFICATION = 1;
|
||||
UPDATE_PEER_STORAGE = 2;
|
||||
REFRESH_PEER_ON_RS = 3;
|
||||
POST_PEER_MODIFICATION = 4;
|
||||
}
|
||||
|
||||
message PeerModificationStateData {
|
||||
|
@ -398,3 +400,16 @@ message RefreshPeerParameter {
|
|||
required PeerModificationType type = 2;
|
||||
required ServerName target_server = 3;
|
||||
}
|
||||
|
||||
message ModifyPeerStateData {
|
||||
required string peer_id = 1;
|
||||
}
|
||||
|
||||
message AddPeerStateData {
|
||||
required ReplicationPeer peer_config = 1;
|
||||
required bool enabled = 2;
|
||||
}
|
||||
|
||||
message UpdatePeerConfigStateData {
|
||||
required ReplicationPeer peer_config = 1;
|
||||
}
|
|
@ -28,6 +28,7 @@ option optimize_for = SPEED;
|
|||
|
||||
import "HBase.proto";
|
||||
import "ClusterStatus.proto";
|
||||
import "ErrorHandling.proto";
|
||||
|
||||
message RegionServerStartupRequest {
|
||||
/** Port number this regionserver is up on */
|
||||
|
@ -152,7 +153,7 @@ message ReportProcedureDoneRequest {
|
|||
ERROR = 2;
|
||||
}
|
||||
required Status status = 2;
|
||||
optional string error = 3;
|
||||
optional ForeignExceptionMessage error = 3;
|
||||
}
|
||||
|
||||
message ReportProcedureDoneResponse {
|
||||
|
|
|
@ -84,6 +84,7 @@ message AddReplicationPeerRequest {
|
|||
}
|
||||
|
||||
message AddReplicationPeerResponse {
|
||||
optional uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message RemoveReplicationPeerRequest {
|
||||
|
@ -91,6 +92,7 @@ message RemoveReplicationPeerRequest {
|
|||
}
|
||||
|
||||
message RemoveReplicationPeerResponse {
|
||||
optional uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message EnableReplicationPeerRequest {
|
||||
|
@ -98,6 +100,7 @@ message EnableReplicationPeerRequest {
|
|||
}
|
||||
|
||||
message EnableReplicationPeerResponse {
|
||||
optional uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message DisableReplicationPeerRequest {
|
||||
|
@ -105,6 +108,7 @@ message DisableReplicationPeerRequest {
|
|||
}
|
||||
|
||||
message DisableReplicationPeerResponse {
|
||||
optional uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message GetReplicationPeerConfigRequest {
|
||||
|
@ -122,6 +126,7 @@ message UpdateReplicationPeerConfigRequest {
|
|||
}
|
||||
|
||||
message UpdateReplicationPeerConfigResponse {
|
||||
optional uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
message ListReplicationPeersRequest {
|
||||
|
|
|
@ -530,7 +530,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
|||
for (String queueId : queueIds) {
|
||||
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
||||
if (queueInfo.getPeerId().equals(peerId)) {
|
||||
throw new ReplicationException("undeleted queue for peerId: " + peerId
|
||||
throw new IllegalArgumentException("undeleted queue for peerId: " + peerId
|
||||
+ ", replicator: " + replicator + ", queueId: " + queueId);
|
||||
}
|
||||
}
|
||||
|
@ -538,7 +538,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
|||
// Check for hfile-refs queue
|
||||
if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
|
||||
&& queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
|
||||
throw new ReplicationException("Undeleted queue for peerId: " + peerId
|
||||
throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
|
||||
+ ", found in hfile-refs node path " + hfileRefsZNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
|
|
|
@ -128,7 +128,13 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
|
|||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
|
||||
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
|
||||
|
@ -141,6 +147,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver;
|
||||
|
@ -331,15 +338,15 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
private volatile boolean activeMaster = false;
|
||||
|
||||
// flag set after we complete initialization once active
|
||||
private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
|
||||
private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
|
||||
|
||||
// flag set after master services are started,
|
||||
// initialization may have not completed yet.
|
||||
volatile boolean serviceStarted = false;
|
||||
|
||||
// flag set after we complete assignMeta.
|
||||
private final ProcedureEvent serverCrashProcessingEnabled =
|
||||
new ProcedureEvent("server crash processing");
|
||||
private final ProcedureEvent<?> serverCrashProcessingEnabled =
|
||||
new ProcedureEvent<>("server crash processing");
|
||||
|
||||
// Maximum time we should run balancer for
|
||||
private final int maxBlancingTime;
|
||||
|
@ -2319,11 +2326,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
return true;
|
||||
}
|
||||
Pair<RegionInfo, ServerName> pair =
|
||||
new Pair(MetaTableAccessor.getRegionInfo(data),
|
||||
new Pair<>(MetaTableAccessor.getRegionInfo(data),
|
||||
MetaTableAccessor.getServerName(data,0));
|
||||
if (pair == null) {
|
||||
return false;
|
||||
}
|
||||
if (!pair.getFirst().getTable().equals(tableName)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -2792,7 +2796,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ProcedureEvent getInitializedEvent() {
|
||||
public ProcedureEvent<?> getInitializedEvent() {
|
||||
return initialized;
|
||||
}
|
||||
|
||||
|
@ -2811,7 +2815,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
|
||||
}
|
||||
|
||||
public ProcedureEvent getServerCrashProcessingEnabledEvent() {
|
||||
public ProcedureEvent<?> getServerCrashProcessingEnabledEvent() {
|
||||
return serverCrashProcessingEnabled;
|
||||
}
|
||||
|
||||
|
@ -3377,54 +3381,36 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
return favoredNodesManager;
|
||||
}
|
||||
|
||||
private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException {
|
||||
long procId = procedureExecutor.submitProcedure(procedure);
|
||||
procedure.getLatch().await();
|
||||
return procId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException, IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preAddReplicationPeer(peerId, peerConfig);
|
||||
}
|
||||
LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config="
|
||||
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
|
||||
this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled);
|
||||
if (cpHost != null) {
|
||||
cpHost.postAddReplicationPeer(peerId, peerConfig);
|
||||
}
|
||||
LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" +
|
||||
peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
|
||||
return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preRemoveReplicationPeer(peerId);
|
||||
}
|
||||
public long removeReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId);
|
||||
this.replicationManager.removeReplicationPeer(peerId);
|
||||
if (cpHost != null) {
|
||||
cpHost.postRemoveReplicationPeer(peerId);
|
||||
}
|
||||
return executePeerProcedure(new RemovePeerProcedure(peerId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enableReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preEnableReplicationPeer(peerId);
|
||||
}
|
||||
public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId);
|
||||
this.replicationManager.enableReplicationPeer(peerId);
|
||||
if (cpHost != null) {
|
||||
cpHost.postEnableReplicationPeer(peerId);
|
||||
}
|
||||
return executePeerProcedure(new EnablePeerProcedure(peerId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preDisableReplicationPeer(peerId);
|
||||
}
|
||||
public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId);
|
||||
this.replicationManager.disableReplicationPeer(peerId);
|
||||
if (cpHost != null) {
|
||||
cpHost.postDisableReplicationPeer(peerId);
|
||||
}
|
||||
return executePeerProcedure(new DisablePeerProcedure(peerId));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3443,17 +3429,11 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException, IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
|
||||
}
|
||||
LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId
|
||||
+ ", config=" + peerConfig);
|
||||
this.replicationManager.updatePeerConfig(peerId, peerConfig);
|
||||
if (cpHost != null) {
|
||||
cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);
|
||||
}
|
||||
LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId +
|
||||
", config=" + peerConfig);
|
||||
return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3605,13 +3585,18 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
}
|
||||
|
||||
public void remoteProcedureFailed(long procId, String error) {
|
||||
public void remoteProcedureFailed(long procId, RemoteProcedureException error) {
|
||||
RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
|
||||
if (procedure != null) {
|
||||
procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationManager getReplicationManager() {
|
||||
return replicationManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method modifies the master's configuration in order to inject replication-related features
|
||||
*/
|
||||
|
|
|
@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource;
|
|||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService;
|
||||
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
|
||||
|
@ -2238,7 +2239,8 @@ public class MasterRpcServices extends RSRpcServices
|
|||
if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) {
|
||||
master.remoteProcedureCompleted(request.getProcId());
|
||||
} else {
|
||||
master.remoteProcedureFailed(request.getProcId(), request.getError());
|
||||
master.remoteProcedureFailed(request.getProcId(),
|
||||
RemoteProcedureException.fromProto(request.getError()));
|
||||
}
|
||||
return ReportProcedureDoneResponse.getDefaultInstance();
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,10 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
|
@ -52,8 +53,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* A curated subset of services provided by {@link HMaster}.
|
||||
* For use internally only. Passed to Managers, Services and Chores so can pass less-than-a
|
||||
|
@ -136,7 +135,7 @@ public interface MasterServices extends Server {
|
|||
* @return Tripped when Master has finished initialization.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public ProcedureEvent getInitializedEvent();
|
||||
public ProcedureEvent<?> getInitializedEvent();
|
||||
|
||||
/**
|
||||
* @return Master's instance of {@link MetricsMaster}
|
||||
|
@ -430,26 +429,26 @@ public interface MasterServices extends Server {
|
|||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @param enabled peer state, true if ENABLED and false if DISABLED
|
||||
*/
|
||||
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException, IOException;
|
||||
|
||||
/**
|
||||
* Removes a peer and stops the replication
|
||||
* @param peerId a short name that identifies the peer
|
||||
*/
|
||||
void removeReplicationPeer(String peerId) throws ReplicationException, IOException;
|
||||
long removeReplicationPeer(String peerId) throws ReplicationException, IOException;
|
||||
|
||||
/**
|
||||
* Restart the replication stream to the specified peer
|
||||
* @param peerId a short name that identifies the peer
|
||||
*/
|
||||
void enableReplicationPeer(String peerId) throws ReplicationException, IOException;
|
||||
long enableReplicationPeer(String peerId) throws ReplicationException, IOException;
|
||||
|
||||
/**
|
||||
* Stop the replication stream to the specified peer
|
||||
* @param peerId a short name that identifies the peer
|
||||
*/
|
||||
void disableReplicationPeer(String peerId) throws ReplicationException, IOException;
|
||||
long disableReplicationPeer(String peerId) throws ReplicationException, IOException;
|
||||
|
||||
/**
|
||||
* Returns the configured ReplicationPeerConfig for the specified peer
|
||||
|
@ -459,12 +458,17 @@ public interface MasterServices extends Server {
|
|||
ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException,
|
||||
IOException;
|
||||
|
||||
/**
|
||||
* Returns the {@link ReplicationManager}.
|
||||
*/
|
||||
ReplicationManager getReplicationManager();
|
||||
|
||||
/**
|
||||
* Update the peerConfig for the specified peer
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @param peerConfig new config for the peer
|
||||
*/
|
||||
void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException, IOException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -16,12 +16,10 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.assignment;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -33,13 +31,16 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Base class for the Assign and Unassign Procedure.
|
||||
*
|
||||
|
@ -416,7 +417,7 @@ public abstract class RegionTransitionProcedure
|
|||
}
|
||||
|
||||
@Override
|
||||
public void remoteOperationFailed(MasterProcedureEnv env, String error) {
|
||||
public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
|
||||
// should not be called for region operation until we modified the open/close region procedure
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.master.HMaster;
|
|||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
|
||||
|
@ -137,6 +138,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
|
|||
return remoteDispatcher;
|
||||
}
|
||||
|
||||
public ReplicationManager getReplicationManager() {
|
||||
return master.getReplicationManager();
|
||||
}
|
||||
|
||||
public boolean isRunning() {
|
||||
if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false;
|
||||
return master.getMasterProcedureExecutor().isRunning();
|
||||
|
|
|
@ -78,7 +78,7 @@ public abstract class ProcedurePrepareLatch {
|
|||
protected abstract void countDown(final Procedure proc);
|
||||
public abstract void await() throws IOException;
|
||||
|
||||
protected static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) {
|
||||
public static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) {
|
||||
if (latch != null) {
|
||||
latch.countDown(proc);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData;
|
||||
|
||||
/**
|
||||
* The procedure for adding a new replication peer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AddPeerProcedure extends ModifyPeerProcedure {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AddPeerProcedure.class);
|
||||
|
||||
private ReplicationPeerConfig peerConfig;
|
||||
|
||||
private boolean enabled;
|
||||
|
||||
public AddPeerProcedure() {
|
||||
}
|
||||
|
||||
public AddPeerProcedure(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) {
|
||||
super(peerId);
|
||||
this.peerConfig = peerConfig;
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.ADD;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.preAddReplicationPeer(peerId, peerConfig);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
|
||||
env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
|
||||
LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " peer " + peerId +
|
||||
", config " + peerConfig);
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
serializer.serialize(AddPeerStateData.newBuilder()
|
||||
.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.deserializeStateData(serializer);
|
||||
AddPeerStateData data = serializer.deserialize(AddPeerStateData.class);
|
||||
peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
|
||||
enabled = data.getEnabled();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The procedure for disabling a replication peer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DisablePeerProcedure extends ModifyPeerProcedure {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(DisablePeerProcedure.class);
|
||||
|
||||
public DisablePeerProcedure() {
|
||||
}
|
||||
|
||||
public DisablePeerProcedure(String peerId) {
|
||||
super(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.DISABLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.preDisableReplicationPeer(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env)
|
||||
throws IllegalArgumentException, Exception {
|
||||
env.getReplicationManager().disableReplicationPeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
|
||||
LOG.info("Successfully disabled peer " + peerId);
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.postDisableReplicationPeer(peerId);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The procedure for enabling a replication peer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class EnablePeerProcedure extends ModifyPeerProcedure {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(EnablePeerProcedure.class);
|
||||
|
||||
public EnablePeerProcedure() {
|
||||
}
|
||||
|
||||
public EnablePeerProcedure(String peerId) {
|
||||
super(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.ENABLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.preEnableReplicationPeer(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws Exception {
|
||||
env.getReplicationManager().enableReplicationPeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
|
||||
LOG.info("Successfully enabled peer " + peerId);
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.postEnableReplicationPeer(peerId);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,15 +21,22 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyPeerStateData;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
|
||||
|
||||
/**
|
||||
* The base class for all replication peer related procedure.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class ModifyPeerProcedure
|
||||
extends StateMachineProcedure<MasterProcedureEnv, PeerModificationState>
|
||||
|
@ -39,11 +46,21 @@ public abstract class ModifyPeerProcedure
|
|||
|
||||
protected String peerId;
|
||||
|
||||
// used to keep compatible with old client where we can only returns after updateStorage.
|
||||
protected ProcedurePrepareLatch latch;
|
||||
|
||||
protected ModifyPeerProcedure() {
|
||||
}
|
||||
|
||||
protected ModifyPeerProcedure(String peerId) {
|
||||
this.peerId = peerId;
|
||||
// TODO: temporarily set a 4.0 here to always wait for the procedure exection completed. Change
|
||||
// to 3.0 or 2.0 after the client modification is done.
|
||||
this.latch = ProcedurePrepareLatch.createLatch(4, 0);
|
||||
}
|
||||
|
||||
public ProcedurePrepareLatch getLatch() {
|
||||
return latch;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,28 +69,58 @@ public abstract class ModifyPeerProcedure
|
|||
}
|
||||
|
||||
/**
|
||||
* Return {@code false} means that the operation is invalid and we should give up, otherwise
|
||||
* {@code true}.
|
||||
* <p>
|
||||
* You need to call {@link #setFailure(String, Throwable)} to give the detail failure information.
|
||||
* Called before we start the actual processing. If an exception is thrown then we will give up
|
||||
* and mark the procedure as failed directly.
|
||||
*/
|
||||
protected abstract boolean updatePeerStorage() throws IOException;
|
||||
protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException;
|
||||
|
||||
protected void postPeerModification() {
|
||||
/**
|
||||
* We will give up and mark the procedure as failure if {@link IllegalArgumentException} is
|
||||
* thrown, for other type of Exception we will retry.
|
||||
*/
|
||||
protected abstract void updatePeerStorage(MasterProcedureEnv env)
|
||||
throws IllegalArgumentException, Exception;
|
||||
|
||||
/**
|
||||
* Called before we finish the procedure. The implementation can do some logging work, and also
|
||||
* call the coprocessor hook if any.
|
||||
* <p>
|
||||
* Notice that, since we have already done the actual work, throwing exception here will not fail
|
||||
* this procedure, we will just ignore it and finish the procedure as suceeded.
|
||||
*/
|
||||
protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException;
|
||||
|
||||
private void releaseLatch() {
|
||||
ProcedurePrepareLatch.releaseLatch(latch, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
switch (state) {
|
||||
case UPDATE_PEER_STORAGE:
|
||||
case PRE_PEER_MODIFICATION:
|
||||
try {
|
||||
if (!updatePeerStorage()) {
|
||||
assert isFailed() : "setFailure is not called";
|
||||
prePeerModification(env);
|
||||
} catch (IOException e) {
|
||||
LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
|
||||
", mark the procedure as failure and give up", e);
|
||||
setFailure("prePeerModification", e);
|
||||
releaseLatch();
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("update peer storage failed, retry", e);
|
||||
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case UPDATE_PEER_STORAGE:
|
||||
try {
|
||||
updatePeerStorage(env);
|
||||
} catch (IllegalArgumentException e) {
|
||||
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer",
|
||||
new DoNotRetryIOException(e));
|
||||
releaseLatch();
|
||||
return Flow.NO_MORE_STATE;
|
||||
} catch (Exception e) {
|
||||
LOG.warn(
|
||||
getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
|
||||
|
@ -85,7 +132,13 @@ public abstract class ModifyPeerProcedure
|
|||
setNextState(PeerModificationState.POST_PEER_MODIFICATION);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case POST_PEER_MODIFICATION:
|
||||
postPeerModification();
|
||||
try {
|
||||
postPeerModification(env);
|
||||
} catch (IOException e) {
|
||||
LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
|
||||
", ignore since the procedure has already done", e);
|
||||
}
|
||||
releaseLatch();
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
|
@ -107,6 +160,12 @@ public abstract class ModifyPeerProcedure
|
|||
@Override
|
||||
protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
|
||||
throws IOException, InterruptedException {
|
||||
if (state == PeerModificationState.PRE_PEER_MODIFICATION ||
|
||||
state == PeerModificationState.UPDATE_PEER_STORAGE) {
|
||||
// actually the peer related operations has no rollback, but if we haven't done any
|
||||
// modifications on the peer storage, we can just return.
|
||||
return;
|
||||
}
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
@ -122,6 +181,18 @@ public abstract class ModifyPeerProcedure
|
|||
|
||||
@Override
|
||||
protected PeerModificationState getInitialState() {
|
||||
return PeerModificationState.UPDATE_PEER_STORAGE;
|
||||
return PeerModificationState.PRE_PEER_MODIFICATION;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
serializer.serialize(ModifyPeerStateData.newBuilder().setPeerId(peerId).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.deserializeStateData(serializer);
|
||||
peerId = serializer.deserialize(ModifyPeerStateData.class).getPeerId();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
@ -118,15 +120,22 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
|
|||
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
|
||||
}
|
||||
|
||||
private void complete(MasterProcedureEnv env, boolean succ) {
|
||||
private void complete(MasterProcedureEnv env, Throwable error) {
|
||||
if (event == null) {
|
||||
LOG.warn("procedure event for " + getProcId() +
|
||||
" is null, maybe the procedure is created when recovery", new Exception());
|
||||
" is null, maybe the procedure is created when recovery",
|
||||
new Exception());
|
||||
return;
|
||||
}
|
||||
LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer +
|
||||
(succ ? " suceeded" : " failed"));
|
||||
this.succ = succ;
|
||||
if (error != null) {
|
||||
LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed",
|
||||
error);
|
||||
this.succ = false;
|
||||
} else {
|
||||
LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded");
|
||||
this.succ = true;
|
||||
}
|
||||
|
||||
event.wake(env.getProcedureScheduler());
|
||||
event = null;
|
||||
}
|
||||
|
@ -134,17 +143,18 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
|
|||
@Override
|
||||
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote,
|
||||
IOException exception) {
|
||||
complete(env, false);
|
||||
complete(env, exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
|
||||
complete(env, true);
|
||||
complete(env, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) {
|
||||
complete(env, false);
|
||||
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
|
||||
RemoteProcedureException error) {
|
||||
complete(env, error);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The procedure for removing a replication peer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RemovePeerProcedure extends ModifyPeerProcedure {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RemovePeerProcedure.class);
|
||||
|
||||
public RemovePeerProcedure() {
|
||||
}
|
||||
|
||||
public RemovePeerProcedure(String peerId) {
|
||||
super(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.REMOVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.preRemoveReplicationPeer(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) throws Exception {
|
||||
env.getReplicationManager().removeReplicationPeer(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
|
||||
LOG.info("Successfully removed peer " + peerId);
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.postRemoveReplicationPeer(peerId);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,10 +27,8 @@ import java.util.regex.Pattern;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
|
@ -39,24 +37,21 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Manages and performs all replication admin operations.
|
||||
* <p>
|
||||
* Used to add/remove a replication peer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationManager {
|
||||
|
||||
private final Configuration conf;
|
||||
private final ZKWatcher zkw;
|
||||
private final ReplicationQueuesClient replicationQueuesClient;
|
||||
private final ReplicationPeers replicationPeers;
|
||||
|
||||
public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable)
|
||||
throws IOException {
|
||||
this.conf = conf;
|
||||
this.zkw = zkw;
|
||||
try {
|
||||
this.replicationQueuesClient = ReplicationFactory
|
||||
.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
|
||||
|
@ -70,7 +65,7 @@ public class ReplicationManager {
|
|||
}
|
||||
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException, IOException {
|
||||
throws ReplicationException {
|
||||
checkPeerConfig(peerConfig);
|
||||
replicationPeers.registerPeer(peerId, peerConfig, enabled);
|
||||
replicationPeers.peerConnected(peerId);
|
||||
|
@ -89,8 +84,8 @@ public class ReplicationManager {
|
|||
this.replicationPeers.disablePeer(peerId);
|
||||
}
|
||||
|
||||
public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException,
|
||||
ReplicationPeerNotFoundException {
|
||||
public ReplicationPeerConfig getPeerConfig(String peerId)
|
||||
throws ReplicationException, ReplicationPeerNotFoundException {
|
||||
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId);
|
||||
if (peerConfig == null) {
|
||||
throw new ReplicationPeerNotFoundException(peerId);
|
||||
|
@ -110,9 +105,9 @@ public class ReplicationManager {
|
|||
List<String> peerIds = replicationPeers.getAllPeerIds();
|
||||
for (String peerId : peerIds) {
|
||||
if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) {
|
||||
peers.add(new ReplicationPeerDescription(peerId, replicationPeers
|
||||
.getStatusOfPeerFromBackingStore(peerId), replicationPeers
|
||||
.getReplicationPeerConfig(peerId)));
|
||||
peers.add(new ReplicationPeerDescription(peerId,
|
||||
replicationPeers.getStatusOfPeerFromBackingStore(peerId),
|
||||
replicationPeers.getReplicationPeerConfig(peerId)));
|
||||
}
|
||||
}
|
||||
return peers;
|
||||
|
@ -126,13 +121,12 @@ public class ReplicationManager {
|
|||
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
|
||||
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
|
||||
*/
|
||||
private void checkPeerConfig(ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException, IOException {
|
||||
private void checkPeerConfig(ReplicationPeerConfig peerConfig) {
|
||||
if (peerConfig.replicateAllUserTables()) {
|
||||
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|
||||
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
||||
throw new ReplicationException("Need clean namespaces or table-cfs config firstly"
|
||||
+ " when replicate_all flag is true");
|
||||
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
|
||||
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
||||
throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " +
|
||||
"when you want replicate all cluster");
|
||||
}
|
||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
|
||||
peerConfig.getExcludeTableCFsMap());
|
||||
|
@ -141,7 +135,7 @@ public class ReplicationManager {
|
|||
&& !peerConfig.getExcludeNamespaces().isEmpty())
|
||||
|| (peerConfig.getExcludeTableCFsMap() != null
|
||||
&& !peerConfig.getExcludeTableCFsMap().isEmpty())) {
|
||||
throw new ReplicationException(
|
||||
throw new IllegalArgumentException(
|
||||
"Need clean exclude-namespaces or exclude-table-cfs config firstly"
|
||||
+ " when replicate_all flag is false");
|
||||
}
|
||||
|
@ -154,20 +148,24 @@ public class ReplicationManager {
|
|||
/**
|
||||
* Set a namespace in the peer config means that all tables in this namespace will be replicated
|
||||
* to the peer cluster.
|
||||
* 1. If peer config already has a namespace, then not allow set any table of this namespace
|
||||
* to the peer config.
|
||||
* 2. If peer config already has a table, then not allow set this table's namespace to the peer
|
||||
* config.
|
||||
*
|
||||
* <ol>
|
||||
* <li>If peer config already has a namespace, then not allow set any table of this namespace to
|
||||
* the peer config.</li>
|
||||
* <li>If peer config already has a table, then not allow set this table's namespace to the peer
|
||||
* config.</li>
|
||||
* </ol>
|
||||
* <p>
|
||||
* Set a exclude namespace in the peer config means that all tables in this namespace can't be
|
||||
* replicated to the peer cluster.
|
||||
* 1. If peer config already has a exclude namespace, then not allow set any exclude table of
|
||||
* this namespace to the peer config.
|
||||
* 2. If peer config already has a exclude table, then not allow set this table's namespace
|
||||
* as a exclude namespace.
|
||||
* <ol>
|
||||
* <li>If peer config already has a exclude namespace, then not allow set any exclude table of
|
||||
* this namespace to the peer config.</li>
|
||||
* <li>If peer config already has a exclude table, then not allow set this table's namespace as a
|
||||
* exclude namespace.</li>
|
||||
* </ol>
|
||||
*/
|
||||
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
|
||||
Map<TableName, ? extends Collection<String>> tableCfs) {
|
||||
if (namespaces == null || namespaces.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
@ -177,24 +175,22 @@ public class ReplicationManager {
|
|||
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
||||
TableName table = entry.getKey();
|
||||
if (namespaces.contains(table.getNamespaceAsString())) {
|
||||
throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces "
|
||||
throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces "
|
||||
+ table.getNamespaceAsString() + " in peer config");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
|
||||
throws IOException {
|
||||
String filterCSV = peerConfig.getConfiguration().
|
||||
get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||
if (filterCSV != null && !filterCSV.isEmpty()){
|
||||
String [] filters = filterCSV.split(",");
|
||||
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) {
|
||||
String filterCSV = peerConfig.getConfiguration()
|
||||
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||
if (filterCSV != null && !filterCSV.isEmpty()) {
|
||||
String[] filters = filterCSV.split(",");
|
||||
for (String filter : filters) {
|
||||
try {
|
||||
Class clazz = Class.forName(filter);
|
||||
Object o = clazz.newInstance();
|
||||
Class.forName(filter).newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
|
||||
throw new IllegalArgumentException("Configured WALEntryFilter " + filter +
|
||||
" could not be created. Failing add/update " + "peer operation.", e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData;
|
||||
|
||||
/**
|
||||
* The procedure for updating the config for a replication peer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(UpdatePeerConfigProcedure.class);
|
||||
|
||||
private ReplicationPeerConfig peerConfig;
|
||||
|
||||
public UpdatePeerConfigProcedure() {
|
||||
}
|
||||
|
||||
public UpdatePeerConfigProcedure(String peerId, ReplicationPeerConfig peerConfig) {
|
||||
super(peerId);
|
||||
this.peerConfig = peerConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.UPDATE_CONFIG;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env)
|
||||
throws IllegalArgumentException, Exception {
|
||||
env.getReplicationManager().updatePeerConfig(peerId, peerConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
|
||||
LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig);
|
||||
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.serializeStateData(serializer);
|
||||
serializer.serialize(UpdatePeerConfigStateData.newBuilder()
|
||||
.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
super.deserializeStateData(serializer);
|
||||
peerConfig = ReplicationPeerConfigUtil
|
||||
.convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig());
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -144,6 +144,7 @@ import org.apache.hadoop.hbase.util.CompressionTest;
|
|||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
|
||||
|
@ -3684,7 +3685,7 @@ public class HRegionServer extends HasThread implements
|
|||
ReportProcedureDoneRequest.newBuilder().setProcId(procId);
|
||||
if (error != null) {
|
||||
builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
|
||||
.setError(Throwables.getStackTraceAsString(error));
|
||||
.setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), error));
|
||||
} else {
|
||||
builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
|
@ -45,7 +45,10 @@ public class RefreshPeerCallable implements RSProcedureCallable {
|
|||
if (initError != null) {
|
||||
throw initError;
|
||||
}
|
||||
rs.getFileSystem().create(new Path("/" + peerId + "/" + rs.getServerName().toString())).close();
|
||||
Path dir = new Path("/" + peerId);
|
||||
if (rs.getFileSystem().exists(dir)) {
|
||||
rs.getFileSystem().create(new Path(dir, rs.getServerName().toString())).close();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
|
@ -368,7 +369,6 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
|
||||
@Override
|
||||
public ClusterConnection getClusterConnection() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -398,20 +398,24 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeReplicationPeer(String peerId) throws ReplicationException {
|
||||
public long removeReplicationPeer(String peerId) throws ReplicationException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enableReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disableReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -421,8 +425,9 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException, IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -457,7 +462,6 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
|
||||
@Override
|
||||
public ProcedureEvent getInitializedEvent() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -470,4 +474,9 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
public Connection createConnection(Configuration conf) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationManager getReplicationManager() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
|
||||
public class DummyModifyPeerProcedure extends ModifyPeerProcedure {
|
||||
|
||||
|
@ -34,8 +34,15 @@ public class DummyModifyPeerProcedure extends ModifyPeerProcedure {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean updatePeerStorage() throws IOException {
|
||||
return true;
|
||||
protected void prePeerModification(MasterProcedureEnv env) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updatePeerStorage(MasterProcedureEnv env) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postPeerModification(MasterProcedureEnv env) {
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue