HBASE-21857 Do not need to check clusterKey if replicationEndpoint is provided when adding a peer

Signed-off-by: Xu Cang <xucang@apache.org>
This commit is contained in:
zhangduo 2019-02-08 09:56:52 +08:00
parent a155d17fa6
commit 6f16836c20
4 changed files with 91 additions and 15 deletions

View File

@ -329,9 +329,9 @@ public final class ReplicationPeerConfigUtil {
public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.Builder builder =
ReplicationProtos.ReplicationPeer.newBuilder(); ReplicationProtos.ReplicationPeer.newBuilder();
if (peerConfig.getClusterKey() != null) { // we used to set cluster key as required so here we must always set it, until we can make sure
builder.setClusterkey(peerConfig.getClusterKey()); // that no one uses the old proto file.
} builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
if (peerConfig.getReplicationEndpointImpl() != null) { if (peerConfig.getReplicationEndpointImpl() != null) {
builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
} }

View File

@ -38,7 +38,7 @@ message TableCF {
message ReplicationPeer { message ReplicationPeer {
// clusterkey is the concatenation of the slave cluster's // clusterkey is the concatenation of the slave cluster's
// hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
required string clusterkey = 1; optional string clusterkey = 1;
optional string replicationEndpointImpl = 2; optional string replicationEndpointImpl = 2;
repeated BytesBytesPair data = 3; repeated BytesBytesPair data = 3;
repeated NameStringPair configuration = 4; repeated NameStringPair configuration = 4;

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -334,7 +336,27 @@ public class ReplicationPeerManager {
} }
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
checkClusterKey(peerConfig.getClusterKey()); String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
boolean checkClusterKey = true;
if (!StringUtils.isBlank(replicationEndpointImpl)) {
// try creating a instance
ReplicationEndpoint endpoint;
try {
endpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new DoNotRetryIOException(
"Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
e);
}
// do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
checkClusterKey = false;
}
}
if (checkClusterKey) {
checkClusterKey(peerConfig.getClusterKey());
}
if (peerConfig.replicateAllUserTables()) { if (peerConfig.replicateAllUserTables()) {
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster. // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -33,6 +36,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
@ -42,6 +47,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After; import org.junit.After;
@ -56,12 +63,12 @@ import org.junit.runners.Parameterized;
* Class to test asynchronous replication admin operations. * Class to test asynchronous replication admin operations.
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class}) @Category({ LargeTests.class, ClientTests.class })
public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
private final String ID_ONE = "1"; private final String ID_ONE = "1";
private final String KEY_ONE = "127.0.0.1:2181:/hbase"; private final String KEY_ONE = "127.0.0.1:2181:/hbase";
@ -89,7 +96,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
} catch (Exception e) { } catch (Exception e) {
} }
ReplicationQueueStorage queueStorage = ReplicationStorageFactory ReplicationQueueStorage queueStorage = ReplicationStorageFactory
.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
for (ServerName serverName : queueStorage.getListOfReplicators()) { for (ServerName serverName : queueStorage.getListOfReplicators()) {
for (String queue : queueStorage.getAllQueues(serverName)) { for (String queue : queueStorage.getAllQueues(serverName)) {
queueStorage.removeQueue(serverName, queue); queueStorage.removeQueue(serverName, queue);
@ -186,8 +193,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
// append table t1 to replication // append table t1 to replication
tableCFs.put(tableName1, null); tableCFs.put(tableName1, null);
admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get() Map<TableName, List<String>> result =
.getTableCFsMap(); admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
assertEquals(1, result.size()); assertEquals(1, result.size());
assertEquals(true, result.containsKey(tableName1)); assertEquals(true, result.containsKey(tableName1));
assertNull(result.get(tableName1)); assertNull(result.get(tableName1));
@ -301,12 +308,13 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
tableCFs.clear(); tableCFs.clear();
tableCFs.put(tableName3, null); tableCFs.put(tableName3, null);
admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3"); fail("Test case should fail as removing table-cfs from a peer whose" +
" table-cfs didn't contain t3");
} catch (CompletionException e) { } catch (CompletionException e) {
assertTrue(e.getCause() instanceof ReplicationException); assertTrue(e.getCause() instanceof ReplicationException);
} }
Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get() Map<TableName, List<String>> result =
.getTableCFsMap(); admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
assertEquals(2, result.size()); assertEquals(2, result.size());
assertTrue("Should contain t1", result.containsKey(tableName1)); assertTrue("Should contain t1", result.containsKey(tableName1));
assertTrue("Should contain t2", result.containsKey(tableName2)); assertTrue("Should contain t2", result.containsKey(tableName2));
@ -414,7 +422,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
rpc.setTableCFsMap(tableCfs); rpc.setTableCFsMap(tableCfs);
try { try {
admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
fail("Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); fail(
"Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
} catch (CompletionException e) { } catch (CompletionException e) {
// OK // OK
} }
@ -430,7 +439,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
rpc.setNamespaces(namespaces); rpc.setNamespaces(namespaces);
try { try {
admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
fail("Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); fail(
"Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
} catch (CompletionException e) { } catch (CompletionException e) {
// OK // OK
} }
@ -453,4 +463,48 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
admin.removeReplicationPeer(ID_ONE).join(); admin.removeReplicationPeer(ID_ONE).join();
} }
@Test
public void testInvalidClusterKey() throws InterruptedException {
  // A syntactically invalid cluster key must be rejected by the master with a
  // DoNotRetryIOException (surfaced as the cause of the ExecutionException).
  try {
    admin.addReplicationPeer(ID_ONE,
      ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
    // Bare fail() gives no diagnostic when the add unexpectedly succeeds.
    fail("Adding a peer with an invalid cluster key should have failed");
  } catch (ExecutionException e) {
    assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
  }
}
@Test
public void testInvalidReplicationEndpoint() throws InterruptedException {
  // A replication endpoint class name that cannot be loaded/instantiated must be
  // rejected with a DoNotRetryIOException whose message names the bad class.
  try {
    admin.addReplicationPeer(ID_ONE,
      ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
    // Bare fail() gives no diagnostic when the add unexpectedly succeeds.
    fail("Adding a peer with a bogus replication endpoint class should have failed");
  } catch (ExecutionException e) {
    assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
    // Message prefix comes from ReplicationPeerManager.checkPeerConfig.
    assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
  }
}
@Test
public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
  // A custom endpoint (not an HBaseInterClusterReplicationEndpoint) does not need a
  // cluster key, so adding this peer without one must succeed.
  admin
    .addReplicationPeer(ID_ONE,
      ReplicationPeerConfig.newBuilder()
        .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
    .get();
  // The default inter-cluster endpoint still requires a valid cluster key, so the
  // same config with HBaseInterClusterReplicationEndpoint must be rejected.
  try {
    admin
      .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
        .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
      .get();
    // Bare fail() gives no diagnostic when the add unexpectedly succeeds.
    fail("Adding the default endpoint without a cluster key should have failed");
  } catch (ExecutionException e) {
    assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
  }
}
} }