HBASE-19078 Add a remote peer cluster wal directory config for synchronous replication

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2018-01-13 18:55:28 +08:00 committed by zhangduo
parent b3dea0378e
commit b4a1dbf768
11 changed files with 186 additions and 11 deletions

View File

@ -319,6 +319,9 @@ public final class ReplicationPeerConfigUtil {
excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
}
if (peer.hasRemoteWALDir()) {
builder.setRemoteWALDir(peer.getRemoteWALDir());
}
return builder.build();
}
@ -376,6 +379,9 @@ public final class ReplicationPeerConfigUtil {
}
}
if (peerConfig.getRemoteWALDir() != null) {
builder.setRemoteWALDir(peerConfig.getRemoteWALDir());
}
return builder.build();
}

View File

@ -47,6 +47,8 @@ public class ReplicationPeerConfig {
private Set<String> excludeNamespaces = null;
private long bandwidth = 0;
private final boolean serial;
// Used by synchronous replication
private String remoteWALDir;
private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.clusterKey = builder.clusterKey;
@ -66,6 +68,7 @@ public class ReplicationPeerConfig {
: null;
this.bandwidth = builder.bandwidth;
this.serial = builder.serial;
this.remoteWALDir = builder.remoteWALDir;
}
private Map<TableName, List<String>>
@ -213,6 +216,10 @@ public class ReplicationPeerConfig {
return this;
}
public String getRemoteWALDir() {
return this.remoteWALDir;
}
public static ReplicationPeerConfigBuilder newBuilder() {
return new ReplicationPeerConfigBuilderImpl();
}
@ -230,7 +237,8 @@ public class ReplicationPeerConfig {
.setReplicateAllUserTables(peerConfig.replicateAllUserTables())
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
.setExcludeNamespaces(peerConfig.getExcludeNamespaces())
.setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial());
.setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial())
.setRemoteWALDir(peerConfig.getRemoteWALDir());
return builder;
}
@ -259,6 +267,8 @@ public class ReplicationPeerConfig {
private boolean serial = false;
private String remoteWALDir = null;
@Override
public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
this.clusterKey = clusterKey;
@ -327,6 +337,11 @@ public class ReplicationPeerConfig {
return this;
}
public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) {
this.remoteWALDir = dir;
return this;
}
@Override
public ReplicationPeerConfig build() {
// It would be nice to validate the configuration, but we have to work with "old" data
@ -357,6 +372,9 @@ public class ReplicationPeerConfig {
}
builder.append("bandwidth=").append(bandwidth).append(",");
builder.append("serial=").append(serial);
if (this.remoteWALDir != null) {
builder.append(",remoteWALDir=").append(remoteWALDir);
}
return builder.toString();
}

View File

@ -149,6 +149,13 @@ public interface ReplicationPeerConfigBuilder {
*/
ReplicationPeerConfigBuilder setSerial(boolean serial);
/**
* Set the remote peer cluster's wal directory. Used by synchronous replication.
* @param dir the remote peer cluster's wal directory
* @return {@code this}
*/
ReplicationPeerConfigBuilder setRemoteWALDir(String dir);
/**
* Builds the configuration object from the current state of {@code this}.
* @return A {@link ReplicationPeerConfig} instance.

View File

@ -49,6 +49,7 @@ message ReplicationPeer {
repeated TableCF exclude_table_cfs = 9;
repeated bytes exclude_namespaces = 10;
optional bool serial = 11;
optional string remoteWALDir = 12;
}
/**

View File

@ -149,6 +149,21 @@ public class ReplicationPeerManager {
oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
" does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
}
if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
throw new DoNotRetryIOException(
"Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " +
"dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId +
" does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
}
if (oldPeerConfig.getRemoteWALDir() != null) {
if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
throw new DoNotRetryIOException(
"Changing the replicated namespace/table config on a synchronous replication " +
"peer(peerId: " + peerId + ") is not allowed.");
}
}
return desc;
}

View File

@ -906,4 +906,81 @@ public class TestReplicationAdmin {
// OK
}
}
@Test
public void testPeerRemoteWALDir() throws Exception {
String rootDir = "hdfs://srv1:9999/hbase";
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
assertNull(rpc.getRemoteWALDir());
try {
builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change remote wal dir is not allowed");
} catch (Exception e) {
// OK
}
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
builder.setRemoteWALDir(rootDir);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
assertEquals(rootDir, rpc.getRemoteWALDir());
try {
builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail("Change remote wal dir is not allowed");
} catch (Exception e) {
// OK
}
try {
builder.setRemoteWALDir(null);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail("Change remote wal dir is not allowed");
} catch (Exception e) {
// OK
}
try {
builder = ReplicationPeerConfig.newBuilder(rpc);
builder.setReplicateAllUserTables(false);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail(
"Change replicated namespace/table config on an existing synchronous peer is not allowed");
} catch (Exception e) {
// OK
}
try {
builder = ReplicationPeerConfig.newBuilder(rpc);
Set<String> namespaces = new HashSet<>();
namespaces.add("ns1");
builder.setExcludeNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail(
"Change replicated namespace/table config on an existing synchronous peer is not allowed");
} catch (Exception e) {
// OK
}
try {
builder = ReplicationPeerConfig.newBuilder(rpc);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>());
builder.setExcludeTableCFsMap(tableCfs);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail(
"Change replicated namespace/table config on an existing synchronous peer is not allowed");
} catch (Exception e) {
// OK
}
}
}

View File

@ -64,16 +64,20 @@ module Hbase
table_cfs = args.fetch(TABLE_CFS, nil)
namespaces = args.fetch(NAMESPACES, nil)
peer_state = args.fetch(STATE, nil)
remote_wal_dir = args.fetch(REMOTE_WAL_DIR, nil)
# Create and populate a ReplicationPeerConfig
builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig
.newBuilder()
builder = ReplicationPeerConfig.newBuilder()
builder.set_cluster_key(cluster_key)
unless endpoint_classname.nil?
builder.set_replication_endpoint_impl(endpoint_classname)
end
unless remote_wal_dir.nil?
builder.setRemoteWALDir(remote_wal_dir)
end
unless config.nil?
builder.putAllConfiguration(config)
end
@ -228,8 +232,7 @@ module Hbase
namespaces.each do |n|
ns_set.add(n)
end
builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig
.newBuilder(rpc)
builder = ReplicationPeerConfig.newBuilder(rpc)
builder.setNamespaces(ns_set)
@admin.updateReplicationPeerConfig(id, builder.build)
end
@ -248,8 +251,7 @@ module Hbase
ns_set.remove(n)
end
end
builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig
.newBuilder(rpc)
builder = ReplicationPeerConfig.newBuilder(rpc)
builder.setNamespaces(ns_set)
@admin.updateReplicationPeerConfig(id, builder.build)
end

View File

@ -77,6 +77,7 @@ module HBaseConstants
VALUE = 'VALUE'.freeze
ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'.freeze
CLUSTER_KEY = 'CLUSTER_KEY'.freeze
REMOTE_WAL_DIR = 'REMOTE_WAL_DIR'.freeze
TABLE_CFS = 'TABLE_CFS'.freeze
NAMESPACES = 'NAMESPACES'.freeze
STATE = 'STATE'.freeze

View File

@ -35,7 +35,7 @@ to the peer cluster.
An optional parameter for table column families identifies which tables and/or column families
will be replicated to the peer cluster.
Notice: Set a namespace in the peer config means that all tables in this namespace
Note: Set a namespace in the peer config means that all tables in this namespace
will be replicated to the peer cluster. So if you already have set a namespace in peer config,
then you can't set this namespace's tables in the peer config again.
@ -74,6 +74,25 @@ the key TABLE_CFS.
Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified. If ENDPOINT_CLASSNAME is specified, CLUSTER_KEY is
optional and should only be specified if a particular custom endpoint requires it.
The default replication peer is asynchronous. You can also add a synchronous replication peer
with REMOTE_WAL_DIR parameter. Meanwhile, synchronous replication peer also support other optional
config for asynchronous replication peer.
Examples:
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase",
REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase"
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase",
STATE => "ENABLED", REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase"
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase",
STATE => "DISABLED", REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase"
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase",
REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase", NAMESPACES => ["ns1", "ns2"]
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase",
REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase", TABLE_CFS => { "table1" => [] }
Note: The REMOTE_WAL_DIR is not allowed to change.
EOF
end

View File

@ -39,7 +39,8 @@ EOF
peers = replication_admin.list_peers
formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME
STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH
REMOTE_ROOT_DIR STATE REPLICATE_ALL
NAMESPACES TABLE_CFS BANDWIDTH
SERIAL])
peers.each do |peer|
@ -53,8 +54,20 @@ EOF
namespaces = replication_admin.show_peer_namespaces(config)
tableCFs = replication_admin.show_peer_tableCFs_by_config(config)
end
formatter.row([id, config.getClusterKey,
config.getReplicationEndpointImpl, state,
cluster_key = 'nil'
unless config.getClusterKey.nil?
cluster_key = config.getClusterKey
end
endpoint_classname = 'nil'
unless config.getReplicationEndpointImpl.nil?
endpoint_classname = config.getReplicationEndpointImpl
end
remote_root_dir = 'nil'
unless config.getRemoteWALDir.nil?
remote_root_dir = config.getRemoteWALDir
end
formatter.row([id, cluster_key, endpoint_classname,
remote_root_dir, state,
config.replicateAllUserTables, namespaces, tableCFs,
config.getBandwidth, config.isSerial])
end

View File

@ -97,6 +97,22 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test "add_peer: remote wal dir" do
cluster_key = "server1.cie.com:2181:/hbase"
remote_wal_dir = "hdfs://srv1:9999/hbase"
args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir }
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0)
assert_equal(@peer_id, peer.getPeerId)
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
assert_equal(remote_wal_dir, peer.getPeerConfig.getRemoteWALDir)
# cleanup for future tests
command(:remove_peer, @peer_id)
end
define_test "add_peer: single zk cluster key with enabled/disabled state" do
cluster_key = "server1.cie.com:2181:/hbase"