From 72702eeb70a4938f9ea2425a879b67e4959bf250 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Fri, 12 Jan 2018 16:18:42 +0800 Subject: [PATCH] HBASE-19783 Change replication peer cluster key/endpoint from a not-null value to null is not allowed Signed-off-by: zhangduo --- .../replication/ReplicationPeersZKImpl.java | 36 ++++++++----- .../replication/TestReplicationAdmin.java | 54 +++++++++++++++++++ 2 files changed, 77 insertions(+), 13 deletions(-) diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 289d2aa954e..4e469edf818 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -28,6 +28,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CompoundConfiguration; @@ -345,22 +346,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re throw new ReplicationException("Could not find peer Id " + id + " in connected peers"); } ReplicationPeerConfig existingConfig = peer.getPeerConfig(); - if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && - !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){ - throw new ReplicationException("Changing the cluster key on an existing peer is not allowed." - + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '" - + newConfig.getClusterKey() + - "'"); + if (!isStringEquals(newConfig.getClusterKey(), existingConfig.getClusterKey())) { + throw new ReplicationException( + "Changing the cluster key on an existing peer is not allowed." + " Existing key '" + + existingConfig.getClusterKey() + "' does not match new key '" + + newConfig.getClusterKey() + "'"); } - String existingEndpointImpl = existingConfig.getReplicationEndpointImpl(); - if (newConfig.getReplicationEndpointImpl() != null && - !newConfig.getReplicationEndpointImpl().isEmpty() && - !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){ + if (!isStringEquals(newConfig.getReplicationEndpointImpl(), + existingConfig.getReplicationEndpointImpl())) { throw new ReplicationException("Changing the replication endpoint implementation class " + - "on an existing peer is not allowed. Existing class '" - + existingConfig.getReplicationEndpointImpl() - + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'"); + "on an existing peer is not allowed. Existing class '" + + existingConfig.getReplicationEndpointImpl() + "' does not match new class '" + + newConfig.getReplicationEndpointImpl() + "'"); } + // Update existingConfig's peer config and peer data with the new values, but don't touch config // or data that weren't explicitly changed ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig); @@ -546,4 +545,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re throw new ReplicationException("Could not check queues deleted with id=" + peerId, e); } } + + /** + * For replication peer cluster key or endpoint class, null and empty string is same. So here + * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly. + */ + private boolean isStringEquals(String s1, String s2) { + if (StringUtils.isBlank(s1)) { + return StringUtils.isBlank(s2); + } + return s1.equals(s2); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index fb29e9e1cb4..aac57dc5484 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; +import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -764,4 +766,56 @@ public class TestReplicationAdmin { assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); admin.removePeer(ID_ONE); } + + @Test + public void testPeerClusterKey() throws Exception { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + try { + builder.setClusterKey(KEY_SECOND); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change cluster key on an existing peer is not allowed"); + } catch (Exception e) { + // OK + } + } + + @Test + public void testPeerReplicationEndpointImpl() throws Exception { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + try { + builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change replication endpoint implementation class on an existing peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change replication endpoint implementation class on an existing peer is not allowed"); + } catch (Exception e) { + // OK + } + + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + + try { + builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change replication endpoint implementation class on an existing peer is not allowed"); + } catch (Exception e) { + // OK + } + } }