HBASE-21857 Do not need to check clusterKey if replicationEndpoint is provided when adding a peer
Signed-off-by: Xu Cang <xucang@apache.org>
This commit is contained in:
parent
a984b44798
commit
16f27d7699
|
@ -325,9 +325,9 @@ public final class ReplicationPeerConfigUtil {
|
|||
public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
|
||||
ReplicationProtos.ReplicationPeer.Builder builder =
|
||||
ReplicationProtos.ReplicationPeer.newBuilder();
|
||||
if (peerConfig.getClusterKey() != null) {
|
||||
builder.setClusterkey(peerConfig.getClusterKey());
|
||||
}
|
||||
// we used to set cluster key as required so here we must always set it, until we can make sure
|
||||
// that no one uses the old proto file.
|
||||
builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
|
||||
if (peerConfig.getReplicationEndpointImpl() != null) {
|
||||
builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ message TableCF {
|
|||
message ReplicationPeer {
|
||||
// clusterkey is the concatenation of the slave cluster's
|
||||
// hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
|
||||
required string clusterkey = 1;
|
||||
optional string clusterkey = 1;
|
||||
optional string replicationEndpointImpl = 2;
|
||||
repeated BytesBytesPair data = 3;
|
||||
repeated NameStringPair configuration = 4;
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -242,7 +244,27 @@ public class ReplicationPeerManager {
|
|||
}
|
||||
|
||||
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
|
||||
checkClusterKey(peerConfig.getClusterKey());
|
||||
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
|
||||
boolean checkClusterKey = true;
|
||||
if (!StringUtils.isBlank(replicationEndpointImpl)) {
|
||||
// try creating a instance
|
||||
ReplicationEndpoint endpoint;
|
||||
try {
|
||||
endpoint = Class.forName(replicationEndpointImpl)
|
||||
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new DoNotRetryIOException(
|
||||
"Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
|
||||
e);
|
||||
}
|
||||
// do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
|
||||
if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
|
||||
checkClusterKey = false;
|
||||
}
|
||||
}
|
||||
if (checkClusterKey) {
|
||||
checkClusterKey(peerConfig.getClusterKey());
|
||||
}
|
||||
|
||||
if (peerConfig.replicateAllUserTables()) {
|
||||
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
|
||||
|
|
|
@ -18,10 +18,13 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.startsWith;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -32,16 +35,25 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.junit.After;
|
||||
|
@ -56,12 +68,12 @@ import org.junit.runners.Parameterized;
|
|||
* Class to test asynchronous replication admin operations.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({LargeTests.class, ClientTests.class})
|
||||
@Category({ LargeTests.class, ClientTests.class })
|
||||
public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
|
||||
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
|
||||
|
||||
private final String ID_ONE = "1";
|
||||
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
|
||||
|
@ -89,7 +101,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
} catch (Exception e) {
|
||||
}
|
||||
ReplicationQueueStorage queueStorage = ReplicationStorageFactory
|
||||
.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
|
||||
.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
|
||||
for (ServerName serverName : queueStorage.getListOfReplicators()) {
|
||||
for (String queue : queueStorage.getAllQueues(serverName)) {
|
||||
queueStorage.removeQueue(serverName, queue);
|
||||
|
@ -186,8 +198,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
// append table t1 to replication
|
||||
tableCFs.put(tableName1, null);
|
||||
admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
|
||||
Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
|
||||
.getTableCFsMap();
|
||||
Map<TableName, List<String>> result =
|
||||
admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
|
||||
assertEquals(1, result.size());
|
||||
assertEquals(true, result.containsKey(tableName1));
|
||||
assertNull(result.get(tableName1));
|
||||
|
@ -301,12 +313,13 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
tableCFs.clear();
|
||||
tableCFs.put(tableName3, null);
|
||||
admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
|
||||
fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3");
|
||||
fail("Test case should fail as removing table-cfs from a peer whose" +
|
||||
" table-cfs didn't contain t3");
|
||||
} catch (CompletionException e) {
|
||||
assertTrue(e.getCause() instanceof ReplicationException);
|
||||
}
|
||||
Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
|
||||
.getTableCFsMap();
|
||||
Map<TableName, List<String>> result =
|
||||
admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
|
||||
assertEquals(2, result.size());
|
||||
assertTrue("Should contain t1", result.containsKey(tableName1));
|
||||
assertTrue("Should contain t2", result.containsKey(tableName2));
|
||||
|
@ -414,7 +427,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
rpc.setTableCFsMap(tableCfs);
|
||||
try {
|
||||
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
|
||||
fail("Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
|
||||
fail(
|
||||
"Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
|
||||
} catch (CompletionException e) {
|
||||
// OK
|
||||
}
|
||||
|
@ -430,7 +444,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
rpc.setNamespaces(namespaces);
|
||||
try {
|
||||
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
|
||||
fail("Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
|
||||
fail(
|
||||
"Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
|
||||
} catch (CompletionException e) {
|
||||
// OK
|
||||
}
|
||||
|
@ -453,4 +468,78 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
|||
|
||||
admin.removeReplicationPeer(ID_ONE).join();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClusterKey() throws InterruptedException {
|
||||
try {
|
||||
admin.addReplicationPeer(ID_ONE,
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
|
||||
fail();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidReplicationEndpoint() throws InterruptedException {
|
||||
try {
|
||||
admin.addReplicationPeer(ID_ONE,
|
||||
ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
|
||||
fail();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
|
||||
assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
|
||||
}
|
||||
}
|
||||
|
||||
public static final class DummyReplicationEndpoint extends BaseReplicationEndpoint {
|
||||
|
||||
@Override
|
||||
public UUID getPeerUUID() {
|
||||
return ctx.getClusterId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replicate(ReplicateContext replicateContext) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
startAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
stopAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
notifyStarted();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
notifyStopped();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
|
||||
// make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
|
||||
admin.addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder()
|
||||
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build()).get();
|
||||
|
||||
// but we still need to check cluster key if we specify the default ReplicationEndpoint
|
||||
try {
|
||||
admin
|
||||
.addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
|
||||
.setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
|
||||
.get();
|
||||
fail();
|
||||
} catch (ExecutionException e) {
|
||||
assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue