HBASE-19783 Change replication peer cluster key/endpoint from a not-null value to null is not allowed

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2018-01-12 16:10:41 +08:00 committed by zhangduo
parent 4bd6ac3e10
commit 20ccaef841
2 changed files with 73 additions and 9 deletions

View File

@ -132,20 +132,19 @@ public class ReplicationPeerManager {
checkPeerConfig(peerConfig); checkPeerConfig(peerConfig);
ReplicationPeerDescription desc = checkPeerExists(peerId); ReplicationPeerDescription desc = checkPeerExists(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
if (!StringUtils.isBlank(peerConfig.getClusterKey()) && if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
!peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) {
throw new DoNotRetryIOException( throw new DoNotRetryIOException(
"Changing the cluster key on an existing peer is not allowed. Existing key '" + "Changing the cluster key on an existing peer is not allowed. Existing key '" +
oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
peerConfig.getClusterKey() + "'"); peerConfig.getClusterKey() + "'");
} }
if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && if (!isStringEquals(peerConfig.getReplicationEndpointImpl(),
!peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { oldPeerConfig.getReplicationEndpointImpl())) {
throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
"on an existing peer is not allowed. Existing class '" + "on an existing peer is not allowed. Existing class '" +
oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
" does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
} }
} }
@ -341,4 +340,15 @@ public class ReplicationPeerManager {
return new ReplicationPeerManager(peerStorage, return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
} }
/**
* For replication peer cluster key or endpoint class, null and empty string is same. So here
* don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
*/
private boolean isStringEquals(String s1, String s2) {
if (StringUtils.isBlank(s1)) {
return StringUtils.isBlank(s2);
}
return s1.equals(s2);
}
} }

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After; import org.junit.After;
@ -846,4 +848,56 @@ public class TestReplicationAdmin {
assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
admin.removePeer(ID_ONE); admin.removePeer(ID_ONE);
} }
@Test
public void testPeerClusterKey() throws Exception {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
try {
builder.setClusterKey(KEY_SECOND);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change cluster key on an existing peer is not allowed");
} catch (Exception e) {
// OK
}
}
@Test
public void testPeerReplicationEndpointImpl() throws Exception {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
try {
builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change replication endpoint implementation class on an existing peer is not allowed");
} catch (Exception e) {
// OK
}
try {
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change replication endpoint implementation class on an existing peer is not allowed");
} catch (Exception e) {
// OK
}
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
try {
builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
fail("Change replication endpoint implementation class on an existing peer is not allowed");
} catch (Exception e) {
// OK
}
}
} }