diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index b1c17136dc1..474ded3a372 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -319,6 +319,9 @@ public final class ReplicationPeerConfigUtil { excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); } + if (peer.hasRemoteWALDir()) { + builder.setRemoteWALDir(peer.getRemoteWALDir()); + } return builder.build(); } @@ -376,6 +379,9 @@ public final class ReplicationPeerConfigUtil { } } + if (peerConfig.getRemoteWALDir() != null) { + builder.setRemoteWALDir(peerConfig.getRemoteWALDir()); + } return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index e0d9a4c86b7..97abc74d1ac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -47,6 +47,8 @@ public class ReplicationPeerConfig { private Set excludeNamespaces = null; private long bandwidth = 0; private final boolean serial; + // Used by synchronous replication + private String remoteWALDir; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -66,6 +68,7 @@ public class ReplicationPeerConfig { : null; this.bandwidth = builder.bandwidth; this.serial = builder.serial; + this.remoteWALDir = builder.remoteWALDir; } private Map> @@ -213,6 +216,10 @@ public class ReplicationPeerConfig { return this; } + public String getRemoteWALDir() { + return this.remoteWALDir; + } + public static ReplicationPeerConfigBuilder newBuilder() { return new ReplicationPeerConfigBuilderImpl(); } @@ -230,7 +237,8 @@ public class ReplicationPeerConfig { .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) - .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()); + .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial()) + .setRemoteWALDir(peerConfig.getRemoteWALDir()); return builder; } @@ -259,6 +267,8 @@ public class ReplicationPeerConfig { private boolean serial = false; + private String remoteWALDir = null; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey; @@ -327,6 +337,11 @@ public class ReplicationPeerConfig { return this; } + public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) { + this.remoteWALDir = dir; + return this; + } + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data @@ -357,6 +372,9 @@ public class ReplicationPeerConfig { } builder.append("bandwidth=").append(bandwidth).append(","); builder.append("serial=").append(serial); + if (this.remoteWALDir != null) { + builder.append(",remoteWALDir=").append(remoteWALDir); + } return builder.toString(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 4c531c5ada9..58ff220e563 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -149,6 +149,13 @@ public interface ReplicationPeerConfigBuilder { */ ReplicationPeerConfigBuilder setSerial(boolean serial); + /** + * Set the remote peer cluster's wal directory. Used by synchronous replication. + * @param dir the remote peer cluster's wal directory + * @return {@code this} + */ + ReplicationPeerConfigBuilder setRemoteWALDir(String dir); + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 557b87cf74e..20dd0495bbc 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -49,6 +49,7 @@ message ReplicationPeer { repeated TableCF exclude_table_cfs = 9; repeated bytes exclude_namespaces = 10; optional bool serial = 11; + optional string remoteWALDir = 12; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 87d0111b7c9..05ecd61f5ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -149,6 +149,21 @@ public class ReplicationPeerManager { oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); } + + if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) { + throw new DoNotRetryIOException( + "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " + + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId + + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); + } + + if (oldPeerConfig.getRemoteWALDir() != null) { + if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) { + throw new DoNotRetryIOException( + "Changing the replicated namespace/table config on a synchronous replication " + + "peer(peerId: " + peerId + ") is not allowed."); + } + } return desc; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 685c5605dee..e47110032f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -906,4 +906,81 @@ public class TestReplicationAdmin { // OK } } + + @Test + public void testPeerRemoteWALDir() throws Exception { + String rootDir = "hdfs://srv1:9999/hbase"; + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertNull(rpc.getRemoteWALDir()); + + try { + builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change remote wal dir is not allowed"); + } catch (Exception e) { + // OK + } + + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + builder.setRemoteWALDir(rootDir); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + assertEquals(rootDir, rpc.getRemoteWALDir()); + + try { + builder.setRemoteWALDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change remote wal dir is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder.setRemoteWALDir(null); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change remote wal dir is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + builder.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Set namespaces = new HashSet<>(); + namespaces.add("ns1"); + builder.setExcludeNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Map> tableCfs = new HashMap<>(); + tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>()); + builder.setExcludeTableCFsMap(tableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 5b875957c1f..d1f1344d8a3 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -64,16 +64,20 @@ module Hbase table_cfs = args.fetch(TABLE_CFS, nil) namespaces = args.fetch(NAMESPACES, nil) peer_state = args.fetch(STATE, nil) + remote_wal_dir = args.fetch(REMOTE_WAL_DIR, nil) # Create and populate a ReplicationPeerConfig - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder() + builder = ReplicationPeerConfig.newBuilder() builder.set_cluster_key(cluster_key) unless endpoint_classname.nil? builder.set_replication_endpoint_impl(endpoint_classname) end + unless remote_wal_dir.nil? + builder.setRemoteWALDir(remote_wal_dir) + end + unless config.nil? builder.putAllConfiguration(config) end @@ -228,8 +232,7 @@ module Hbase namespaces.each do |n| ns_set.add(n) end - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder(rpc) + builder = ReplicationPeerConfig.newBuilder(rpc) builder.setNamespaces(ns_set) @admin.updateReplicationPeerConfig(id, builder.build) end @@ -248,8 +251,7 @@ module Hbase ns_set.remove(n) end end - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder(rpc) + builder = ReplicationPeerConfig.newBuilder(rpc) builder.setNamespaces(ns_set) @admin.updateReplicationPeerConfig(id, builder.build) end diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb index 28484cbc57e..2870dfb027d 100644 --- a/hbase-shell/src/main/ruby/hbase_constants.rb +++ b/hbase-shell/src/main/ruby/hbase_constants.rb @@ -77,6 +77,7 @@ module HBaseConstants VALUE = 'VALUE'.freeze ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'.freeze CLUSTER_KEY = 'CLUSTER_KEY'.freeze + REMOTE_WAL_DIR = 'REMOTE_WAL_DIR'.freeze TABLE_CFS = 'TABLE_CFS'.freeze NAMESPACES = 'NAMESPACES'.freeze STATE = 'STATE'.freeze diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index eb2da830277..4b6f294767a 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -35,7 +35,7 @@ to the peer cluster. An optional parameter for table column families identifies which tables and/or column families will be replicated to the peer cluster. -Notice: Set a namespace in the peer config means that all tables in this namespace +Note: Set a namespace in the peer config means that all tables in this namespace will be replicated to the peer cluster. So if you already have set a namespace in peer config, then you can't set this namespace's tables in the peer config again. @@ -74,6 +74,25 @@ the key TABLE_CFS. Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified. If ENDPOINT_CLASSNAME is specified, CLUSTER_KEY is optional and should only be specified if a particular custom endpoint requires it. +The default replication peer is asynchronous. You can also add a synchronous replication peer +with REMOTE_WAL_DIR parameter. Meanwhile, synchronous replication peer also support other optional +config for asynchronous replication peer. + +Examples: + + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + STATE => "ENABLED", REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + STATE => "DISABLED", REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase", NAMESPACES => ["ns1", "ns2"] + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_WAL_DIR => "hdfs://srv1:9999/hbase", TABLE_CFS => { "table1" => [] } + +Note: The REMOTE_WAL_DIR is not allowed to change. + EOF end diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index eefcc426a39..f3ab7496a56 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -39,7 +39,8 @@ EOF peers = replication_admin.list_peers formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME - STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH + REMOTE_ROOT_DIR STATE REPLICATE_ALL + NAMESPACES TABLE_CFS BANDWIDTH SERIAL]) peers.each do |peer| @@ -53,8 +54,20 @@ EOF namespaces = replication_admin.show_peer_namespaces(config) tableCFs = replication_admin.show_peer_tableCFs_by_config(config) end - formatter.row([id, config.getClusterKey, - config.getReplicationEndpointImpl, state, + cluster_key = 'nil' + unless config.getClusterKey.nil? + cluster_key = config.getClusterKey + end + endpoint_classname = 'nil' + unless config.getReplicationEndpointImpl.nil? + endpoint_classname = config.getReplicationEndpointImpl + end + remote_root_dir = 'nil' + unless config.getRemoteWALDir.nil? + remote_root_dir = config.getRemoteWALDir + end + formatter.row([id, cluster_key, endpoint_classname, + remote_root_dir, state, config.replicateAllUserTables, namespaces, tableCFs, config.getBandwidth, config.isSerial]) end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 29de710b588..5d04fbba878 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -97,6 +97,22 @@ module Hbase command(:remove_peer, @peer_id) end + define_test "add_peer: remote wal dir" do + cluster_key = "server1.cie.com:2181:/hbase" + remote_wal_dir = "hdfs://srv1:9999/hbase" + args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + assert_equal(remote_wal_dir, peer.getPeerConfig.getRemoteWALDir) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + define_test "add_peer: single zk cluster key with enabled/disabled state" do cluster_key = "server1.cie.com:2181:/hbase"