HBASE-19864 Use protobuf instead of enum.ordinal to store SyncReplicationState

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2018-01-26 16:50:48 +08:00 committed by zhangduo
parent d8842dc3d4
commit 1481bd9481
8 changed files with 67 additions and 36 deletions

View File

@ -403,7 +403,7 @@ public final class ReplicationPeerConfigUtil {
ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
ReplicationPeerConfig config = convert(desc.getConfig());
return new ReplicationPeerDescription(desc.getId(), enabled, config,
SyncReplicationState.valueOf(desc.getSyncReplicationState().getNumber()));
toSyncReplicationState(desc.getSyncReplicationState()));
}
public static ReplicationProtos.ReplicationPeerDescription
@ -411,17 +411,33 @@ public final class ReplicationPeerConfigUtil {
ReplicationProtos.ReplicationPeerDescription.Builder builder =
ReplicationProtos.ReplicationPeerDescription.newBuilder();
builder.setId(desc.getPeerId());
ReplicationProtos.ReplicationState.Builder stateBuilder =
ReplicationProtos.ReplicationState.newBuilder();
stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED :
ReplicationProtos.ReplicationState.State.DISABLED);
builder.setState(stateBuilder.build());
builder.setConfig(convert(desc.getPeerConfig()));
builder.setSyncReplicationState(
ReplicationProtos.SyncReplicationState.forNumber(desc.getSyncReplicationState().ordinal()));
builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState()));
return builder.build();
}
public static ReplicationProtos.SyncReplicationState
toSyncReplicationState(SyncReplicationState state) {
ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder =
ReplicationProtos.SyncReplicationState.newBuilder();
syncReplicationStateBuilder
.setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal()));
return syncReplicationStateBuilder.build();
}
public static SyncReplicationState
toSyncReplicationState(ReplicationProtos.SyncReplicationState state) {
return SyncReplicationState.valueOf(state.getState().getNumber());
}
public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);

View File

@ -17,8 +17,15 @@
*/
package org.apache.hadoop.hbase.replication;
import java.util.Arrays;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
/**
* Used by synchronous replication. Indicate the state of the current cluster in a synchronous
* replication peer. The state may be one of {@link SyncReplicationState#ACTIVE},
@ -45,4 +52,14 @@ public enum SyncReplicationState {
throw new IllegalArgumentException("Unknown synchronous replication state " + value);
}
}
public static byte[] toByteArray(SyncReplicationState state) {
return ProtobufUtil
.prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray());
}
public static SyncReplicationState parseFrom(byte[] bytes) throws InvalidProtocolBufferException {
return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
.parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
}
}

View File

@ -1871,10 +1871,9 @@ public final class RequestConverter {
}
public static TransitReplicationPeerSyncReplicationStateRequest
buildTransitReplicationPeerSyncReplicationStateRequest(String peerId,
SyncReplicationState state) {
buildTransitReplicationPeerSyncReplicationStateRequest(String peerId,
SyncReplicationState state) {
return TransitReplicationPeerSyncReplicationStateRequest.newBuilder().setPeerId(peerId)
.setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal()))
.build();
.setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build();
}
}

View File

@ -66,11 +66,14 @@ message ReplicationState {
/**
* Indicate the state of the current cluster in a synchronous replication peer.
*/
enum SyncReplicationState {
NONE = 0;
ACTIVE = 1;
DOWNGRADE_ACTIVE = 2;
STANDBY = 3;
message SyncReplicationState {
enum State {
NONE = 0;
ACTIVE = 1;
DOWNGRADE_ACTIVE = 2;
STANDBY = 3;
}
required State state = 1;
}
/**

View File

@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -97,7 +97,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES),
ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId),
Bytes.toBytes(syncReplicationState.ordinal())));
SyncReplicationState.toByteArray(syncReplicationState)));
try {
ZKUtil.createWithParents(zookeeper, peersZNode);
ZKUtil.multiOrSequential(zookeeper, multiOps, false);
@ -181,29 +181,27 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
}
@Override
public void setPeerSyncReplicationState(String peerId, SyncReplicationState clusterState)
public void setPeerSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException {
byte[] clusterStateBytes = Bytes.toBytes(clusterState.ordinal());
try {
ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId), clusterStateBytes);
ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId),
SyncReplicationState.toByteArray(state));
} catch (KeeperException e) {
throw new ReplicationException(
"Unable to change the cluster state for the synchronous replication peer with id=" +
peerId,
e);
"Unable to change the cluster state for the synchronous replication peer with id=" + peerId,
e);
}
}
@Override
public SyncReplicationState getPeerSyncReplicationState(String peerId)
throws ReplicationException {
byte[] data;
try {
data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId));
} catch (KeeperException | InterruptedException e) {
byte[] data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId));
return SyncReplicationState.parseFrom(data);
} catch (KeeperException | InterruptedException | IOException e) {
throw new ReplicationException(
"Error getting cluster state for the synchronous replication peer with id=" + peerId, e);
"Error getting cluster state for the synchronous replication peer with id=" + peerId, e);
}
return SyncReplicationState.valueOf(Bytes.toInt(data));
}
}

View File

@ -85,7 +85,6 @@ import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.AccessController;
@ -1952,13 +1951,13 @@ public class MasterRpcServices extends RSRpcServices
@Override
public TransitReplicationPeerSyncReplicationStateResponse
transitReplicationPeerSyncReplicationState(RpcController controller,
TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
transitReplicationPeerSyncReplicationState(RpcController controller,
TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
try {
long procId = master.transitReplicationPeerSyncReplicationState(request.getPeerId(),
SyncReplicationState.valueOf(request.getSyncReplicationState().getNumber()));
ReplicationPeerConfigUtil.toSyncReplicationState(request.getSyncReplicationState()));
return TransitReplicationPeerSyncReplicationStateResponse.newBuilder().setProcId(procId)
.build();
.build();
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
/**
* The procedure for transit current cluster state for a synchronous replication peer.
@ -89,16 +89,15 @@ public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedur
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder()
.setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal()))
.build());
.setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
TransitPeerSyncReplicationStateStateData data =
serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
state = SyncReplicationState.valueOf(data.getSyncReplicationState().getNumber());
serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
state = ReplicationPeerConfigUtil.toSyncReplicationState(data.getSyncReplicationState());
}
@Override

View File

@ -175,7 +175,7 @@ public abstract class TestReplicationSourceManager {
ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
Bytes.toBytes(SyncReplicationState.NONE.ordinal()));
SyncReplicationState.toByteArray(SyncReplicationState.NONE));
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);