HBASE-23345 Table need to replication unless all of cfs are excluded (#881)

Signed-off-by: stack <stack@apache.org>
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
XinSun 2019-12-03 13:35:23 +08:00 committed by Guanghao Zhang
parent 97e0107000
commit 27cfe1bb27
11 changed files with 279 additions and 364 deletions

View File

@ -392,22 +392,31 @@ public class ReplicationPeerConfig {
* @return true if the table need replicate to the peer cluster
*/
public boolean needToReplicate(TableName table) {
String namespace = table.getNamespaceAsString();
if (replicateAllUserTables) {
if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) {
// replicate all user tables, but filter by exclude namespaces and table-cfs config
if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
return false;
}
if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) {
return false;
// trap here, must check existence first since HashMap allows null value.
if (excludeTableCFsMap == null || !excludeTableCFsMap.containsKey(table)) {
return true;
}
return true;
Collection<String> cfs = excludeTableCFsMap.get(table);
// if cfs is null or empty then we can make sure that we do not need to replicate this table,
// otherwise, we may still need to replicate the table but filter out some families.
return cfs != null && !cfs.isEmpty();
} else {
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
// Not replicate all user tables, so filter by namespaces and table-cfs config
if (namespaces == null && tableCFsMap == null) {
return false;
}
// First filter by namespaces config
// If table's namespace in peer config, all the tables data are applicable for replication
if (namespaces != null && namespaces.contains(namespace)) {
return true;
}
if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
return true;
}
return false;
return tableCFsMap != null && tableCFsMap.containsKey(table);
}
}
}

View File

@ -17,10 +17,17 @@
*/
package org.apache.hadoop.hbase.replication;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BuilderStyleTest;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -32,6 +39,9 @@ public class TestReplicationPeerConfig {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
private static TableName TABLE_A = TableName.valueOf("replication", "testA");
private static TableName TABLE_B = TableName.valueOf("replication", "testB");
@Test
public void testClassMethodsAreBuilderStyle() {
/* ReplicationPeerConfig should have a builder style setup where setXXX/addXXX methods
@ -48,4 +58,196 @@ public class TestReplicationPeerConfig {
BuilderStyleTest.assertClassesAreBuilderStyle(ReplicationPeerConfig.class);
}
@Test
public void testNeedToReplicateWithReplicatingAll() {
ReplicationPeerConfig peerConfig;
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
Map<TableName, List<String>> tableCfs = new HashMap<>();
Set<String> namespaces = new HashSet<>();
// 1. replication_all flag is true, no namespaces and table-cfs config
builder.setReplicateAllUserTables(true);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// 2. replicate_all flag is true, and config in excludedTableCfs
builder.setExcludeNamespaces(null);
// empty map
tableCfs = new HashMap<>();
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// table testB
tableCfs = new HashMap<>();
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// table testA
tableCfs = new HashMap<>();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// 3. replicate_all flag is true, and config in excludeNamespaces
builder.setExcludeTableCFsMap(null);
// empty set
namespaces = new HashSet<>();
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// namespace default
namespaces = new HashSet<>();
namespaces.add("default");
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// namespace replication
namespaces = new HashSet<>();
namespaces.add("replication");
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
// Namespaces config doesn't conflict with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// Namespaces config conflicts with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
}
@Test
public void testNeedToReplicateWithoutReplicatingAll() {
ReplicationPeerConfig peerConfig;
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
Map<TableName, List<String>> tableCfs = new HashMap<>();
Set<String> namespaces = new HashSet<>();
// 1. replication_all flag is false, no namespaces and table-cfs config
builder.setReplicateAllUserTables(false);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// 2. replicate_all flag is false, and only config table-cfs in peer
// empty map
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// table testB
tableCfs = new HashMap<>();
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// table testA
tableCfs = new HashMap<>();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// 3. replication_all flag is false, and only config namespace in peer
builder.setTableCFsMap(null);
// empty set
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// namespace default
namespaces = new HashSet<>();
namespaces.add("default");
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// namespace replication
namespaces = new HashSet<>();
namespaces.add("replication");
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// 4. replicate_all flag is false, and config namespaces and table-cfs both
// Namespaces config doesn't conflict with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// Namespaces config conflicts with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
}
}

View File

@ -154,44 +154,6 @@ public final class ReplicationUtils {
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
/**
* Returns whether we should replicate the given table.
*/
public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
String namespace = tableName.getNamespaceAsString();
if (peerConfig.replicateAllUserTables()) {
// replicate all user tables, but filter by exclude namespaces and table-cfs config
Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
return false;
}
Map<TableName, List<String>> excludedTableCFs = peerConfig.getExcludeTableCFsMap();
// trap here, must check existence first since HashMap allows null value.
if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) {
return true;
}
List<String> cfs = excludedTableCFs.get(tableName);
// if cfs is null or empty then we can make sure that we do not need to replicate this table,
// otherwise, we may still need to replicate the table but filter out some families.
return cfs != null && !cfs.isEmpty();
} else {
// Not replicate all user tables, so filter by namespaces and table-cfs config
Set<String> namespaces = peerConfig.getNamespaces();
Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
if (namespaces == null && tableCFs == null) {
return false;
}
// First filter by namespaces config
// If table's namespace in peer config, all the tables data are applicable for replication
if (namespaces != null && namespaces.contains(namespace)) {
return true;
}
return tableCFs != null && tableCFs.containsKey(tableName);
}
}
public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir)
throws IOException {
return new Path(remoteWALDir).getFileSystem(conf);

View File

@ -1,235 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, SmallTests.class })
public class TestReplicationUtil {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationUtil.class);
private static TableName TABLE_A = TableName.valueOf("replication", "testA");
private static TableName TABLE_B = TableName.valueOf("replication", "testB");
@Test
public void testContainsWithReplicatingAll() {
ReplicationPeerConfig peerConfig;
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
Map<TableName, List<String>> tableCfs = new HashMap<>();
Set<String> namespaces = new HashSet<>();
// 1. replication_all flag is true, no namespaces and table-cfs config
builder.setReplicateAllUserTables(true);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
// 2. replicate_all flag is true, and config in excludedTableCfs
builder.setExcludeNamespaces(null);
// empty map
tableCfs = new HashMap<>();
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
// table testB
tableCfs = new HashMap<>();
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
// table testA
tableCfs = new HashMap<>();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
// 3. replicate_all flag is true, and config in excludeNamespaces
builder.setExcludeTableCFsMap(null);
// empty set
namespaces = new HashSet<>();
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
// namespace default
namespaces = new HashSet<>();
namespaces.add("default");
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
// namespace replication
namespaces = new HashSet<>();
namespaces.add("replication");
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
// 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
// Namespaces config doesn't conflict with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
// Namespaces config conflicts with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
}
@Test
public void testContainsWithoutReplicatingAll() {
ReplicationPeerConfig peerConfig;
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
Map<TableName, List<String>> tableCfs = new HashMap<>();
Set<String> namespaces = new HashSet<>();
// 1. replication_all flag is false, no namespaces and table-cfs config
builder.setReplicateAllUserTables(false);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
// 2. replicate_all flag is false, and only config table-cfs in peer
// empty map
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
// table testB
tableCfs = new HashMap<>();
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
// table testA
tableCfs = new HashMap<>();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
// 3. replication_all flag is false, and only config namespace in peer
builder.setTableCFsMap(null);
// empty set
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
// namespace default
namespaces = new HashSet<>();
namespaces.add("default");
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
// namespace replication
namespaces = new HashSet<>();
namespaces.add("replication");
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
// 4. replicate_all flag is false, and config namespaces and table-cfs both
// Namespaces config doesn't conflict with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
// Namespaces config conflicts with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
}
}

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -118,7 +117,7 @@ public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockPr
continue;
}
TableName tn = td.getTableName();
if (!ReplicationUtils.contains(peerConfig, tn)) {
if (!peerConfig.needToReplicate(tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -144,11 +143,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
continue;
}
TableName tn = td.getTableName();
if (!ReplicationUtils.contains(peerConfig, tn)) {
if (!peerConfig.needToReplicate(tn)) {
continue;
}
if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
ReplicationUtils.contains(oldPeerConfig, tn)) {
oldPeerConfig.needToReplicate(tn)) {
continue;
}
if (needReopen(tsm, tn)) {

View File

@ -503,7 +503,7 @@ public class ReplicationPeerManager {
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
.filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId())
.filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
.collect(Collectors.toList());
}

View File

@ -128,15 +128,15 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
continue;
}
TableName tn = td.getTableName();
if (ReplicationUtils.contains(oldPeerConfig, tn)) {
if (!ReplicationUtils.contains(peerConfig, tn)) {
if (oldPeerConfig.needToReplicate(tn)) {
if (!peerConfig.needToReplicate(tn)) {
// removed from peer config
for (String encodedRegionName : MetaTableAccessor
.getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
addToList(encodedRegionNames, encodedRegionName, queueStorage);
}
}
} else if (ReplicationUtils.contains(peerConfig, tn)) {
} else if (peerConfig.needToReplicate(tn)) {
// newly added to peer config
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}

View File

@ -52,7 +52,7 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
@Override
public Entry filter(Entry entry) {
if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) {
if (this.peer.getPeerConfig().needToReplicate(entry.getKey().getTableName())) {
return entry;
} else {
return null;

View File

@ -116,7 +116,6 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@ -3916,8 +3915,8 @@ public class HBaseFsck extends Configured implements Closeable {
List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
if (peerDescriptions != null && peerDescriptions.size() > 0) {
List<String> peers = peerDescriptions.stream()
.filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(),
cleanReplicationBarrierTable))
.filter(peerConfig -> peerConfig.getPeerConfig()
.needToReplicate(cleanReplicationBarrierTable))
.map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList());
try {
List<String> batch = new ArrayList<>();
@ -3986,4 +3985,4 @@ public class HBaseFsck extends Configured implements Closeable {
}
}
}
}
}

View File

@ -206,13 +206,11 @@ public class TestReplicationWALEntryFilters {
@Test
public void testNamespaceTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class);
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
// 1. replicate_all flag is false, no namespaces and table-cfs config
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(null);
when(peerConfig.getTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter =
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
@ -222,9 +220,8 @@ public class TestReplicationWALEntryFilters {
// empty map
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<>();
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@ -232,9 +229,8 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("bar"), null);
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@ -242,9 +238,8 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a), filter.filter(userEntry));
@ -252,9 +247,8 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c, d);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
@ -262,19 +256,17 @@ public class TestReplicationWALEntryFilters {
when(peer.getTableCFs()).thenReturn(null);
// empty set
Set<String> namespaces = new HashSet<>();
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peerConfig.getTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
.setTableCFsMap(null);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// namespace default
namespaces.add("default");
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
@ -282,9 +274,8 @@ public class TestReplicationWALEntryFilters {
// namespace ns1
namespaces = new HashSet<>();
namespaces.add("ns1");
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@ -295,10 +286,9 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
.setTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, c), filter.filter(userEntry));
@ -307,10 +297,9 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
.setTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@ -319,10 +308,9 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("bar"), null);
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
.setTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@ -331,14 +319,14 @@ public class TestReplicationWALEntryFilters {
@Test
public void testNamespaceTableCfWALEntryFilter2() {
ReplicationPeer peer = mock(ReplicationPeer.class);
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
// 1. replicate_all flag is true
// and no exclude namespaces and no exclude table-cfs config
when(peerConfig.replicateAllUserTables()).thenReturn(true);
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setReplicateAllUserTables(true)
.setExcludeNamespaces(null)
.setExcludeTableCFsMap(null);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter =
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
@ -347,18 +335,16 @@ public class TestReplicationWALEntryFilters {
// 2. replicate_all flag is true, and only config exclude namespaces
// empty set
Set<String> namespaces = new HashSet<String>();
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// exclude namespace default
namespaces.add("default");
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@ -366,9 +352,8 @@ public class TestReplicationWALEntryFilters {
// exclude namespace ns1
namespaces = new HashSet<String>();
namespaces.add("ns1");
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@ -376,9 +361,8 @@ public class TestReplicationWALEntryFilters {
// 3. replicate_all flag is true, and only config exclude table-cfs
// empty table-cfs map
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@ -386,9 +370,8 @@ public class TestReplicationWALEntryFilters {
// exclude table bar
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@ -396,9 +379,8 @@ public class TestReplicationWALEntryFilters {
// exclude table foo:a
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, b, c), filter.filter(userEntry));
@ -409,9 +391,8 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, b), filter.filter(userEntry));
@ -421,9 +402,8 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("default");
tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));