HBASE-19957 General framework to transit sync replication state

zhangduo 2018-02-09 18:33:28 +08:00
parent 00e54aae24
commit 39dd81a7c6
33 changed files with 729 additions and 299 deletions

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.Collection;
@ -25,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

View File

@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.replication;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The POJO equivalent of ReplicationProtos.ReplicationPeerDescription
* The POJO equivalent of ReplicationProtos.ReplicationPeerDescription.
* <p>
* To developers: we do not store the new sync replication state here since it is just an
* intermediate state and this class is public.
*/
@InterfaceAudience.Public
public class ReplicationPeerDescription {

View File

@ -29,14 +29,19 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
/**
* Used by synchronous replication. Indicate the state of the current cluster in a synchronous
* replication peer. The state may be one of {@link SyncReplicationState#ACTIVE},
* {@link SyncReplicationState#DOWNGRADE_ACTIVE} or
* {@link SyncReplicationState#STANDBY}.
* {@link SyncReplicationState#DOWNGRADE_ACTIVE} or {@link SyncReplicationState#STANDBY}.
* <p>
* For asynchronous replication, the state is {@link SyncReplicationState#NONE}.
*/
@InterfaceAudience.Public
public enum SyncReplicationState {
NONE, ACTIVE, DOWNGRADE_ACTIVE, STANDBY;
NONE(0), ACTIVE(1), DOWNGRADE_ACTIVE(2), STANDBY(3);
private final byte value;
private SyncReplicationState(int value) {
this.value = (byte) value;
}
public static SyncReplicationState valueOf(int value) {
switch (value) {
@ -53,13 +58,17 @@ public enum SyncReplicationState {
}
}
public int value() {
return value & 0xFF;
}
public static byte[] toByteArray(SyncReplicationState state) {
return ProtobufUtil
.prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray());
.prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray());
}
public static SyncReplicationState parseFrom(byte[] bytes) throws InvalidProtocolBufferException {
return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
.parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
.parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
}
}
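
As a quick illustration of the int-backed encoding introduced above, here is a minimal sketch (not part of the patch) that only uses value() and valueOf(int) from the enum:

import org.apache.hadoop.hbase.replication.SyncReplicationState;

public class SyncReplicationStateRoundTrip {
  public static void main(String[] args) {
    // DOWNGRADE_ACTIVE is persisted as 2; value() masks the internal byte to an unsigned int.
    SyncReplicationState s = SyncReplicationState.DOWNGRADE_ACTIVE;
    int encoded = s.value();
    assert encoded == 2;
    // valueOf(int) maps the numeric value back to the enum constant.
    assert SyncReplicationState.valueOf(encoded) == s;
  }
}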

View File

@ -1355,6 +1355,9 @@ public final class HConstants {
public static final String NOT_IMPLEMENTED = "Not implemented";
// TODO: need to find a better place to hold it.
public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled";
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@ -391,6 +391,17 @@ enum PeerModificationState {
POST_PEER_MODIFICATION = 8;
}
enum PeerSyncReplicationStateTransitionState {
PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1;
SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
REPLAY_REMOTE_WAL_IN_PEER = 4;
REOPEN_ALL_REGIONS_IN_PEER = 5;
TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7;
POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 8;
}
message PeerModificationStateData {
required string peer_id = 1;
}
@ -401,18 +412,23 @@ enum PeerModificationType {
ENABLE_PEER = 3;
DISABLE_PEER = 4;
UPDATE_PEER_CONFIG = 5;
TRANSIT_SYNC_REPLICATION_STATE = 6;
}
message RefreshPeerStateData {
required string peer_id = 1;
required PeerModificationType type = 2;
required ServerName target_server = 3;
/** We need multiple stages for sync replication state transition **/
optional uint32 stage = 4 [default = 0];
}
message RefreshPeerParameter {
required string peer_id = 1;
required PeerModificationType type = 2;
required ServerName target_server = 3;
/** We need multiple stages for sync replication state transition **/
optional uint32 stage = 4 [default = 0];
}
message PeerProcedureStateData {
@ -459,5 +475,7 @@ message InitMetaStateData {
}
message TransitPeerSyncReplicationStateStateData {
required SyncReplicationState syncReplicationState = 1;
/** Could be null if we fail in pre check, so optional */
optional SyncReplicationState fromState = 1;
required SyncReplicationState toState = 2;
}

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@ -36,7 +37,14 @@ public class ReplicationPeerImpl implements ReplicationPeer {
private volatile PeerState peerState;
private volatile SyncReplicationState syncReplicationState;
// The lower 16 bits are the current sync replication state, the higher 16 bits are the new sync
// replication state. Embedded in one int so users cannot get an inconsistent view of the state
// and the new state.
private volatile int syncReplicationStateBits;
private static final int SHIFT = 16;
private static final int AND_BITS = 0xFFFF;
private final List<ReplicationPeerConfigListener> peerConfigListeners;
@ -48,12 +56,14 @@ public class ReplicationPeerImpl implements ReplicationPeer {
* @param peerConfig configuration for the replication peer
*/
public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
boolean peerState, SyncReplicationState syncReplicationState) {
boolean peerState, SyncReplicationState syncReplicationState,
SyncReplicationState newSyncReplicationState) {
this.conf = conf;
this.id = id;
this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
this.peerConfig = peerConfig;
this.syncReplicationState = syncReplicationState;
this.syncReplicationStateBits =
syncReplicationState.value() | (newSyncReplicationState.value() << SHIFT);
this.peerConfigListeners = new ArrayList<>();
}
@ -66,6 +76,16 @@ public class ReplicationPeerImpl implements ReplicationPeer {
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
}
public void setNewSyncReplicationState(SyncReplicationState newState) {
this.syncReplicationStateBits =
(this.syncReplicationStateBits & AND_BITS) | (newState.value() << SHIFT);
}
public void transitSyncReplicationState() {
this.syncReplicationStateBits =
(this.syncReplicationStateBits >>> SHIFT) | (SyncReplicationState.NONE.value() << SHIFT);
}
/**
* Get the identifier of this peer
* @return string representation of the id (short)
@ -80,9 +100,26 @@ public class ReplicationPeerImpl implements ReplicationPeer {
return peerState;
}
private static SyncReplicationState getSyncReplicationState(int bits) {
return SyncReplicationState.valueOf(bits & AND_BITS);
}
private static SyncReplicationState getNewSyncReplicationState(int bits) {
return SyncReplicationState.valueOf(bits >>> SHIFT);
}
public Pair<SyncReplicationState, SyncReplicationState> getSyncReplicationStateAndNewState() {
int bits = this.syncReplicationStateBits;
return Pair.newPair(getSyncReplicationState(bits), getNewSyncReplicationState(bits));
}
public SyncReplicationState getNewSyncReplicationState() {
return getNewSyncReplicationState(syncReplicationStateBits);
}
@Override
public SyncReplicationState getSyncReplicationState() {
return syncReplicationState;
return getSyncReplicationState(syncReplicationStateBits);
}
@Override

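The packed-int trick above can be shown in isolation; this is a minimal sketch (mirroring, not reusing, the patch's constants) of why a reader of the single volatile int always sees a matching pair of states:

public class StateBitsSketch {
  private static final int SHIFT = 16;
  private static final int AND_BITS = 0xFFFF;

  public static void main(String[] args) {
    int current = 2; // e.g. DOWNGRADE_ACTIVE.value()
    int pending = 1; // e.g. ACTIVE.value()
    // One write updates both halves, so there is no window where only one half is visible.
    int bits = current | (pending << SHIFT);
    assert (bits & AND_BITS) == current; // lower 16 bits: current sync replication state
    assert (bits >>> SHIFT) == pending;  // upper 16 bits: new (pending) sync replication state
  }
}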
View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -72,16 +71,30 @@ public interface ReplicationPeerStorage {
ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
/**
* Set the state of current cluster in a synchronous replication peer.
* Set the new sync replication state that we are going to transit to.
* @throws ReplicationException if there are errors accessing the storage service.
*/
void setPeerSyncReplicationState(String peerId, SyncReplicationState state)
void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException;
/**
* Get the state of current cluster in a synchronous replication peer.
* Overwrite the sync replication state with the new sync replication state which is set with the
* {@link #setPeerNewSyncReplicationState(String, SyncReplicationState)} method above, and clear
* the new sync replication state.
* @throws ReplicationException if there are errors accessing the storage service.
*/
SyncReplicationState getPeerSyncReplicationState(String peerId)
throws ReplicationException;
void transitPeerSyncReplicationState(String peerId) throws ReplicationException;
/**
* Get the sync replication state.
* @throws ReplicationException if there are errors accessing the storage service.
*/
SyncReplicationState getPeerSyncReplicationState(String peerId) throws ReplicationException;
/**
* Get the new sync replication state. Will return {@link SyncReplicationState#NONE} if we are
* not in a transition.
* @throws ReplicationException if there are errors accessing the storage service.
*/
SyncReplicationState getPeerNewSyncReplicationState(String peerId) throws ReplicationException;
}
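
Putting the interface methods above together, the storage API is meant to be driven in two steps. A rough sketch follows, under the assumption that storage is any ReplicationPeerStorage implementation and peerId names an existing sync replication peer:

import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.SyncReplicationState;

class TransitSketch {
  static void transit(ReplicationPeerStorage storage, String peerId, SyncReplicationState to)
      throws ReplicationException {
    // Step 1: record the target state without touching the current one.
    storage.setPeerNewSyncReplicationState(peerId, to);
    // ... intermediate work happens here (refresh region servers, reopen regions, ...) ...
    // Step 2: overwrite the current state with the recorded one and clear the pending state.
    storage.transitPeerSyncReplicationState(peerId);
    assert storage.getPeerNewSyncReplicationState(peerId) == SyncReplicationState.NONE;
  }
}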

View File

@ -80,8 +80,8 @@ public class ReplicationPeers {
return true;
}
public void removePeer(String peerId) {
peerCache.remove(peerId);
public ReplicationPeerImpl removePeer(String peerId) {
return peerCache.remove(peerId);
}
/**
@ -110,22 +110,29 @@ public class ReplicationPeers {
public PeerState refreshPeerState(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
peer.setPeerState(peerStorage.isPeerEnabled(peerId));
return peer.getPeerState();
}
public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
return peer.getPeerConfig();
}
public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId)
throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId);
peer.setNewSyncReplicationState(newState);
return newState;
}
public void transitPeerSyncReplicationState(String peerId) {
ReplicationPeerImpl peer = peerCache.get(peerId);
peer.transitSyncReplicationState();
}
/**
* Helper method to connect to a peer
* @param peerId peer's identifier
@ -135,7 +142,9 @@ public class ReplicationPeers {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
SyncReplicationState newSyncReplicationState =
peerStorage.getPeerNewSyncReplicationState(peerId);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, peerConfig, enabled, syncReplicationState);
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
}
}

View File

@ -53,7 +53,12 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
public static final String SYNCHRONOUS_REPLICATION_STATE_ZNODE = "sync-rep-state";
public static final String SYNC_REPLICATION_STATE_ZNODE = "sync-rep-state";
public static final String NEW_SYNC_REPLICATION_STATE_ZNODE = "new-sync-rep-state";
public static final byte[] NONE_STATE_ZNODE_BYTES =
SyncReplicationState.toByteArray(SyncReplicationState.NONE);
/**
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
@ -85,7 +90,11 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
@VisibleForTesting
public String getSyncReplicationStateNode(String peerId) {
return ZNodePaths.joinZNode(getPeerNode(peerId), SYNCHRONOUS_REPLICATION_STATE_ZNODE);
return ZNodePaths.joinZNode(getPeerNode(peerId), SYNC_REPLICATION_STATE_ZNODE);
}
private String getNewSyncReplicationStateNode(String peerId) {
return ZNodePaths.joinZNode(getPeerNode(peerId), NEW_SYNC_REPLICATION_STATE_ZNODE);
}
@Override
@ -97,14 +106,15 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES),
ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId),
SyncReplicationState.toByteArray(syncReplicationState)));
SyncReplicationState.toByteArray(syncReplicationState)),
ZKUtilOp.createAndFailSilent(getNewSyncReplicationStateNode(peerId), NONE_STATE_ZNODE_BYTES));
try {
ZKUtil.createWithParents(zookeeper, peersZNode);
ZKUtil.multiOrSequential(zookeeper, multiOps, false);
} catch (KeeperException e) {
throw new ReplicationException(
"Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" +
(enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
(enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
e);
}
}
@ -136,7 +146,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
ReplicationPeerConfigUtil.toByteArray(peerConfig));
} catch (KeeperException e) {
throw new ReplicationException(
"There was a problem trying to save changes to the " + "replication peer " + peerId, e);
"There was a problem trying to save changes to the " + "replication peer " + peerId, e);
}
}
@ -170,38 +180,63 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
}
if (data == null || data.length == 0) {
throw new ReplicationException(
"Replication peer config data shouldn't be empty, peerId=" + peerId);
"Replication peer config data shouldn't be empty, peerId=" + peerId);
}
try {
return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
throw new ReplicationException(
"Failed to parse replication peer config for peer with id=" + peerId, e);
"Failed to parse replication peer config for peer with id=" + peerId, e);
}
}
@Override
public void setPeerSyncReplicationState(String peerId, SyncReplicationState state)
public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException {
try {
ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId),
ZKUtil.createSetData(zookeeper, getNewSyncReplicationStateNode(peerId),
SyncReplicationState.toByteArray(state));
} catch (KeeperException e) {
throw new ReplicationException(
"Unable to change the cluster state for the synchronous replication peer with id=" + peerId,
e);
"Unable to set the new sync replication state for peer with id=" + peerId, e);
}
}
@Override
public void transitPeerSyncReplicationState(String peerId) throws ReplicationException {
String newStateNode = getNewSyncReplicationStateNode(peerId);
try {
byte[] data = ZKUtil.getData(zookeeper, newStateNode);
ZKUtil.multiOrSequential(zookeeper,
Arrays.asList(ZKUtilOp.setData(newStateNode, NONE_STATE_ZNODE_BYTES),
ZKUtilOp.setData(getSyncReplicationStateNode(peerId), data)),
false);
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException(
"Error transiting sync replication state for peer with id=" + peerId, e);
}
}
private SyncReplicationState getSyncReplicationState(String peerId, String path)
throws ReplicationException {
try {
byte[] data = ZKUtil.getData(zookeeper, path);
return SyncReplicationState.parseFrom(data);
} catch (KeeperException | InterruptedException | IOException e) {
throw new ReplicationException(
"Error getting sync replication state of path " + path + " for peer with id=" + peerId, e);
}
}
@Override
public SyncReplicationState getPeerNewSyncReplicationState(String peerId)
throws ReplicationException {
return getSyncReplicationState(peerId, getNewSyncReplicationStateNode(peerId));
}
@Override
public SyncReplicationState getPeerSyncReplicationState(String peerId)
throws ReplicationException {
try {
byte[] data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId));
return SyncReplicationState.parseFrom(data);
} catch (KeeperException | InterruptedException | IOException e) {
throw new ReplicationException(
"Error getting cluster state for the synchronous replication peer with id=" + peerId, e);
}
return getSyncReplicationState(peerId, getSyncReplicationStateNode(peerId));
}
}
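
For orientation, the per-peer znode layout after this change looks roughly as follows. The parent path is an assumed default; only the two child names (sync-rep-state, new-sync-rep-state) come from the patch:

import org.apache.hadoop.hbase.zookeeper.ZNodePaths;

class PeerZNodeLayoutSketch {
  static void layout() {
    String peerNode = "/hbase/replication/peers/1"; // hypothetical peer "1", default base path assumed
    String currentStateNode = ZNodePaths.joinZNode(peerNode, "sync-rep-state");
    String pendingStateNode = ZNodePaths.joinZNode(peerNode, "new-sync-rep-state");
    // transitPeerSyncReplicationState above copies the pending node's value into the current
    // node and resets the pending node to NONE in a single ZooKeeper multi.
  }
}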

View File

@ -1395,7 +1395,7 @@ public interface MasterObserver {
* Called before transiting the current cluster state for the specified synchronous replication peer
* @param ctx the environment to interact with the framework and master
* @param peerId a short name that identifies the peer
* @param state a new state
* @param state the new state
*/
default void preTransitReplicationPeerSyncReplicationState(
final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
@ -1406,11 +1406,12 @@ public interface MasterObserver {
* Called after transiting the current cluster state for the specified synchronous replication peer
* @param ctx the environment to interact with the framework and master
* @param peerId a short name that identifies the peer
* @param state a new state
* @param from the old state
* @param to the new state
*/
default void postTransitReplicationPeerSyncReplicationState(
final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
SyncReplicationState state) throws IOException {
SyncReplicationState from, SyncReplicationState to) throws IOException {
}
/**

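To illustrate the updated hook signature, a hypothetical coprocessor (the class name is made up) could observe transitions like this:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.replication.SyncReplicationState;

public class LoggingSyncReplicationObserver implements MasterCoprocessor, MasterObserver {

  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  @Override
  public void postTransitReplicationPeerSyncReplicationState(
      ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
      SyncReplicationState from, SyncReplicationState to) throws IOException {
    // Both the old and the new state are now available to the hook.
    System.out.println("peer " + peerId + " transited from " + from + " to " + to);
  }
}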
View File

@ -133,10 +133,10 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
@ -3493,7 +3493,7 @@ public class HMaster extends HRegionServer implements MasterServices {
return favoredNodesManager;
}
private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException {
private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws IOException {
long procId = procedureExecutor.submitProcedure(procedure);
procedure.getLatch().await();
return procId;

View File

@ -1607,22 +1607,22 @@ public class MasterCoprocessorHost
});
}
public void preTransitReplicationPeerSyncReplicationState(final String peerId,
final SyncReplicationState clusterState) throws IOException {
public void preTransitReplicationPeerSyncReplicationState(String peerId,
SyncReplicationState state) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preTransitReplicationPeerSyncReplicationState(this, peerId, clusterState);
observer.preTransitReplicationPeerSyncReplicationState(this, peerId, state);
}
});
}
public void postTransitReplicationPeerSyncReplicationState(final String peerId,
final SyncReplicationState clusterState) throws IOException {
public void postTransitReplicationPeerSyncReplicationState(String peerId,
SyncReplicationState from, SyncReplicationState to) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postTransitReplicationPeerSyncReplicationState(this, peerId, clusterState);
observer.postTransitReplicationPeerSyncReplicationState(this, peerId, from, to);
}
});
}

View File

@ -46,7 +46,7 @@ public abstract class AbstractPeerProcedure<TState>
protected AbstractPeerProcedure(String peerId) {
this.peerId = peerId;
this.latch = ProcedurePrepareLatch.createLatch(2, 0);
this.latch = ProcedurePrepareLatch.createLatch(2, 1);
}
public ProcedurePrepareLatch getLatch() {
@ -94,4 +94,16 @@ public abstract class AbstractPeerProcedure<TState>
super.deserializeStateData(serializer);
peerId = serializer.deserialize(PeerProcedureStateData.class).getPeerId();
}
@Override
protected void rollbackState(MasterProcedureEnv env, TState state)
throws IOException, InterruptedException {
if (state == getInitialState()) {
// actually the peer related operations have no rollback, but if we haven't done any
// modifications on the peer storage yet, we can just return.
return;
}
throw new UnsupportedOperationException();
}
}

View File

@ -327,17 +327,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
@Override
protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
throws IOException, InterruptedException {
if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
// actually the peer related operations has no rollback, but if we haven't done any
// modifications on the peer storage yet, we can just return.
return;
}
throw new UnsupportedOperationException();
}
@Override
protected PeerModificationState getState(int stateId) {
return PeerModificationState.forNumber(stateId);

View File

@ -55,6 +55,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
justification = "Will never change after construction")
private ServerName targetServer;
private int stage;
private boolean dispatched;
private ProcedureEvent<?> event;
@ -65,9 +67,15 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
}
public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) {
this(peerId, type, targetServer, 0);
}
public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer,
int stage) {
this.peerId = peerId;
this.type = type;
this.targetServer = targetServer;
this.stage = stage;
}
@Override
@ -92,6 +100,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
return PeerModificationType.DISABLE_PEER;
case UPDATE_CONFIG:
return PeerModificationType.UPDATE_PEER_CONFIG;
case TRANSIT_SYNC_REPLICATION_STATE:
return PeerModificationType.TRANSIT_SYNC_REPLICATION_STATE;
default:
throw new IllegalArgumentException("Unknown type: " + type);
}
@ -109,6 +119,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
return PeerOperationType.DISABLE;
case UPDATE_PEER_CONFIG:
return PeerOperationType.UPDATE_CONFIG;
case TRANSIT_SYNC_REPLICATION_STATE:
return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
default:
throw new IllegalArgumentException("Unknown type: " + type);
}
@ -119,7 +131,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
assert targetServer.equals(remote);
return new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
.setTargetServer(ProtobufUtil.toServerName(remote)).setStage(stage).build()
.toByteArray());
}
private void complete(MasterProcedureEnv env, Throwable error) {
@ -196,7 +209,7 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
serializer.serialize(
RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
.setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
.setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build());
}
@Override
@ -205,5 +218,6 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
peerId = data.getPeerId();
type = toPeerOperationType(data.getType());
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
stage = data.getStage();
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -50,6 +49,9 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
/**
* Manages and performs all replication admin operations.
* <p>
@ -64,15 +66,11 @@ public class ReplicationPeerManager {
private final ConcurrentMap<String, ReplicationPeerDescription> peers;
private final EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>> allowedTransition =
new EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>>(SyncReplicationState.class) {
{
put(SyncReplicationState.ACTIVE, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
put(SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
put(SyncReplicationState.DOWNGRADE_ACTIVE,
EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE));
}
};
private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>>
allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.STANDBY,
EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.DOWNGRADE_ACTIVE,
EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers) {
@ -165,9 +163,9 @@ public class ReplicationPeerManager {
if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
throw new DoNotRetryIOException(
"Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " +
"dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId +
" does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
"Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " +
"dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId +
" does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
}
if (oldPeerConfig.isSyncReplication()) {
@ -180,15 +178,19 @@ public class ReplicationPeerManager {
return desc;
}
public void preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state)
throws DoNotRetryIOException {
/**
* @return the old state.
*/
public SyncReplicationState preTransitPeerSyncReplicationState(String peerId,
SyncReplicationState state) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
SyncReplicationState fromState = desc.getSyncReplicationState();
EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
if (allowedToStates == null || !allowedToStates.contains(state)) {
throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
" to " + state + " for peer id=" + peerId);
" to " + state + " for peer id=" + peerId);
}
return fromState;
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@ -199,8 +201,8 @@ public class ReplicationPeerManager {
}
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
SyncReplicationState syncReplicationState =
copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
: SyncReplicationState.NONE;
copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
: SyncReplicationState.NONE;
peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
peers.put(peerId,
new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
@ -240,7 +242,7 @@ public class ReplicationPeerManager {
ReplicationPeerDescription desc = peers.get(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
ReplicationPeerConfigBuilder newPeerConfigBuilder =
ReplicationPeerConfig.newBuilder(peerConfig);
ReplicationPeerConfig.newBuilder(peerConfig);
// we need to use the new conf to overwrite the old one.
newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
@ -257,7 +259,7 @@ public class ReplicationPeerManager {
return new ArrayList<>(peers.values());
}
return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
.collect(Collectors.toList());
.collect(Collectors.toList());
}
public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
@ -269,12 +271,23 @@ public class ReplicationPeerManager {
queueStorage.removeLastSequenceIds(peerId);
}
public void transitPeerSyncReplicationState(String peerId, SyncReplicationState state)
public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException {
peerStorage.setPeerNewSyncReplicationState(peerId, state);
}
public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
throws ReplicationException {
if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
// Only transit if this is not a retry
peerStorage.transitPeerSyncReplicationState(peerId);
}
ReplicationPeerDescription desc = peers.get(peerId);
peerStorage.setPeerSyncReplicationState(peerId, state);
peers.put(peerId,
new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), state));
if (desc.getSyncReplicationState() != newState) {
// Only recreate the desc if this is not a retry
peers.put(peerId,
new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
}
}
public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
@ -301,10 +314,10 @@ public class ReplicationPeerManager {
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
// Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
// cluster.
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
+ "when you want replicate all cluster");
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
"when you want replicate all cluster");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
peerConfig.getExcludeTableCFsMap());
@ -312,13 +325,13 @@ public class ReplicationPeerManager {
// If replicate_all flag is false, it means all user tables can't be replicated to peer
// cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
// cluster.
if ((peerConfig.getExcludeNamespaces() != null
&& !peerConfig.getExcludeNamespaces().isEmpty())
|| (peerConfig.getExcludeTableCFsMap() != null
&& !peerConfig.getExcludeTableCFsMap().isEmpty())) {
if ((peerConfig.getExcludeNamespaces() != null &&
!peerConfig.getExcludeNamespaces().isEmpty()) ||
(peerConfig.getExcludeTableCFsMap() != null &&
!peerConfig.getExcludeTableCFsMap().isEmpty())) {
throw new DoNotRetryIOException(
"Need clean exclude-namespaces or exclude-table-cfs config firstly"
+ " when replicate_all flag is false");
"Need clean exclude-namespaces or exclude-table-cfs config firstly" +
" when replicate_all flag is false");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
@ -338,11 +351,11 @@ public class ReplicationPeerManager {
// TODO: Add namespace, replicate_all flag back
if (peerConfig.replicateAllUserTables()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
"Only support replicated table config for sync replication peer");
}
if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
"Only support replicated table config for sync replication peer");
}
if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
@ -350,7 +363,7 @@ public class ReplicationPeerManager {
for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
if (cfs != null && !cfs.isEmpty()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
"Only support replicated table config for sync replication peer");
}
}
}
@ -394,7 +407,7 @@ public class ReplicationPeerManager {
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
String filterCSV = peerConfig.getConfiguration()
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
if (filterCSV != null && !filterCSV.isEmpty()) {
String[] filters = filterCSV.split(",");
for (String filter : filters) {

View File

@ -18,11 +18,12 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
@ -32,26 +33,29 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
/**
* The procedure for transit current cluster state for a synchronous replication peer.
* The procedure for transiting the current sync replication state for a synchronous replication
* peer.
*/
@InterfaceAudience.Private
public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedure {
public class TransitPeerSyncReplicationStateProcedure
extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {
private static final Logger LOG =
LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
private SyncReplicationState state;
private SyncReplicationState fromState;
private SyncReplicationState toState;
public TransitPeerSyncReplicationStateProcedure() {
}
public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
super(peerId);
this.state = state;
this.toState = state;
}
@Override
@ -59,100 +63,155 @@ public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedur
return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
}
@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state);
}
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, state);
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, state);
}
@Override
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
LOG.info("Successfully transit current cluster state to {} in synchronous replication peer {}",
state, peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, state);
}
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder()
.setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build());
TransitPeerSyncReplicationStateStateData.Builder builder =
TransitPeerSyncReplicationStateStateData.newBuilder()
.setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
if (fromState != null) {
builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
}
serializer.serialize(builder.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
TransitPeerSyncReplicationStateStateData data =
serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
state = ReplicationPeerConfigUtil.toSyncReplicationState(data.getSyncReplicationState());
serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
if (data.hasFromState()) {
fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
}
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
protected PeerSyncReplicationStateTransitionState getState(int stateId) {
return PeerSyncReplicationStateTransitionState.forNumber(stateId);
}
@Override
protected int getStateId(PeerSyncReplicationStateTransitionState state) {
return state.getNumber();
}
@Override
protected PeerSyncReplicationStateTransitionState getInitialState() {
return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
}
private void preTransit(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
}
fromState = env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
}
private void postTransit(MasterProcedureEnv env) throws IOException {
LOG.info(
"Successfully transited current cluster state from {} to {} for sync replication peer {}",
fromState, toState, peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId,
fromState, toState);
}
}
private List<RegionInfo> getRegionsToReopen(MasterProcedureEnv env) {
return env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet()
.stream()
.flatMap(tn -> env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn).stream())
.collect(Collectors.toList());
}
@Override
protected Flow executeFromState(MasterProcedureEnv env,
PeerSyncReplicationStateTransitionState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
case PRE_PEER_MODIFICATION:
case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
try {
prePeerModification(env);
preTransit(env);
} catch (IOException e) {
LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
"mark the procedure as failure and give up", getClass().getName(), peerId, e);
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
releaseLatch();
LOG.warn("Failed to call pre CP hook or the pre check failed for peer {} " +
"when transiting sync replication peer state to {}, " +
"mark the procedure as failure and give up", peerId, toState, e);
setFailure("master-transit-peer-sync-replication-state", e);
return Flow.NO_MORE_STATE;
}
setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
case SET_PEER_NEW_SYNC_REPLICATION_STATE:
try {
env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
} catch (ReplicationException e) {
LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
peerId, e);
LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " +
"replication peer state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
setNextState(
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (ReplicationException e) {
LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
e);
throw new ProcedureYieldException();
case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
.toArray(RefreshPeerProcedure[]::new));
if (fromState == SyncReplicationState.STANDBY &&
toState == SyncReplicationState.DOWNGRADE_ACTIVE) {
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
} else {
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
}
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
// TODO: Need add child procedure for every RegionServer
setNextState(PeerModificationState.POST_PEER_MODIFICATION);
case REPLAY_REMOTE_WAL_IN_PEER:
// TODO: replay remote wal when transiting from S to DA.
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
return Flow.HAS_MORE_STATE;
case POST_PEER_MODIFICATION:
case REOPEN_ALL_REGIONS_IN_PEER:
try {
postPeerModification(env);
} catch (ReplicationException e) {
LOG.warn("{} failed to call postPeerModification for peer {}, retry",
getClass().getName(), peerId, e);
throw new ProcedureYieldException();
addChildProcedure(
env.getAssignmentManager().createReopenProcedures(getRegionsToReopen(env)));
} catch (IOException e) {
LOG.warn("{} failed to call post CP hook for peer {}, " +
"ignore since the procedure has already done", getClass().getName(), peerId, e);
LOG.warn("Failed to schedule region reopen for peer {} when starting transiting sync " +
"replication peer state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
}
setNextState(
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
try {
env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
} catch (ReplicationException e) {
LOG.warn("Failed to update peer storage for peer {} when ending transiting sync " +
"replication peer state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
}
setNextState(
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
.toArray(RefreshPeerProcedure[]::new));
setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
return Flow.HAS_MORE_STATE;
case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
try {
postTransit(env);
} catch (IOException e) {
LOG.warn(
"Failed to call post CP hook for peer {} when transiting sync replication " +
"peer state from {} to {}, ignore since the procedure is already done",
peerId, fromState, toState, e);
}
releaseLatch();
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
private void releaseLatch() {
ProcedurePrepareLatch.releaseLatch(latch, this);
}
}
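
Finally, a sketch of how the master side is expected to drive the new procedure, mirroring HMaster.executePeerProcedure from this patch; the helper name and surrounding wiring are assumptions:

import java.io.IOException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.SyncReplicationState;

class TransitSubmissionSketch {
  static long transitPeer(ProcedureExecutor<MasterProcedureEnv> executor, String peerId,
      SyncReplicationState to) throws IOException {
    TransitPeerSyncReplicationStateProcedure proc =
        new TransitPeerSyncReplicationStateProcedure(peerId, to);
    long procId = executor.submitProcedure(proc);
    // Wait on the procedure's prepare latch before returning to the caller.
    proc.getLatch().await();
    return procId;
  }
}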

View File

@ -1802,26 +1802,32 @@ public class HRegionServer extends HasThread implements
* be hooked up to WAL.
*/
private void setupWALAndReplication() throws IOException {
boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
(!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf));
if (isMasterNoTableOrSystemTableOnly) {
conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, false);
}
WALFactory factory = new WALFactory(conf, serverName.toString());
if (!isMasterNoTableOrSystemTableOnly) {
// TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
// TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
Path logDir = new Path(walRootDir, logName);
LOG.debug("logDir={}", logDir);
if (this.walFs.exists(logDir)) {
throw new RegionServerRunningException(
"Region server has already created directory at " + this.serverName.toString());
Path logDir = new Path(walRootDir, logName);
LOG.debug("logDir={}", logDir);
if (this.walFs.exists(logDir)) {
throw new RegionServerRunningException(
"Region server has already created directory at " + this.serverName.toString());
}
// Always create wal directory as now we need this when master restarts to find out the live
// region servers.
if (!this.walFs.mkdirs(logDir)) {
throw new IOException("Can not create wal directory " + logDir);
}
// Instantiate replication if replication enabled. Pass it the log directories.
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
factory.getWALProvider());
}
// Always create wal directory as now we need this when master restarts to find out the live
// region servers.
if (!this.walFs.mkdirs(logDir)) {
throw new IOException("Can not create wal directory " + logDir);
}
// Instantiate replication if replication enabled. Pass it the log directories.
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
factory.getWALProvider());
this.walFactory = factory;
}
@ -2943,11 +2949,6 @@ public class HRegionServer extends HasThread implements
*/
private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
if ((server instanceof HMaster) &&
(!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
return;
}
// read in the name of the source replication class from the config file.
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

View File

@ -18,16 +18,21 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A source for a replication stream has to expose this service.
* This service allows an application to hook into the
* regionserver and watch for new transactions.
* A source for a replication stream has to expose this service. This service allows an application
* to hook into the regionserver and watch for new transactions.
*/
@InterfaceAudience.Private
public interface ReplicationSourceService extends ReplicationService {
/**
* Returns an info provider for sync replication peer.
*/
SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider();
/**
* Returns a Handler to handle peer procedures.
*/

View File

@ -28,8 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface PeerActionListener {
default void peerRemoved(String peerId) {}
static final PeerActionListener DUMMY = new PeerActionListener() {};
default void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
SyncReplicationState to) {}
SyncReplicationState to, int stage) {}
}

View File

@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
@ -29,13 +28,16 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface PeerProcedureHandler {
public void addPeer(String peerId) throws ReplicationException, IOException;
void addPeer(String peerId) throws ReplicationException, IOException;
public void removePeer(String peerId) throws ReplicationException, IOException;
void removePeer(String peerId) throws ReplicationException, IOException;
public void disablePeer(String peerId) throws ReplicationException, IOException;
void disablePeer(String peerId) throws ReplicationException, IOException;
public void enablePeer(String peerId) throws ReplicationException, IOException;
void enablePeer(String peerId) throws ReplicationException, IOException;
public void updatePeerConfig(String peerId) throws ReplicationException, IOException;
void updatePeerConfig(String peerId) throws ReplicationException, IOException;
void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
throws ReplicationException, IOException;
}

View File

@ -19,23 +19,32 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
private final ReplicationSourceManager replicationSourceManager;
private final PeerActionListener peerActionListener;
private final KeyLocker<String> peersLock = new KeyLocker<>();
public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager,
PeerActionListener peerActionListener) {
this.replicationSourceManager = replicationSourceManager;
this.peerActionListener = peerActionListener;
}
@Override
@ -61,7 +70,6 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
private void refreshPeerState(String peerId) throws ReplicationException, IOException {
PeerState newState;
Lock peerLock = peersLock.acquireLock(peerId);
ReplicationPeerImpl peer = null;
PeerState oldState = null;
@ -72,7 +80,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
oldState = peer.getPeerState();
newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
// RS need to start work with the new replication state change
if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
replicationSourceManager.refreshSources(peerId);
@ -132,4 +140,42 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
peerLock.unlock();
}
}
@Override
public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
throws ReplicationException, IOException {
ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers();
Lock peerLock = peersLock.acquireLock(peerId);
try {
ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
if (!peer.getPeerConfig().isSyncReplication()) {
throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
}
SyncReplicationState newState = peer.getNewSyncReplicationState();
if (stage == 0) {
if (newState != SyncReplicationState.NONE) {
LOG.warn("The new sync replication state for peer {} has already been set to {}, " +
"this should be a retry, give up", peerId, newState);
return;
}
newState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
SyncReplicationState oldState = peer.getSyncReplicationState();
peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
} else {
if (newState == SyncReplicationState.NONE) {
LOG.warn("The new sync replication state for peer {} has already been cleared, and the " +
"current state is {}, this should be a retry, give up", peerId, newState);
return;
}
SyncReplicationState oldState = peer.getSyncReplicationState();
peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
peer.transitSyncReplicationState();
}
} finally {
peerLock.unlock();
}
}
}

View File

@ -35,12 +35,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
public class RefreshPeerCallable implements RSProcedureCallable {
private static final Logger LOG = Logger.getLogger(RefreshPeerCallable.class);
private HRegionServer rs;
private String peerId;
private PeerModificationType type;
private int stage;
private Exception initError;
@Override
@ -67,6 +70,9 @@ public class RefreshPeerCallable implements RSProcedureCallable {
case UPDATE_PEER_CONFIG:
handler.updatePeerConfig(this.peerId);
break;
case TRANSIT_SYNC_REPLICATION_STATE:
handler.transitSyncReplicationPeerState(peerId, stage, rs);
break;
default:
throw new IllegalArgumentException("Unknown peer modification type: " + type);
}
@ -80,6 +86,7 @@ public class RefreshPeerCallable implements RSProcedureCallable {
RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
this.type = param.getType();
this.stage = param.getStage();
} catch (InvalidProtocolBufferException e) {
initError = e;
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
@ -66,6 +67,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private ReplicationTracker replicationTracker;
private Configuration conf;
private ReplicationSink replicationSink;
private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
// Hosting server
private Server server;
/** Statistics thread schedule pool */
@ -120,19 +122,30 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
mapping);
this.syncReplicationPeerInfoProvider =
new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
PeerActionListener peerActionListener = PeerActionListener.DUMMY;
if (walProvider != null) {
walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
if (walProvider instanceof SyncReplicationWALProvider) {
SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider;
peerActionListener = syncWALProvider;
syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
}
}
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
this.replicationLoad = new ReplicationLoad();
this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
this.peerProcedureHandler =
new PeerProcedureHandlerImpl(replicationManager, peerActionListener);
}
@Override
@ -270,4 +283,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
}
@Override
public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
return syncReplicationPeerInfoProvider;
}
}
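
A hedged example of a consumer of the new accessor; it assumes getSyncReplicationPeerInfoProvider() is declared on ReplicationSourceService, which the @Override above implies, and the consumer class itself is made up:

import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.SyncReplicationState;

// Illustrative only: ask the replication service whether a region belongs to a
// sync replication peer that is currently in DOWNGRADE_ACTIVE state.
final class SyncStateQuerySketch {
  static boolean isDowngradeActive(ReplicationSourceService replication, RegionInfo region) {
    return replication.getSyncReplicationPeerInfoProvider()
        .isInState(region, SyncReplicationState.DOWNGRADE_ACTIVE);
  }
}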

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
@ -136,6 +137,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// For recovered source, the queue id's format is peer_id-servername-*
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
private final Configuration conf;
private final FileSystem fs;
// The paths to the latest log of each wal group, for new coming peers
@ -170,9 +173,8 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider) throws IOException {
// CopyOnWriteArrayList is thread-safe.
// Generally, reading is more than modifying.
WALFileLengthProvider walFileLengthProvider,
SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException {
this.sources = new ConcurrentHashMap<>();
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
@ -185,10 +187,11 @@ public class ReplicationSourceManager implements ReplicationListener {
this.fs = fs;
this.logDir = logDir;
this.oldLogDir = oldLogDir;
this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
// seconds
// 30 seconds
this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
this.clusterId = clusterId;
this.walFileLengthProvider = walFileLengthProvider;
this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
this.replicationTracker.registerListener(this);
// It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time.
@ -249,8 +252,11 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
* 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add
* HFile Refs
* <ol>
* <li>Add peer to replicationPeers</li>
* <li>Add the normal source and related replication queue</li>
* <li>Add HFile Refs</li>
* </ol>
* @param peerId the id of replication peer
*/
public void addPeer(String peerId) throws IOException {
@ -269,13 +275,16 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
* 1. Remove peer for replicationPeers 2. Remove all the recovered sources for the specified id
* and related replication queues 3. Remove the normal source and related replication queue 4.
* Remove HFile Refs
* <ol>
* <li>Remove peer for replicationPeers</li>
* <li>Remove all the recovered sources for the specified id and related replication queues</li>
* <li>Remove the normal source and related replication queue</li>
* <li>Remove HFile Refs</li>
* </ol>
* @param peerId the id of the replication peer
*/
public void removePeer(String peerId) {
replicationPeers.removePeer(peerId);
ReplicationPeer peer = replicationPeers.removePeer(peerId);
String terminateMessage = "Replication stream was removed by a user";
List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
@ -306,7 +315,10 @@ public class ReplicationSourceManager implements ReplicationListener {
deleteQueue(peerId);
this.walsById.remove(peerId);
}
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
if (peerConfig.isSyncReplication()) {
syncReplicationPeerMappingManager.remove(peerId, peerConfig);
}
// Remove HFile Refs
abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
}
@ -358,6 +370,10 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
}
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
if (peerConfig.isSyncReplication()) {
syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
}
src.startup();
return src;
}
@ -435,6 +451,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
this.walsById.remove(src.getQueueId());
}
/**

View File

@ -19,17 +19,25 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Get the peer id and remote root dir if the region is synchronously replicated.
* Get the information for a sync replication peer.
*/
@InterfaceAudience.Private
public interface SyncReplicationPeerProvider {
public interface SyncReplicationPeerInfoProvider {
/**
* Return the peer id and remote WAL directory if the region is synchronously replicated.
* Return the peer id and remote WAL directory if the region is synchronously replicated and the
* state is {@link SyncReplicationState#ACTIVE}.
*/
Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
/**
* Check whether the given region is contained in a sync replication peer which is in the given
* state.
*/
boolean isInState(RegionInfo info, SyncReplicationState state);
}
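
A short usage sketch of the two methods; the real consumer is SyncReplicationWALProvider further down, and the helper below is illustrative only:

import java.util.Optional;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.util.Pair;

// Illustrative helper: returns the remote WAL directory only while the region's peer is
// ACTIVE (getPeerIdAndRemoteWALDir already enforces that), and also shows the state query.
final class PeerInfoProviderUsageSketch {
  static String remoteWalDirOrNull(SyncReplicationPeerInfoProvider provider, RegionInfo region) {
    if (provider.isInState(region, SyncReplicationState.STANDBY)) {
      return null; // the standby side should not be writing a remote WAL for this region
    }
    Optional<Pair<String, String>> idAndDir = provider.getPeerIdAndRemoteWALDir(region);
    return idAndDir.map(Pair::getSecond).orElse(null);
  }
}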

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProvider {
private final ReplicationPeers replicationPeers;
private final SyncReplicationPeerMappingManager mapping;
SyncReplicationPeerInfoProviderImpl(ReplicationPeers replicationPeers,
SyncReplicationPeerMappingManager mapping) {
this.replicationPeers = replicationPeers;
this.mapping = mapping;
}
@Override
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
String peerId = mapping.getPeerId(info);
if (peerId == null) {
return Optional.empty();
}
ReplicationPeer peer = replicationPeers.getPeer(peerId);
if (peer == null) {
return Optional.empty();
}
if (peer.getSyncReplicationState() == SyncReplicationState.ACTIVE) {
return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir()));
} else {
return Optional.empty();
}
}
@Override
public boolean isInState(RegionInfo info, SyncReplicationState state) {
String peerId = mapping.getPeerId(info);
if (peerId == null) {
return false;
}
ReplicationPeer peer = replicationPeers.getPeer(peerId);
if (peer == null) {
return false;
}
return peer.getSyncReplicationState() == state;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Used to map a region to its sync replication peer id.
* <p>
* TODO: for now only peers configured with an explicit table-cfs list are supported.
*/
@InterfaceAudience.Private
class SyncReplicationPeerMappingManager {
private final ConcurrentMap<TableName, String> table2PeerId = new ConcurrentHashMap<>();
void add(String peerId, ReplicationPeerConfig peerConfig) {
peerConfig.getTableCFsMap().keySet().forEach(tn -> table2PeerId.put(tn, peerId));
}
void remove(String peerId, ReplicationPeerConfig peerConfig) {
peerConfig.getTableCFsMap().keySet().forEach(table2PeerId::remove);
}
String getPeerId(RegionInfo info) {
return table2PeerId.get(info.getTable());
}
}
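
A small usage sketch mirroring how ReplicationSourceManager above registers and unregisters sync peers in the mapping; since the class is package-private the sketch assumes it lives in the same package, and all values are placeholders:

package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

// Illustrative lifecycle: register on peer add, query per region, unregister on peer removal.
final class MappingManagerUsageSketch {
  static String lookup(SyncReplicationPeerMappingManager mapping, String peerId,
      ReplicationPeerConfig peerConfig, RegionInfo region) {
    if (!peerConfig.isSyncReplication()) {
      return null; // only sync replication peers are tracked in the mapping
    }
    mapping.add(peerId, peerConfig);           // done when the peer source starts
    String mapped = mapping.getPeerId(region); // null when the region's table is not covered
    mapping.remove(peerId, peerConfig);        // done when the peer is removed
    return mapped;
  }
}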

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
@ -67,7 +67,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private final WALProvider provider;
private final SyncReplicationPeerProvider peerProvider;
private SyncReplicationPeerInfoProvider peerInfoProvider;
private WALFactory factory;
@ -85,9 +85,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private final KeyLocker<String> createLock = new KeyLocker<>();
SyncReplicationWALProvider(WALProvider provider, SyncReplicationPeerProvider peerProvider) {
SyncReplicationWALProvider(WALProvider provider) {
this.provider = provider;
this.peerProvider = peerProvider;
}
public void setPeerInfoProvider(SyncReplicationPeerInfoProvider peerInfoProvider) {
this.peerInfoProvider = peerInfoProvider;
}
@Override
@ -99,7 +102,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
this.conf = conf;
this.factory = factory;
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
}
@ -112,9 +115,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
Path remoteWALDirPath = new Path(remoteWALDir);
FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
}
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
@ -139,7 +142,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public WAL getWAL(RegionInfo region) throws IOException {
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
peerProvider.getPeerIdAndRemoteWALDir(region);
peerInfoProvider.getPeerIdAndRemoteWALDir(region);
if (peerIdAndRemoteWALDir.isPresent()) {
Pair<String, String> pair = peerIdAndRemoteWALDir.get();
return getWAL(pair.getFirst(), pair.getSecond());
@ -220,15 +223,13 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
provider.addWALActionsListener(listener);
}
@Override
public void peerRemoved(String peerId) {
safeClose(peerId2WAL.remove(peerId));
}
@Override
public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
SyncReplicationState to) {
assert to == SyncReplicationState.DOWNGRADE_ACTIVE;
safeClose(peerId2WAL.remove(peerId));
SyncReplicationState to, int stage) {
// TODO: handle stage 0; for now the old WAL is only closed at stage 1 of ACTIVE -> DOWNGRADE_ACTIVE.
if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE &&
stage == 1) {
safeClose(peerId2WAL.remove(peerId));
}
}
}
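
The listener contract now carries the stage as well; a minimal sketch of another implementation, assuming the interface's remaining callbacks have default bodies (the DUMMY constant used earlier suggests they do):

import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical listener, illustration only: real implementations, like the provider
// above, act only on the (from, to, stage) combinations they care about.
final class LoggingPeerActionListener implements PeerActionListener {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingPeerActionListener.class);

  @Override
  public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
      SyncReplicationState to, int stage) {
    LOG.info("Peer {} transits from {} to {} at stage {}", peerId, from, to, stage);
  }
}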

View File

@ -24,10 +24,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@ -144,18 +144,6 @@ public class WALFactory {
}
}
/**
* instantiate a provider from a config property. requires conf to have already been set (as well
* as anything the provider might need to read).
*/
private WALProvider getProvider(String key, String defaultValue, String providerId)
throws IOException {
WALProvider provider = createProvider(getProviderClass(key, defaultValue));
provider.init(this, conf, providerId);
provider.addWALActionsListener(new MetricsWAL());
return provider;
}
/**
* @param conf must not be null, will keep a reference to read params in later reader/writer
* instances.
@ -173,7 +161,13 @@ public class WALFactory {
this.factoryId = factoryId;
// end required early initialization
if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
if (conf.getBoolean(HConstants.SYNC_REPLICATION_ENABLED, false)) {
provider = new SyncReplicationWALProvider(provider);
}
provider.init(this, conf, null);
provider.addWALActionsListener(new MetricsWAL());
this.provider = provider;
} else {
// special handling of existing configuration behavior.
LOG.warn("Running with WAL disabled.");
@ -182,26 +176,6 @@ public class WALFactory {
}
}
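
A hedged sketch of turning the new wrapping on from configuration; the two-argument WALFactory constructor and getWALProvider() are the ones exercised by the test at the end of this patch, while the factory id here is arbitrary:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;

// Sketch only: with hbase.replication.sync.enabled set, the factory wraps the
// configured provider in a SyncReplicationWALProvider.
final class EnableSyncReplicationSketch {
  static boolean isSyncProviderInstalled() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, true);
    WALFactory factory = new WALFactory(conf, "sketch");
    return factory.getWALProvider() instanceof SyncReplicationWALProvider;
  }
}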
/**
* A temporary constructor for testing synchronous replication.
* <p>
* Remove it once we can integrate the synchronous replication logic in RS.
*/
@VisibleForTesting
WALFactory(Configuration conf, String factoryId, SyncReplicationPeerProvider peerProvider)
throws IOException {
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
/* TODO Both of these are probably specific to the fs wal provider */
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
AbstractFSWALProvider.Reader.class);
this.conf = conf;
this.factoryId = factoryId;
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
this.provider = new SyncReplicationWALProvider(provider, peerProvider);
this.provider.init(this, conf, null);
this.provider.addWALActionsListener(new MetricsWAL());
}
/**
* Shutdown all WALs and clean up any underlying storage.
* Use only when you will not need to replay any edits that have gone to any wals from this
@ -250,8 +224,9 @@ public class WALFactory {
if (provider != null) {
return provider;
}
provider = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER,
AbstractFSWALProvider.META_WAL_PROVIDER_ID);
provider = createProvider(getProviderClass(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER));
provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
provider.addWALActionsListener(new MetricsWAL());
if (metaProvider.compareAndSet(null, provider)) {
return provider;
} else {

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterR
import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -1008,7 +1009,7 @@ public class TestReplicationAdmin {
@Test
public void testTransitSyncReplicationPeerState() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
builder.setReplicateAllUserTables(false);

View File

@ -175,7 +175,10 @@ public abstract class TestReplicationSourceManager {
ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
SyncReplicationState.toByteArray(SyncReplicationState.NONE));
ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/new-sync-rep-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/new-sync-rep-state",
ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);

View File

@ -27,6 +27,7 @@ import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -35,6 +36,8 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -51,7 +54,7 @@ public class TestSyncReplicationWALProvider {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@ -69,19 +72,30 @@ public class TestSyncReplicationWALProvider {
private static WALFactory FACTORY;
private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
if (info.getTable().equals(TABLE)) {
return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
} else {
return Optional.empty();
public static final class InfoProvider implements SyncReplicationPeerInfoProvider {
@Override
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
if (info.getTable().equals(TABLE)) {
return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
} else {
return Optional.empty();
}
}
@Override
public boolean isInState(RegionInfo info, SyncReplicationState state) {
// TODO Implement SyncReplicationPeerInfoProvider.isInState
return false;
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.getConfiguration().setBoolean(HConstants.SYNC_REPLICATION_ENABLED, true);
UTIL.startMiniDFSCluster(3);
FACTORY = new WALFactory(UTIL.getConfiguration(), "test",
TestSyncReplicationWALProvider::getPeerIdAndRemoteWALDir);
FACTORY = new WALFactory(UTIL.getConfiguration(), "test");
((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());
UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
}
@ -151,9 +165,9 @@ public class TestSyncReplicationWALProvider {
DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
assertEquals(2, FACTORY.getWALs().size());
testReadWrite(wal);
SyncReplicationWALProvider walProvider =
(SyncReplicationWALProvider) FACTORY.getWALProvider();
walProvider.peerRemoved(PEER_ID);
SyncReplicationWALProvider walProvider = (SyncReplicationWALProvider) FACTORY.getWALProvider();
walProvider.peerSyncReplicationStateChange(PEER_ID, SyncReplicationState.ACTIVE,
SyncReplicationState.DOWNGRADE_ACTIVE, 1);
assertEquals(1, FACTORY.getWALs().size());
}
}