HBASE-19630 Add peer cluster key check when add new replication peer
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
65eb91f84e
commit
1025388da6
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
@ -216,36 +218,36 @@ public final class ReplicationPeerManager {
|
|||
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
|
||||
* Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
|
||||
* cluster.
|
||||
* <p>
|
||||
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
|
||||
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
|
||||
*/
|
||||
private static void checkPeerConfig(ReplicationPeerConfig peerConfig)
|
||||
throws DoNotRetryIOException {
|
||||
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
|
||||
checkClusterKey(peerConfig.getClusterKey());
|
||||
|
||||
if (peerConfig.replicateAllUserTables()) {
|
||||
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
|
||||
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
||||
throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
|
||||
"when you want replicate all cluster");
|
||||
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
|
||||
// Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
|
||||
// cluster.
|
||||
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|
||||
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
||||
throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
|
||||
+ "when you want replicate all cluster");
|
||||
}
|
||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
|
||||
peerConfig.getExcludeTableCFsMap());
|
||||
} else {
|
||||
if ((peerConfig.getExcludeNamespaces() != null &&
|
||||
!peerConfig.getExcludeNamespaces().isEmpty()) ||
|
||||
(peerConfig.getExcludeTableCFsMap() != null &&
|
||||
!peerConfig.getExcludeTableCFsMap().isEmpty())) {
|
||||
// If replicate_all flag is false, it means all user tables can't be replicated to peer
|
||||
// cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
|
||||
// cluster.
|
||||
if ((peerConfig.getExcludeNamespaces() != null
|
||||
&& !peerConfig.getExcludeNamespaces().isEmpty())
|
||||
|| (peerConfig.getExcludeTableCFsMap() != null
|
||||
&& !peerConfig.getExcludeTableCFsMap().isEmpty())) {
|
||||
throw new DoNotRetryIOException(
|
||||
"Need clean exclude-namespaces or exclude-table-cfs config firstly" +
|
||||
" when replicate_all flag is false");
|
||||
"Need clean exclude-namespaces or exclude-table-cfs config firstly"
|
||||
+ " when replicate_all flag is false");
|
||||
}
|
||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
||||
peerConfig.getTableCFsMap());
|
||||
}
|
||||
|
||||
checkConfiguredWALEntryFilters(peerConfig);
|
||||
}
|
||||
|
||||
|
@ -268,7 +270,7 @@ public final class ReplicationPeerManager {
|
|||
* exclude namespace.</li>
|
||||
* </ol>
|
||||
*/
|
||||
private static void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
|
||||
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
|
||||
if (namespaces == null || namespaces.isEmpty()) {
|
||||
return;
|
||||
|
@ -285,7 +287,7 @@ public final class ReplicationPeerManager {
|
|||
}
|
||||
}
|
||||
|
||||
private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
|
||||
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
|
||||
throws DoNotRetryIOException {
|
||||
String filterCSV = peerConfig.getConfiguration()
|
||||
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
|
||||
|
@ -302,6 +304,14 @@ public final class ReplicationPeerManager {
|
|||
}
|
||||
}
|
||||
|
||||
private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
|
||||
try {
|
||||
ZKConfig.validateClusterKey(clusterKey);
|
||||
} catch (IOException e) {
|
||||
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
|
||||
}
|
||||
}
|
||||
|
||||
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
|
||||
throws ReplicationException {
|
||||
ReplicationPeerStorage peerStorage =
|
||||
|
|
|
@ -112,6 +112,28 @@ public class TestReplicationAdmin {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddInvalidPeer() {
|
||||
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
|
||||
builder.setClusterKey(KEY_ONE);
|
||||
try {
|
||||
String invalidPeerId = "1-2";
|
||||
hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build());
|
||||
fail("Should fail as the peer id: " + invalidPeerId + " is invalid");
|
||||
} catch (Exception e) {
|
||||
// OK
|
||||
}
|
||||
|
||||
try {
|
||||
String invalidClusterKey = "2181:/hbase";
|
||||
builder.setClusterKey(invalidClusterKey);
|
||||
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
|
||||
fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid");
|
||||
} catch (Exception e) {
|
||||
// OK
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple testing of adding and removing peers, basically shows that
|
||||
* all interactions with ZK work
|
||||
|
|
Loading…
Reference in New Issue