HBASE-23345 Table need to replication unless all of cfs are excluded
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
0770b0768f
commit
997684f24d
|
@ -366,22 +366,31 @@ public class ReplicationPeerConfig {
|
|||
* @return true if the table need replicate to the peer cluster
|
||||
*/
|
||||
public boolean needToReplicate(TableName table) {
|
||||
String namespace = table.getNamespaceAsString();
|
||||
if (replicateAllUserTables) {
|
||||
if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) {
|
||||
// replicate all user tables, but filter by exclude namespaces and table-cfs config
|
||||
if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
|
||||
return false;
|
||||
}
|
||||
if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) {
|
||||
return false;
|
||||
// trap here, must check existence first since HashMap allows null value.
|
||||
if (excludeTableCFsMap == null || !excludeTableCFsMap.containsKey(table)) {
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
Collection<String> cfs = excludeTableCFsMap.get(table);
|
||||
// if cfs is null or empty then we can make sure that we do not need to replicate this table,
|
||||
// otherwise, we may still need to replicate the table but filter out some families.
|
||||
return cfs != null && !cfs.isEmpty();
|
||||
} else {
|
||||
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
|
||||
// Not replicate all user tables, so filter by namespaces and table-cfs config
|
||||
if (namespaces == null && tableCFsMap == null) {
|
||||
return false;
|
||||
}
|
||||
// First filter by namespaces config
|
||||
// If table's namespace in peer config, all the tables data are applicable for replication
|
||||
if (namespaces != null && namespaces.contains(namespace)) {
|
||||
return true;
|
||||
}
|
||||
if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return tableCFsMap != null && tableCFsMap.containsKey(table);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,10 +17,17 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.BuilderStyleTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -32,6 +39,9 @@ public class TestReplicationPeerConfig {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
|
||||
|
||||
private static TableName TABLE_A = TableName.valueOf("replication", "testA");
|
||||
private static TableName TABLE_B = TableName.valueOf("replication", "testB");
|
||||
|
||||
@Test
|
||||
public void testClassMethodsAreBuilderStyle() {
|
||||
/* ReplicationPeerConfig should have a builder style setup where setXXX/addXXX methods
|
||||
|
@ -48,4 +58,196 @@ public class TestReplicationPeerConfig {
|
|||
|
||||
BuilderStyleTest.assertClassesAreBuilderStyle(ReplicationPeerConfig.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNeedToReplicateWithReplicatingAll() {
|
||||
ReplicationPeerConfig peerConfig;
|
||||
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
|
||||
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
|
||||
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||
Set<String> namespaces = new HashSet<>();
|
||||
|
||||
// 1. replication_all flag is true, no namespaces and table-cfs config
|
||||
builder.setReplicateAllUserTables(true);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// 2. replicate_all flag is true, and config in excludedTableCfs
|
||||
builder.setExcludeNamespaces(null);
|
||||
// empty map
|
||||
tableCfs = new HashMap<>();
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// table testB
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TABLE_B, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// table testA
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// 3. replicate_all flag is true, and config in excludeNamespaces
|
||||
builder.setExcludeTableCFsMap(null);
|
||||
// empty set
|
||||
namespaces = new HashSet<>();
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// namespace default
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("default");
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// namespace replication
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("replication");
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
|
||||
// Namespaces config doesn't conflict with table-cfs config
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("replication");
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// Namespaces config conflicts with table-cfs config
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("default");
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("replication");
|
||||
tableCfs.put(TABLE_B, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNeedToReplicateWithoutReplicatingAll() {
|
||||
ReplicationPeerConfig peerConfig;
|
||||
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
|
||||
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
|
||||
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||
Set<String> namespaces = new HashSet<>();
|
||||
|
||||
// 1. replication_all flag is false, no namespaces and table-cfs config
|
||||
builder.setReplicateAllUserTables(false);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// 2. replicate_all flag is false, and only config table-cfs in peer
|
||||
// empty map
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// table testB
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TABLE_B, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// table testA
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// 3. replication_all flag is false, and only config namespace in peer
|
||||
builder.setTableCFsMap(null);
|
||||
// empty set
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// namespace default
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("default");
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// namespace replication
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("replication");
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// 4. replicate_all flag is false, and config namespaces and table-cfs both
|
||||
// Namespaces config doesn't conflict with table-cfs config
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("replication");
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
// Namespaces config conflicts with table-cfs config
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("default");
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("replication");
|
||||
tableCfs.put(TABLE_B, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -134,44 +134,6 @@ public final class ReplicationUtils {
|
|||
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether we should replicate the given table.
|
||||
*/
|
||||
public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
|
||||
String namespace = tableName.getNamespaceAsString();
|
||||
if (peerConfig.replicateAllUserTables()) {
|
||||
// replicate all user tables, but filter by exclude namespaces and table-cfs config
|
||||
Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
|
||||
if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
|
||||
return false;
|
||||
}
|
||||
Map<TableName, List<String>> excludedTableCFs = peerConfig.getExcludeTableCFsMap();
|
||||
// trap here, must check existence first since HashMap allows null value.
|
||||
if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) {
|
||||
return true;
|
||||
}
|
||||
List<String> cfs = excludedTableCFs.get(tableName);
|
||||
// if cfs is null or empty then we can make sure that we do not need to replicate this table,
|
||||
// otherwise, we may still need to replicate the table but filter out some families.
|
||||
return cfs != null && !cfs.isEmpty();
|
||||
} else {
|
||||
// Not replicate all user tables, so filter by namespaces and table-cfs config
|
||||
Set<String> namespaces = peerConfig.getNamespaces();
|
||||
Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
|
||||
|
||||
if (namespaces == null && tableCFs == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// First filter by namespaces config
|
||||
// If table's namespace in peer config, all the tables data are applicable for replication
|
||||
if (namespaces != null && namespaces.contains(namespace)) {
|
||||
return true;
|
||||
}
|
||||
return tableCFs != null && tableCFs.containsKey(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the adaptive timeout value when performing a retry
|
||||
*/
|
||||
|
|
|
@ -1,235 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ ReplicationTests.class, SmallTests.class })
|
||||
public class TestReplicationUtil {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReplicationUtil.class);
|
||||
|
||||
private static TableName TABLE_A = TableName.valueOf("replication", "testA");
|
||||
private static TableName TABLE_B = TableName.valueOf("replication", "testB");
|
||||
|
||||
@Test
|
||||
public void testContainsWithReplicatingAll() {
|
||||
ReplicationPeerConfig peerConfig;
|
||||
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
|
||||
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
|
||||
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||
Set<String> namespaces = new HashSet<>();
|
||||
|
||||
// 1. replication_all flag is true, no namespaces and table-cfs config
|
||||
builder.setReplicateAllUserTables(true);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// 2. replicate_all flag is true, and config in excludedTableCfs
|
||||
builder.setExcludeNamespaces(null);
|
||||
// empty map
|
||||
tableCfs = new HashMap<>();
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// table testB
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TABLE_B, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// table testA
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// 3. replicate_all flag is true, and config in excludeNamespaces
|
||||
builder.setExcludeTableCFsMap(null);
|
||||
// empty set
|
||||
namespaces = new HashSet<>();
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// namespace default
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("default");
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// namespace replication
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("replication");
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
|
||||
// Namespaces config doesn't conflict with table-cfs config
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("replication");
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// Namespaces config conflicts with table-cfs config
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("default");
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("replication");
|
||||
tableCfs.put(TABLE_B, null);
|
||||
builder.setReplicateAllUserTables(true);
|
||||
builder.setExcludeTableCFsMap(tableCfs);
|
||||
builder.setExcludeNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainsWithoutReplicatingAll() {
|
||||
ReplicationPeerConfig peerConfig;
|
||||
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
|
||||
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
|
||||
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||
Set<String> namespaces = new HashSet<>();
|
||||
|
||||
// 1. replication_all flag is false, no namespaces and table-cfs config
|
||||
builder.setReplicateAllUserTables(false);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// 2. replicate_all flag is false, and only config table-cfs in peer
|
||||
// empty map
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// table testB
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TABLE_B, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// table testA
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// 3. replication_all flag is false, and only config namespace in peer
|
||||
builder.setTableCFsMap(null);
|
||||
// empty set
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// namespace default
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("default");
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// namespace replication
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("replication");
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// 4. replicate_all flag is false, and config namespaces and table-cfs both
|
||||
// Namespaces config doesn't conflict with table-cfs config
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("replication");
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
// Namespaces config conflicts with table-cfs config
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("default");
|
||||
tableCfs.put(TABLE_A, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
|
||||
namespaces = new HashSet<>();
|
||||
tableCfs = new HashMap<>();
|
||||
namespaces.add("replication");
|
||||
tableCfs.put(TABLE_B, null);
|
||||
builder.setReplicateAllUserTables(false);
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
builder.setNamespaces(namespaces);
|
||||
peerConfig = builder.build();
|
||||
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
|
||||
}
|
||||
}
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -168,11 +167,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
continue;
|
||||
}
|
||||
TableName tn = td.getTableName();
|
||||
if (!ReplicationUtils.contains(peerConfig, tn)) {
|
||||
if (!peerConfig.needToReplicate(tn)) {
|
||||
continue;
|
||||
}
|
||||
if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
|
||||
ReplicationUtils.contains(oldPeerConfig, tn)) {
|
||||
oldPeerConfig.needToReplicate(tn)) {
|
||||
continue;
|
||||
}
|
||||
if (needReopen(tsm, tn)) {
|
||||
|
@ -206,7 +205,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
|
|||
continue;
|
||||
}
|
||||
TableName tn = td.getTableName();
|
||||
if (!ReplicationUtils.contains(peerConfig, tn)) {
|
||||
if (!peerConfig.needToReplicate(tn)) {
|
||||
continue;
|
||||
}
|
||||
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
|
||||
|
|
|
@ -359,7 +359,7 @@ public class ReplicationPeerManager {
|
|||
|
||||
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
|
||||
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
|
||||
.filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId())
|
||||
.filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
|
|
@ -128,15 +128,15 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
|
|||
continue;
|
||||
}
|
||||
TableName tn = td.getTableName();
|
||||
if (ReplicationUtils.contains(oldPeerConfig, tn)) {
|
||||
if (!ReplicationUtils.contains(peerConfig, tn)) {
|
||||
if (oldPeerConfig.needToReplicate(tn)) {
|
||||
if (!peerConfig.needToReplicate(tn)) {
|
||||
// removed from peer config
|
||||
for (String encodedRegionName : MetaTableAccessor
|
||||
.getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
|
||||
addToList(encodedRegionNames, encodedRegionName, queueStorage);
|
||||
}
|
||||
}
|
||||
} else if (ReplicationUtils.contains(peerConfig, tn)) {
|
||||
} else if (peerConfig.needToReplicate(tn)) {
|
||||
// newly added to peer config
|
||||
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
|
|||
|
||||
@Override
|
||||
public Entry filter(Entry entry) {
|
||||
if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) {
|
||||
if (this.peer.getPeerConfig().needToReplicate(entry.getKey().getTableName())) {
|
||||
return entry;
|
||||
} else {
|
||||
return null;
|
||||
|
|
|
@ -116,7 +116,6 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
|
||||
|
@ -3926,8 +3925,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
|
||||
if (peerDescriptions != null && peerDescriptions.size() > 0) {
|
||||
List<String> peers = peerDescriptions.stream()
|
||||
.filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(),
|
||||
cleanReplicationBarrierTable))
|
||||
.filter(peerConfig -> peerConfig.getPeerConfig()
|
||||
.needToReplicate(cleanReplicationBarrierTable))
|
||||
.map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList());
|
||||
try {
|
||||
List<String> batch = new ArrayList<>();
|
||||
|
@ -3996,4 +3995,4 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -213,13 +213,11 @@ public class TestReplicationWALEntryFilters {
|
|||
@Test
|
||||
public void testNamespaceTableCfWALEntryFilter() {
|
||||
ReplicationPeer peer = mock(ReplicationPeer.class);
|
||||
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
|
||||
ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
|
||||
|
||||
// 1. replicate_all flag is false, no namespaces and table-cfs config
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getNamespaces()).thenReturn(null);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
Entry userEntry = createEntry(null, a, b, c);
|
||||
ChainWALEntryFilter filter =
|
||||
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
|
@ -229,9 +227,8 @@ public class TestReplicationWALEntryFilters {
|
|||
// empty map
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
||||
|
@ -239,9 +236,8 @@ public class TestReplicationWALEntryFilters {
|
|||
userEntry = createEntry(null, a, b, c);
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TableName.valueOf("bar"), null);
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
||||
|
@ -249,9 +245,8 @@ public class TestReplicationWALEntryFilters {
|
|||
userEntry = createEntry(null, a, b, c);
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a), filter.filter(userEntry));
|
||||
|
||||
|
@ -259,9 +254,8 @@ public class TestReplicationWALEntryFilters {
|
|||
userEntry = createEntry(null, a, b, c, d);
|
||||
tableCfs = new HashMap<>();
|
||||
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
|
||||
|
||||
|
@ -269,19 +263,17 @@ public class TestReplicationWALEntryFilters {
|
|||
when(peer.getTableCFs()).thenReturn(null);
|
||||
// empty set
|
||||
Set<String> namespaces = new HashSet<>();
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
|
||||
.setTableCFsMap(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
||||
// namespace default
|
||||
namespaces.add("default");
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getNamespaces()).thenReturn(namespaces);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
|
||||
|
@ -289,9 +281,8 @@ public class TestReplicationWALEntryFilters {
|
|||
// namespace ns1
|
||||
namespaces = new HashSet<>();
|
||||
namespaces.add("ns1");
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getNamespaces()).thenReturn(namespaces);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
@ -302,10 +293,9 @@ public class TestReplicationWALEntryFilters {
|
|||
tableCfs = new HashMap<>();
|
||||
namespaces.add("ns1");
|
||||
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
|
||||
.setTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a, c), filter.filter(userEntry));
|
||||
|
@ -314,10 +304,9 @@ public class TestReplicationWALEntryFilters {
|
|||
tableCfs = new HashMap<>();
|
||||
namespaces.add("default");
|
||||
tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
|
||||
.setTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||
|
@ -326,10 +315,9 @@ public class TestReplicationWALEntryFilters {
|
|||
tableCfs = new HashMap<>();
|
||||
namespaces.add("ns1");
|
||||
tableCfs.put(TableName.valueOf("bar"), null);
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||
when(peerConfig.getNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
|
||||
.setTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
@ -338,14 +326,14 @@ public class TestReplicationWALEntryFilters {
|
|||
@Test
|
||||
public void testNamespaceTableCfWALEntryFilter2() {
|
||||
ReplicationPeer peer = mock(ReplicationPeer.class);
|
||||
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
|
||||
ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
|
||||
|
||||
// 1. replicate_all flag is true
|
||||
// and no exclude namespaces and no exclude table-cfs config
|
||||
when(peerConfig.replicateAllUserTables()).thenReturn(true);
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setReplicateAllUserTables(true)
|
||||
.setExcludeNamespaces(null)
|
||||
.setExcludeTableCFsMap(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
Entry userEntry = createEntry(null, a, b, c);
|
||||
ChainWALEntryFilter filter =
|
||||
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
|
@ -354,18 +342,16 @@ public class TestReplicationWALEntryFilters {
|
|||
// 2. replicate_all flag is true, and only config exclude namespaces
|
||||
// empty set
|
||||
Set<String> namespaces = new HashSet<String>();
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||
|
||||
// exclude namespace default
|
||||
namespaces.add("default");
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
@ -373,9 +359,8 @@ public class TestReplicationWALEntryFilters {
|
|||
// exclude namespace ns1
|
||||
namespaces = new HashSet<String>();
|
||||
namespaces.add("ns1");
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||
|
@ -383,9 +368,8 @@ public class TestReplicationWALEntryFilters {
|
|||
// 3. replicate_all flag is true, and only config exclude table-cfs
|
||||
// empty table-cfs map
|
||||
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||
|
@ -393,9 +377,8 @@ public class TestReplicationWALEntryFilters {
|
|||
// exclude table bar
|
||||
tableCfs = new HashMap<TableName, List<String>>();
|
||||
tableCfs.put(TableName.valueOf("bar"), null);
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||
|
@ -403,9 +386,8 @@ public class TestReplicationWALEntryFilters {
|
|||
// exclude table foo:a
|
||||
tableCfs = new HashMap<TableName, List<String>>();
|
||||
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, b, c), filter.filter(userEntry));
|
||||
|
@ -416,9 +398,8 @@ public class TestReplicationWALEntryFilters {
|
|||
tableCfs = new HashMap<TableName, List<String>>();
|
||||
namespaces.add("ns1");
|
||||
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(null, b), filter.filter(userEntry));
|
||||
|
@ -428,9 +409,8 @@ public class TestReplicationWALEntryFilters {
|
|||
tableCfs = new HashMap<TableName, List<String>>();
|
||||
namespaces.add("default");
|
||||
tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
|
||||
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
|
||||
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
|
||||
userEntry = createEntry(null, a, b, c);
|
||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
|
Loading…
Reference in New Issue