HBASE-19630 Add peer cluster key check when add new replication peer

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2017-12-26 21:10:00 +08:00 committed by zhangduo
parent eae251d203
commit 712b5a80dc
2 changed files with 54 additions and 22 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.master.replication; package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -216,36 +218,36 @@ public final class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
} }
/** private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster. checkClusterKey(peerConfig.getClusterKey());
* Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
* cluster.
* <p>
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
*/
private static void checkPeerConfig(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
if (peerConfig.replicateAllUserTables()) { if (peerConfig.replicateAllUserTables()) {
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
(peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + // cluster.
"when you want replicate all cluster"); if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
+ "when you want replicate all cluster");
} }
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
peerConfig.getExcludeTableCFsMap()); peerConfig.getExcludeTableCFsMap());
} else { } else {
if ((peerConfig.getExcludeNamespaces() != null && // If replicate_all flag is false, it means all user tables can't be replicated to peer
!peerConfig.getExcludeNamespaces().isEmpty()) || // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
(peerConfig.getExcludeTableCFsMap() != null && // cluster.
!peerConfig.getExcludeTableCFsMap().isEmpty())) { if ((peerConfig.getExcludeNamespaces() != null
&& !peerConfig.getExcludeNamespaces().isEmpty())
|| (peerConfig.getExcludeTableCFsMap() != null
&& !peerConfig.getExcludeTableCFsMap().isEmpty())) {
throw new DoNotRetryIOException( throw new DoNotRetryIOException(
"Need clean exclude-namespaces or exclude-table-cfs config firstly" + "Need clean exclude-namespaces or exclude-table-cfs config firstly"
" when replicate_all flag is false"); + " when replicate_all flag is false");
} }
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap()); peerConfig.getTableCFsMap());
} }
checkConfiguredWALEntryFilters(peerConfig); checkConfiguredWALEntryFilters(peerConfig);
} }
@ -268,7 +270,7 @@ public final class ReplicationPeerManager {
* exclude namespace.</li> * exclude namespace.</li>
* </ol> * </ol>
*/ */
private static void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException { Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
if (namespaces == null || namespaces.isEmpty()) { if (namespaces == null || namespaces.isEmpty()) {
return; return;
@ -285,7 +287,7 @@ public final class ReplicationPeerManager {
} }
} }
private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException { throws DoNotRetryIOException {
String filterCSV = peerConfig.getConfiguration() String filterCSV = peerConfig.getConfiguration()
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
@ -302,6 +304,14 @@ public final class ReplicationPeerManager {
} }
} }
private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
try {
ZKConfig.validateClusterKey(clusterKey);
} catch (IOException e) {
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
}
}
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException { throws ReplicationException {
ReplicationPeerStorage peerStorage = ReplicationPeerStorage peerStorage =

View File

@ -119,6 +119,28 @@ public class TestReplicationAdmin {
} }
} }
@Test
public void testAddInvalidPeer() {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
try {
String invalidPeerId = "1-2";
hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build());
fail("Should fail as the peer id: " + invalidPeerId + " is invalid");
} catch (Exception e) {
// OK
}
try {
String invalidClusterKey = "2181:/hbase";
builder.setClusterKey(invalidClusterKey);
hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid");
} catch (Exception e) {
// OK
}
}
/** /**
* Simple testing of adding and removing peers, basically shows that * Simple testing of adding and removing peers, basically shows that
* all interactions with ZK work * all interactions with ZK work