HBASE-19783 Change replication peer cluster key/endpoint from a not-null value to null is not allowed

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2018-01-12 16:18:42 +08:00 committed by zhangduo
parent 3e6f80dcd5
commit 72702eeb70
2 changed files with 77 additions and 13 deletions

View File

@ -28,6 +28,7 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.CompoundConfiguration;
@ -345,22 +346,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
throw new ReplicationException("Could not find peer Id " + id + " in connected peers"); throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
} }
ReplicationPeerConfig existingConfig = peer.getPeerConfig(); ReplicationPeerConfig existingConfig = peer.getPeerConfig();
if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && if (!isStringEquals(newConfig.getClusterKey(), existingConfig.getClusterKey())) {
!newConfig.getClusterKey().equals(existingConfig.getClusterKey())){ throw new ReplicationException(
throw new ReplicationException("Changing the cluster key on an existing peer is not allowed." "Changing the cluster key on an existing peer is not allowed." + " Existing key '" +
+ " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '" existingConfig.getClusterKey() + "' does not match new key '" +
+ newConfig.getClusterKey() + newConfig.getClusterKey() + "'");
"'");
} }
String existingEndpointImpl = existingConfig.getReplicationEndpointImpl(); if (!isStringEquals(newConfig.getReplicationEndpointImpl(),
if (newConfig.getReplicationEndpointImpl() != null && existingConfig.getReplicationEndpointImpl())) {
!newConfig.getReplicationEndpointImpl().isEmpty() &&
!newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){
throw new ReplicationException("Changing the replication endpoint implementation class " + throw new ReplicationException("Changing the replication endpoint implementation class " +
"on an existing peer is not allowed. Existing class '" "on an existing peer is not allowed. Existing class '" +
+ existingConfig.getReplicationEndpointImpl() existingConfig.getReplicationEndpointImpl() + "' does not match new class '" +
+ "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'"); newConfig.getReplicationEndpointImpl() + "'");
} }
// Update existingConfig's peer config and peer data with the new values, but don't touch config // Update existingConfig's peer config and peer data with the new values, but don't touch config
// or data that weren't explicitly changed // or data that weren't explicitly changed
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig); ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig);
@ -546,4 +545,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
throw new ReplicationException("Could not check queues deleted with id=" + peerId, e); throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
} }
} }
/**
* For replication peer cluster key or endpoint class, null and empty string is same. So here
* don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
*/
private boolean isStringEquals(String s1, String s2) {
if (StringUtils.isBlank(s1)) {
return StringUtils.isBlank(s2);
}
return s1.equals(s2);
}
} }

View File

@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -764,4 +766,56 @@ public class TestReplicationAdmin {
assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
admin.removePeer(ID_ONE); admin.removePeer(ID_ONE);
} }
@Test
public void testPeerClusterKey() throws Exception {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
try {
builder.setClusterKey(KEY_SECOND);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change cluster key on an existing peer is not allowed");
} catch (Exception e) {
// OK
}
}
@Test
public void testPeerReplicationEndpointImpl() throws Exception {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
try {
builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change replication endpoint implementation class on an existing peer is not allowed");
} catch (Exception e) {
// OK
}
try {
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change replication endpoint implementation class on an existing peer is not allowed");
} catch (Exception e) {
// OK
}
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
try {
builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail("Change replication endpoint implementation class on an existing peer is not allowed");
} catch (Exception e) {
// OK
}
}
} }