HBASE-17388 Move ReplicationPeer and other replication related PB messages to the replication.proto

Guanghao Zhang committed 2017-01-04 17:48:09 +08:00
parent 0f6c79eb12
commit e02ae7724d
11 changed files with 4237 additions and 4237 deletions

View File

@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
@@ -62,13 +62,13 @@ public final class ReplicationSerDeHelper {
}
/** convert map to TableCFs Object */
public static ZooKeeperProtos.TableCF[] convert(
public static ReplicationProtos.TableCF[] convert(
Map<TableName, ? extends Collection<String>> tableCfs) {
if (tableCfs == null) {
return null;
}
List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>();
ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder();
List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>();
ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
tableCFBuilder.clear();
tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
@@ -80,7 +80,7 @@ public final class ReplicationSerDeHelper {
}
tableCFList.add(tableCFBuilder.build());
}
return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]);
return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
}
public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
@@ -95,12 +95,12 @@ public final class ReplicationSerDeHelper {
* This is only for read TableCFs information from TableCF node.
* Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
* */
public static ZooKeeperProtos.TableCF[] convert(String tableCFsConfig) {
public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
return null;
}
List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>();
ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder();
List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>();
ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
String[] tables = tableCFsConfig.split(";");
for (String tab : tables) {
@@ -142,17 +142,17 @@ public final class ReplicationSerDeHelper {
}
tableCFList.add(tableCFBuilder.build());
}
return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]);
return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
}
/**
* Convert TableCFs Object to String.
* Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
* */
public static String convert(ZooKeeperProtos.TableCF[] tableCFs) {
public static String convert(ReplicationProtos.TableCF[] tableCFs) {
StringBuilder sb = new StringBuilder();
for (int i = 0, n = tableCFs.length; i < n; i++) {
ZooKeeperProtos.TableCF tableCF = tableCFs[i];
ReplicationProtos.TableCF tableCF = tableCFs[i];
String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
if (!Strings.isEmpty(namespace)) {
sb.append(namespace).append(".").
@@ -175,10 +175,10 @@ public final class ReplicationSerDeHelper {
/**
* Get TableCF in TableCFs, if not exist, return null.
* */
public static ZooKeeperProtos.TableCF getTableCF(ZooKeeperProtos.TableCF[] tableCFs,
public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
String table) {
for (int i = 0, n = tableCFs.length; i < n; i++) {
ZooKeeperProtos.TableCF tableCF = tableCFs[i];
ReplicationProtos.TableCF tableCF = tableCFs[i];
if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
return tableCF;
}
@@ -191,7 +191,7 @@ public final class ReplicationSerDeHelper {
* It is used for backward compatibility.
* Old format bytes have no PB_MAGIC Header
* */
public static ZooKeeperProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
if (bytes == null) {
return null;
}
@@ -202,20 +202,20 @@ public final class ReplicationSerDeHelper {
* Convert tableCFs string into Map.
* */
public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
ZooKeeperProtos.TableCF[] tableCFs = convert(tableCFsConfig);
ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
return convert2Map(tableCFs);
}
/**
* Convert tableCFs Object to Map.
* */
public static Map<TableName, List<String>> convert2Map(ZooKeeperProtos.TableCF[] tableCFs) {
public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
if (tableCFs == null || tableCFs.length == 0) {
return null;
}
Map<TableName, List<String>> tableCFsMap = new HashMap<TableName, List<String>>();
for (int i = 0, n = tableCFs.length; i < n; i++) {
ZooKeeperProtos.TableCF tableCF = tableCFs[i];
ReplicationProtos.TableCF tableCF = tableCFs[i];
List<String> families = new ArrayList<>();
for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
families.add(tableCF.getFamilies(j).toStringUtf8());
@@ -239,9 +239,9 @@ public final class ReplicationSerDeHelper {
throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationPeer.Builder builder =
ZooKeeperProtos.ReplicationPeer.newBuilder();
ZooKeeperProtos.ReplicationPeer peer;
ReplicationProtos.ReplicationPeer.Builder builder =
ReplicationProtos.ReplicationPeer.newBuilder();
ReplicationProtos.ReplicationPeer peer;
try {
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
peer = builder.build();
@@ -257,7 +257,7 @@ public final class ReplicationSerDeHelper {
}
}
public static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
if (peer.hasClusterkey()) {
peerConfig.setClusterKey(peer.getClusterkey());
@@ -275,7 +275,7 @@ public final class ReplicationSerDeHelper {
}
Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()]));
peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
if (tableCFsMap != null) {
peerConfig.setTableCFsMap(tableCFsMap);
}
@@ -293,8 +293,8 @@ public final class ReplicationSerDeHelper {
return peerConfig;
}
public static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder();
if (peerConfig.getClusterKey() != null) {
builder.setClusterkey(peerConfig.getClusterKey());
}
@@ -316,7 +316,7 @@ public final class ReplicationSerDeHelper {
.build());
}
ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
if (tableCFs != null) {
for (int i = 0; i < tableCFs.length; i++) {
builder.addTableCfs(tableCFs[i]);
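
The hunks above only swap the generated class from ZooKeeperProtos to ReplicationProtos; the helper's call pattern is unchanged. A minimal usage sketch follows (illustrative only, not part of this commit; the sketch class name and the cluster key value are made up, everything else is the helper API shown above):

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

public class ReplicationSerDeHelperSketch {
  public static void main(String[] args) {
    // Parse the documented string form: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
    Map<TableName, List<String>> tableCfs =
        ReplicationSerDeHelper.parseTableCFsFromConfig("ns1.table1:cf1,cf2;table3");

    // Build a peer config and convert it to the relocated PB message.
    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
    peerConfig.setClusterKey("zk1,zk2,zk3:2181:/hbase");   // made-up slave cluster key
    peerConfig.setTableCFsMap(tableCfs);

    ReplicationProtos.ReplicationPeer peer = ReplicationSerDeHelper.convert(peerConfig);
    System.out.println(peer.getClusterkey() + ", " + peer.getTableCfsCount() + " table(s)");
  }
}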

View File

@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -210,8 +210,8 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
* @throws DeserializationException
*/
public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
return ReplicationProtos.ReplicationState.State.ENABLED == state;
}
/**
@@ -219,13 +219,13 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
* @return State parsed from the passed bytes.
* @throws DeserializationException
*/
private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationState.Builder builder =
ZooKeeperProtos.ReplicationState.newBuilder();
ZooKeeperProtos.ReplicationState state;
ReplicationProtos.ReplicationState.Builder builder =
ReplicationProtos.ReplicationState.newBuilder();
ReplicationProtos.ReplicationState state;
try {
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
state = builder.build();
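
isStateEnabled still expects the same peer-state payload, the PB magic prefix followed by a serialized ReplicationState, only typed as ReplicationProtos.ReplicationState now. A round-trip sketch (illustrative; the bytes are built inline the same way the peer-state writer stores them, and the sketch class name is made up):

import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

public class PeerStateSketch {
  public static void main(String[] args) throws Exception {
    // PB magic prefix + ReplicationProtos.ReplicationState, as stored in the peer-state znode.
    byte[] znodeData = ProtobufUtil.prependPBMagic(
        ReplicationProtos.ReplicationState.newBuilder()
            .setState(ReplicationProtos.ReplicationState.State.ENABLED)
            .build()
            .toByteArray());

    // parseStateFrom (above) strips the prefix and reads the message back.
    System.out.println(ReplicationPeerZKImpl.isStateEnabled(znodeData));   // true
  }
}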

View File

@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -160,13 +160,13 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
@Override
public void enablePeer(String id) throws ReplicationException {
changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
LOG.info("peer " + id + " is enabled");
}
@Override
public void disablePeer(String id) throws ReplicationException {
changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
LOG.info("peer " + id + " is disabled");
}
@@ -462,7 +462,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
* @param id
* @param state
*/
private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
throws ReplicationException {
try {
if (!peerExists(id)) {
@@ -471,7 +471,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
String peerStateZNode = getPeerStateNode(id);
byte[] stateBytes =
(state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
(state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
: DISABLED_ZNODE_BYTES;
if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
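
changePeerState only ever writes one of the two precomputed byte arrays to the peer's peer-state znode. Roughly, enabling a peer reduces to the sketch below (illustrative, not the actual method body; it skips the existence checks, and the znode path shown assumes the default /hbase/replication/peers/PEER_ID/peer-state layout):

import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public class PeerToggleSketch {
  // Write the ENABLED marker for a peer; disabling writes DISABLED_ZNODE_BYTES instead.
  static void enable(ZooKeeperWatcher zkw, String id) throws KeeperException {
    String peerStateZNode = "/hbase/replication/peers/" + id + "/peer-state";   // assumed layout
    ZKUtil.setData(zkw, peerStateZNode, ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
  }
}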

View File

@@ -23,13 +23,13 @@ import java.io.IOException;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -66,9 +66,9 @@ public abstract class ReplicationStateZKBase {
// Public for testing
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
"zookeeper.znode.replication.hfile.refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
@@ -110,9 +110,9 @@ public abstract class ReplicationStateZKBase {
* use as content of a peer-state znode under a peer cluster id as in
* /hbase/replication/peers/PEER_ID/peer-state.
*/
protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
ZooKeeperProtos.ReplicationState msg =
ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build();
protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
ReplicationProtos.ReplicationState msg =
ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
// There is no toByteArray on this pb Message?
// 32 bytes is default which seems fair enough here.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

View File

@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent;
@@ -1860,8 +1860,8 @@ public class ZKUtil {
}
// parse the data of the above peer znode.
try {
ZooKeeperProtos.ReplicationPeer.Builder builder =
ZooKeeperProtos.ReplicationPeer.newBuilder();
ReplicationProtos.ReplicationPeer.Builder builder =
ReplicationProtos.ReplicationPeer.newBuilder();
ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
String clusterKey = builder.getClusterkey();
sb.append("\n").append(znodeToProcess).append(": ").append(clusterKey);
@@ -1885,8 +1885,8 @@ public class ZKUtil {
byte[] peerStateData;
try {
peerStateData = ZKUtil.getData(zkw, peerStateZnode);
ZooKeeperProtos.ReplicationState.Builder builder =
ZooKeeperProtos.ReplicationState.newBuilder();
ReplicationProtos.ReplicationState.Builder builder =
ReplicationProtos.ReplicationState.newBuilder();
ProtobufUtil.mergeFrom(builder, peerStateData, pblen, peerStateData.length - pblen);
sb.append(builder.getState().name());
} catch (IOException ipbe) {
@@ -2054,7 +2054,7 @@ public class ZKUtil {
* for use as content of an wal position in a replication queue.
*/
public static byte[] positionToByteArray(final long position) {
byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
byte[] bytes = ReplicationProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
.build().toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
@@ -2070,9 +2070,9 @@ public class ZKUtil {
}
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
ZooKeeperProtos.ReplicationHLogPosition position;
ReplicationProtos.ReplicationHLogPosition.Builder builder =
ReplicationProtos.ReplicationHLogPosition.newBuilder();
ReplicationProtos.ReplicationHLogPosition position;
try {
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
position = builder.build();
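
The WAL-position znode keeps the same wire content; only the generated class moves. A small round-trip sketch (illustrative; the parse is repeated inline to mirror the ZKUtil code above, and the sketch class name is made up):

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;

public class WalPositionSketch {
  public static void main(String[] args) throws Exception {
    // PB magic prefix + ReplicationHLogPosition, exactly what positionToByteArray writes.
    byte[] bytes = ZKUtil.positionToByteArray(1024L);

    // Inline parse, mirroring the ZKUtil hunk above.
    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
      int pblen = ProtobufUtil.lengthOfPBMagic();
      ReplicationProtos.ReplicationHLogPosition.Builder builder =
          ReplicationProtos.ReplicationHLogPosition.newBuilder();
      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
      System.out.println(builder.build().getPosition());   // 1024
    }
  }
}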

View File

@@ -24,7 +24,45 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "ZooKeeper.proto";
import "HBase.proto";
message TableCF {
optional TableName table_name = 1;
repeated bytes families = 2;
}
/**
* Used by replication. Holds a replication peer key.
*/
message ReplicationPeer {
// clusterkey is the concatenation of the slave cluster's
// hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
required string clusterkey = 1;
optional string replicationEndpointImpl = 2;
repeated BytesBytesPair data = 3;
repeated NameStringPair configuration = 4;
repeated TableCF table_cfs = 5;
repeated bytes namespaces = 6;
optional int64 bandwidth = 7;
}
/**
* Used by replication. Holds whether enabled or disabled
*/
message ReplicationState {
enum State {
ENABLED = 0;
DISABLED = 1;
}
required State state = 1;
}
/**
* Used by replication. Holds the current position in an WAL file.
*/
message ReplicationHLogPosition {
required int64 position = 1;
}
message AddReplicationPeerRequest {
required string peer_id = 1;
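
Protobuf wire format depends only on field numbers and types, so moving these messages out of ZooKeeper.proto with their fields unchanged leaves existing znode payloads readable; only the generated Java class names change. A sketch of building the relocated message directly (illustrative; the cluster key value and sketch class name are made up):

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

public class ReplicationPeerMessageSketch {
  public static void main(String[] args) {
    // Same field numbers as the old ZooKeeperProtos.ReplicationPeer, so bytes written
    // before this change still parse with the new class.
    ReplicationProtos.ReplicationPeer peer = ReplicationProtos.ReplicationPeer.newBuilder()
        .setClusterkey("zk1,zk2,zk3:2181:/hbase-slave")   // required field 1
        .build();
    byte[] znodeContent = ProtobufUtil.prependPBMagic(peer.toByteArray());
    System.out.println(znodeContent.length + " bytes, clusterkey=" + peer.getClusterkey());
  }
}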

View File

@@ -105,44 +105,6 @@ message DeprecatedTableState {
required State state = 1 [default = ENABLED];
}
message TableCF {
optional TableName table_name = 1;
repeated bytes families = 2;
}
/**
* Used by replication. Holds a replication peer key.
*/
message ReplicationPeer {
// clusterkey is the concatenation of the slave cluster's
// hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
required string clusterkey = 1;
optional string replicationEndpointImpl = 2;
repeated BytesBytesPair data = 3;
repeated NameStringPair configuration = 4;
repeated TableCF table_cfs = 5;
repeated bytes namespaces = 6;
optional int64 bandwidth = 7;
}
/**
* Used by replication. Holds whether enabled or disabled
*/
message ReplicationState {
enum State {
ENABLED = 0;
DISABLED = 1;
}
required State state = 1;
}
/**
* Used by replication. Holds the current position in an WAL file.
*/
message ReplicationHLogPosition {
required int64 position = 1;
}
/**
* Metadata associated with a table lock in zookeeper
*/

View File

@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -78,7 +78,7 @@ public class TableCFsUpdater extends ReplicationStateZKBase {
if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().size() == 0) {
// we copy TableCFs node into PeerNode
LOG.info("copy tableCFs into peerNode:" + peerId);
ZooKeeperProtos.TableCF[] tableCFs =
ReplicationProtos.TableCF[] tableCFs =
ReplicationSerDeHelper.parseTableCFs(
ZKUtil.getData(this.zookeeper, tableCFsNode));
if (tableCFs != null && tableCFs.length > 0) {
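
parseTableCFs is the backward-compatibility entry point the updater leans on: the old per-peer tableCFs znode stored a plain string with no PB_MAGIC header, and the method accepts that as well as the PB form. A minimal sketch of the legacy path (illustrative; the sample data and sketch class name are made up):

import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.util.Bytes;

public class TableCFsUpgradeSketch {
  public static void main(String[] args) throws Exception {
    // Old-format znode data: a raw string, no PB_MAGIC header.
    byte[] legacy = Bytes.toBytes("ns1.table1:cf1,cf2;table3");
    ReplicationProtos.TableCF[] tableCFs = ReplicationSerDeHelper.parseTableCFs(legacy);
    // The updater copies entries like these into the peer znode's ReplicationPeer message.
    System.out.println(tableCFs.length + " table(s) parsed from legacy data");   // 2
  }
}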

View File

@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -271,7 +271,7 @@ public class TestPerTableCFReplication {
@Test
public void testTableCFsHelperConverter() {
ZooKeeperProtos.TableCF[] tableCFs = null;
ReplicationProtos.TableCF[] tableCFs = null;
Map<TableName, List<String>> tabCFsMap = null;
// 1. null or empty string, result should be null
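
The converter test exercises the String to TableCF[] to Map directions of ReplicationSerDeHelper shown earlier; only the array's element type changes here. A short sketch of that round trip (illustrative, not the actual test body; the sketch class name is made up):

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

public class TableCFsRoundTripSketch {
  public static void main(String[] args) {
    // String -> TableCF[] -> Map, then back to the canonical string form.
    ReplicationProtos.TableCF[] tableCFs =
        ReplicationSerDeHelper.convert("ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3");
    Map<TableName, List<String>> map = ReplicationSerDeHelper.convert2Map(tableCFs);
    System.out.println(map.size());                               // 3
    System.out.println(ReplicationSerDeHelper.convert(tableCFs)); // canonical string form
  }
}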