diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index a1e7d6cc3e5..80b45144c86 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -325,9 +325,9 @@ public final class ReplicationPeerConfigUtil {
   public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
     ReplicationProtos.ReplicationPeer.Builder builder =
       ReplicationProtos.ReplicationPeer.newBuilder();
-    if (peerConfig.getClusterKey() != null) {
-      builder.setClusterkey(peerConfig.getClusterKey());
-    }
+    // we used to set cluster key as required so here we must always set it, until we can make sure
+    // that no one uses the old proto file.
+    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
     if (peerConfig.getReplicationEndpointImpl() != null) {
       builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
     }
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 557b87cf74e..1d483ce3345 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -38,7 +38,7 @@ message TableCF {
 message ReplicationPeer {
   // clusterkey is the concatenation of the slave cluster's
   // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-  required string clusterkey = 1;
+  optional string clusterkey = 1;
   optional string replicationEndpointImpl = 2;
   repeated BytesBytesPair data = 3;
   repeated NameStringPair configuration = 4;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 87d0111b7c9..af00aaed9cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -242,7 +244,27 @@ public class ReplicationPeerManager {
   }
 
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
-    checkClusterKey(peerConfig.getClusterKey());
+    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+    boolean checkClusterKey = true;
+    if (!StringUtils.isBlank(replicationEndpointImpl)) {
+      // try creating an instance
+      ReplicationEndpoint endpoint;
+      try {
+        endpoint = Class.forName(replicationEndpointImpl)
+          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        throw new DoNotRetryIOException(
+          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
+          e);
+      }
+      // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
+      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
+        checkClusterKey = false;
+      }
+    }
+    if (checkClusterKey) {
+      checkClusterKey(peerConfig.getClusterKey());
+    }
 
     if (peerConfig.replicateAllUserTables()) {
       // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index b5a50c0b4dc..e33fd21fef7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -32,16 +35,25 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.After;
@@ -56,12 +68,12 @@ import org.junit.runners.Parameterized;
  * Class to test asynchronous replication admin operations.
  */
 @RunWith(Parameterized.class)
-@Category({LargeTests.class, ClientTests.class})
+@Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
+    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
 
   private final String ID_ONE = "1";
   private final String KEY_ONE = "127.0.0.1:2181:/hbase";
@@ -89,7 +101,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     } catch (Exception e) {
     }
     ReplicationQueueStorage queueStorage = ReplicationStorageFactory
-        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
+      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
     for (ServerName serverName : queueStorage.getListOfReplicators()) {
       for (String queue : queueStorage.getAllQueues(serverName)) {
         queueStorage.removeQueue(serverName, queue);
@@ -186,8 +198,8 @@
     // append table t1 to replication
     tableCFs.put(tableName1, null);
     admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
-    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
-        .getTableCFsMap();
+    Map<TableName, List<String>> result =
+      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
     assertEquals(1, result.size());
     assertEquals(true, result.containsKey(tableName1));
     assertNull(result.get(tableName1));
@@ -301,12 +313,13 @@
       tableCFs.clear();
       tableCFs.put(tableName3, null);
       admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
-      fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3");
+      fail("Test case should fail as removing table-cfs from a peer whose" +
+        " table-cfs didn't contain t3");
     } catch (CompletionException e) {
       assertTrue(e.getCause() instanceof ReplicationException);
     }
-    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
-        .getTableCFsMap();
+    Map<TableName, List<String>> result =
+      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
     assertEquals(2, result.size());
     assertTrue("Should contain t1", result.containsKey(tableName1));
     assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -414,7 +427,8 @@
     rpc.setTableCFsMap(tableCfs);
     try {
       admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
-      fail("Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
+      fail(
+        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
     } catch (CompletionException e) {
       // OK
     }
@@ -430,7 +444,8 @@
     rpc.setNamespaces(namespaces);
     try {
       admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
-      fail("Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
+      fail(
+        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
     } catch (CompletionException e) {
       // OK
     }
@@ -453,4 +468,78 @@
 
     admin.removeReplicationPeer(ID_ONE).join();
   }
+
+  @Test
+  public void testInvalidClusterKey() throws InterruptedException {
+    try {
+      admin.addReplicationPeer(ID_ONE,
+        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+    }
+  }
+
+  @Test
+  public void testInvalidReplicationEndpoint() throws InterruptedException {
+    try {
+      admin.addReplicationPeer(ID_ONE,
+        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
+    }
+  }
+
+  public static final class DummyReplicationEndpoint extends BaseReplicationEndpoint {
+
+    @Override
+    public UUID getPeerUUID() {
+      return ctx.getClusterId();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      return true;
+    }
+
+    @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  @Test
+  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
+    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
+    admin.addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder()
+      .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build()).get();
+
+    // but we still need to check cluster key if we specify the default ReplicationEndpoint
+    try {
+      admin
+        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
+          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
+        .get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+    }
+  }
 }
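
Usage sketch (reviewer note, not part of the patch): after this change, ReplicationPeerManager#checkPeerConfig validates the cluster key only when the peer uses the default HBaseInterClusterReplicationEndpoint, either explicitly or by leaving replicationEndpointImpl blank. A minimal client-side illustration, assuming an AsyncAdmin handle named admin and a hypothetical endpoint class org.example.MyEndpoint standing in for any ReplicationEndpoint subclass other than HBaseInterClusterReplicationEndpoint:

  // Custom endpoint: the peer can now be added without a cluster key. On the
  // wire the key is serialized as "" because readers of the old proto file
  // still treat the clusterkey field as required.
  admin.addReplicationPeer("peer_custom", ReplicationPeerConfig.newBuilder()
    .setReplicationEndpointImpl("org.example.MyEndpoint").build()).get();

  // Default inter-cluster replication: a well-formed cluster key
  // (quorum:clientPort:/znode) is still mandatory; an invalid one fails fast
  // with a DoNotRetryIOException wrapped in the future's ExecutionException.
  admin.addReplicationPeer("peer_slave", ReplicationPeerConfig.newBuilder()
    .setClusterKey("127.0.0.1:2181:/hbase").build()).get();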