HBASE-25590 Bulkload replication HFileRefs cannot be cleared in some cases when exclude-namespace/exclude-table-cfs is set (#2969)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
XinSun 2021-02-26 09:50:23 +08:00 committed by sunxin
parent d724d0576f
commit 328ff8c05a
6 changed files with 549 additions and 260 deletions

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
* A configuration for the replication peer cluster.
*/
@ -372,6 +374,19 @@ public class ReplicationPeerConfig {
* @return true if the table need replicate to the peer cluster
*/
public boolean needToReplicate(TableName table) {
// Delegate to the family-aware overload with a null family, i.e. ask whether
// any column family of this table needs to replicate to the peer cluster.
return needToReplicate(table, null);
}
/**
* Decide whether the passed family of the table need replicate to the peer cluster according to
* this peer config.
* @param table name of the table
* @param family family name
* @return true if (the family of) the table need replicate to the peer cluster.
* If passed family is null, return true if any CFs of the table need replicate;
* If passed family is not null, return true if the passed family need replicate.
*/
public boolean needToReplicate(TableName table, byte[] family) {
String namespace = table.getNamespaceAsString();
if (replicateAllUserTables) {
// replicate all user tables, but filter by exclude namespaces and table-cfs config
@ -383,9 +398,12 @@ public class ReplicationPeerConfig {
return true;
}
Collection<String> cfs = excludeTableCFsMap.get(table);
// if cfs is null or empty then we can make sure that we do not need to replicate this table,
// If cfs is null or empty then we can make sure that we do not need to replicate this table,
// otherwise, we may still need to replicate the table but filter out some families.
return cfs != null && !cfs.isEmpty();
return cfs != null && !cfs.isEmpty()
// If exclude-table-cfs contains passed family then we make sure that we do not need to
// replicate this family.
&& (family == null || !cfs.contains(Bytes.toString(family)));
} else {
// Not replicate all user tables, so filter by namespaces and table-cfs config
if (namespaces == null && tableCFsMap == null) {
@ -396,7 +414,12 @@ public class ReplicationPeerConfig {
if (namespaces != null && namespaces.contains(namespace)) {
return true;
}
return tableCFsMap != null && tableCFsMap.containsKey(table);
// If table-cfs contains this table then we can make sure that we need replicate some CFs of
// this table. Further we need all CFs if tableCFsMap.get(table) is null or empty.
return tableCFsMap != null && tableCFsMap.containsKey(table)
&& (family == null || CollectionUtils.isEmpty(tableCFsMap.get(table))
// If table-cfs contains the passed family then we need to replicate this family.
|| tableCFsMap.get(table).contains(Bytes.toString(family)));
}
}
}

View File

@ -17,21 +17,26 @@
*/
package org.apache.hadoop.hbase.replication;
import java.util.HashMap;
import java.util.HashSet;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BuilderStyleTest;
import org.junit.Assert;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@Category({ClientTests.class, SmallTests.class})
public class TestReplicationPeerConfig {
@ -39,8 +44,12 @@ public class TestReplicationPeerConfig {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
private static TableName TABLE_A = TableName.valueOf("replication", "testA");
private static TableName TABLE_B = TableName.valueOf("replication", "testB");
private static final String NAMESPACE_REPLICATE = "replicate";
private static final String NAMESPACE_OTHER = "other";
private static final TableName TABLE_A = TableName.valueOf(NAMESPACE_REPLICATE, "testA");
private static final TableName TABLE_B = TableName.valueOf(NAMESPACE_REPLICATE, "testB");
private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
@Test
public void testClassMethodsAreBuilderStyle() {
@ -61,193 +70,230 @@ public class TestReplicationPeerConfig {
@Test
public void testNeedToReplicateWithReplicatingAll() {
ReplicationPeerConfig peerConfig;
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
Map<TableName, List<String>> tableCfs = new HashMap<>();
Set<String> namespaces = new HashSet<>();
// 1. replication_all flag is true, no namespaces and table-cfs config
builder.setReplicateAllUserTables(true);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
// 2. replicate_all flag is true, and config in excludedTableCfs
builder.setExcludeNamespaces(null);
// empty map
tableCfs = new HashMap<>();
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// Exclude empty table-cfs map
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(Maps.newHashMap())
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
// table testB
tableCfs = new HashMap<>();
// Exclude table B
Map<TableName, List<String>> tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// table testA
tableCfs = new HashMap<>();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(tableCfs)
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
assertFalse(peerConfig.needToReplicate(TABLE_B));
// 3. replicate_all flag is true, and config in excludeNamespaces
builder.setExcludeTableCFsMap(null);
// empty set
namespaces = new HashSet<>();
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// Exclude empty namespace set
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeNamespaces(Sets.newHashSet())
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
// namespace default
namespaces = new HashSet<>();
namespaces.add("default");
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// Exclude namespace other
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
// namespace replication
namespaces = new HashSet<>();
namespaces.add("replication");
builder.setReplicateAllUserTables(true);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// Exclude namespace replication
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
// 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
// Namespaces config doesn't conflict with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
.setExcludeTableCFsMap(tableCfs)
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
// Namespaces config conflicts with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(tableCfs)
.setExcludeNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
assertTrue(peerConfig.needToReplicate(TABLE_B));
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(true);
builder.setExcludeTableCFsMap(tableCfs);
builder.setExcludeNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(tableCfs)
.setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
assertFalse(peerConfig.needToReplicate(TABLE_B));
}
@Test
public void testNeedToReplicateWithoutReplicatingAll() {
ReplicationPeerConfig peerConfig;
ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
Map<TableName, List<String>> tableCfs = new HashMap<>();
Set<String> namespaces = new HashSet<>();
Map<TableName, List<String>> tableCfs;
// 1. replication_all flag is false, no namespaces and table-cfs config
builder.setReplicateAllUserTables(false);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
// 2. replicate_all flag is false, and only config table-cfs in peer
// empty map
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// Set empty table-cfs map
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setTableCFsMap(Maps.newHashMap())
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
// table testB
tableCfs = new HashMap<>();
// Set table B
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// table testA
tableCfs = new HashMap<>();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setTableCFsMap(tableCfs)
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
assertTrue(peerConfig.needToReplicate(TABLE_B));
// 3. replication_all flag is false, and only config namespace in peer
builder.setTableCFsMap(null);
// empty set
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// Set empty namespace set
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setNamespaces(Sets.newHashSet())
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
// namespace default
namespaces = new HashSet<>();
namespaces.add("default");
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
// Set namespace other
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
// namespace replication
namespaces = new HashSet<>();
namespaces.add("replication");
builder.setReplicateAllUserTables(false);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
// Set namespace replication
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
// 4. replicate_all flag is false, and config namespaces and table-cfs both
// Namespaces config doesn't conflict with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setTableCFsMap(tableCfs)
.setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
// Namespaces config conflicts with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setTableCFsMap(tableCfs)
.setNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("replication");
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_B, null);
builder.setReplicateAllUserTables(false);
builder.setTableCFsMap(tableCfs);
builder.setNamespaces(namespaces);
peerConfig = builder.build();
Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
.setTableCFsMap(tableCfs)
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
}
@Test
// Verifies the family-level needToReplicate(table, family) overload when the
// replicate_all flag is true and exclusions come from exclude-table-cfs.
public void testNeedToReplicateCFWithReplicatingAll() {
// Excluding a table with a null CF list excludes every family of that table.
Map<TableName, List<String>> excludeTableCfs = Maps.newHashMap();
excludeTableCfs.put(TABLE_A, null);
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(excludeTableCfs)
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
// An empty (non-null) CF list behaves the same as null: the whole table is excluded.
excludeTableCfs = Maps.newHashMap();
excludeTableCfs.put(TABLE_A, Lists.newArrayList());
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(excludeTableCfs)
.build();
assertFalse(peerConfig.needToReplicate(TABLE_A));
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
// Excluding only FAMILY1 keeps the table replicable (some CFs remain), filters
// FAMILY1 out, and leaves FAMILY2 replicable.
excludeTableCfs = Maps.newHashMap();
excludeTableCfs.put(TABLE_A, Lists.newArrayList(Bytes.toString(FAMILY1)));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(excludeTableCfs)
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
}
@Test
// Verifies the family-level needToReplicate(table, family) overload when the
// replicate_all flag is false and inclusion comes from the table-cfs map.
public void testNeedToReplicateCFWithoutReplicatingAll() {
// A table mapped to a null CF list means every family of that table replicates.
Map<TableName, List<String>> tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setTableCFsMap(tableCfs)
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
// An empty (non-null) CF list behaves the same as null: all families replicate.
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, Lists.newArrayList());
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setTableCFsMap(tableCfs)
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
// Listing only FAMILY1 replicates that family (and the table) but not FAMILY2.
tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, Lists.newArrayList(Bytes.toString(FAMILY1)));
peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
.setReplicateAllUserTables(false)
.setTableCFsMap(tableCfs)
.build();
assertTrue(peerConfig.needToReplicate(TABLE_A));
assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
}
}

View File

@ -18,27 +18,17 @@
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
* exclude namespaces config, and exclude table-cfs config.
* Filter a WAL Entry by the peer config according to the table and family which it belongs to.
*
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But
* you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster.
* Note: set a exclude namespace means that all tables in this namespace can't be replicated.
*
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
* But you can set namespaces or table-cfs which will be replicated to peer cluster.
* Note: set a namespace means that all tables in this namespace will be replicated.
* @see ReplicationPeerConfig#needToReplicate(TableName, byte[])
*/
@InterfaceAudience.Private
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
@ -62,72 +52,12 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
@Override
public Cell filterCell(final Entry entry, Cell cell) {
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
if (peerConfig.replicateAllUserTables()) {
// replicate all user tables, but filter by exclude table-cfs config
final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
if (excludeTableCfs == null) {
return cell;
}
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
cell = bulkLoadFilter.filterCell(cell,
fam -> filterByExcludeTableCfs(entry.getKey().getTableName(), Bytes.toString(fam),
excludeTableCfs));
} else {
if (filterByExcludeTableCfs(entry.getKey().getTableName(),
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
excludeTableCfs)) {
return null;
}
}
return cell;
TableName tableName = entry.getKey().getTableName();
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
// If the cell is about BULKLOAD event, unpack and filter it by BulkLoadCellFilter.
return bulkLoadFilter.filterCell(cell, fam -> !peerConfig.needToReplicate(tableName, fam));
} else {
// not replicate all user tables, so filter by table-cfs config
final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
if (tableCfs == null) {
return cell;
}
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
cell = bulkLoadFilter.filterCell(cell,
fam -> filterByTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), tableCfs));
} else {
if (filterByTableCfs(entry.getKey().getTableName(),
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
tableCfs)) {
return null;
}
}
return cell;
return peerConfig.needToReplicate(tableName, CellUtil.cloneFamily(cell)) ? cell : null;
}
}
private boolean filterByExcludeTableCfs(TableName tableName, String family,
Map<TableName, List<String>> excludeTableCfs) {
List<String> excludeCfs = excludeTableCfs.get(tableName);
if (excludeCfs != null) {
// empty cfs means all cfs of this table are excluded
if (excludeCfs.isEmpty()) {
return true;
}
// ignore(remove) kv if its cf is in the exclude cfs list
if (excludeCfs.contains(family)) {
return true;
}
}
return false;
}
private boolean filterByTableCfs(TableName tableName, String family,
Map<TableName, List<String>> tableCfs) {
List<String> cfs = tableCfs.get(tableName);
// ignore(remove) kv if its cf isn't in the replicable cf list
// (empty cfs means all cfs of this table are replicable)
if (cfs != null && !cfs.contains(family)) {
return true;
}
return false;
}
}

View File

@ -26,7 +26,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -273,31 +272,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = replicationPeer.getId();
Set<String> namespaces = replicationPeer.getNamespaces();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) { // All peers with TableCFs
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else if (namespaces != null) { // Only for set NAMESPACES peers
if (namespaces.contains(tableName.getNamespaceAsString())) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
// data
if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
}

View File

@ -183,9 +183,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
//adds cluster2 as a remote peer on cluster3
UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
setupCoprocessor(UTIL1, "cluster1");
setupCoprocessor(UTIL2, "cluster2");
setupCoprocessor(UTIL3, "cluster3");
setupCoprocessor(UTIL1);
setupCoprocessor(UTIL2);
setupCoprocessor(UTIL3);
BULK_LOADS_COUNT = new AtomicInteger(0);
}
@ -194,7 +194,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
.setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
}
private void setupCoprocessor(HBaseTestingUtility cluster, String name){
private void setupCoprocessor(HBaseTestingUtility cluster){
cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
try {
TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().

View File

@ -0,0 +1,310 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@Category({ ReplicationTests.class, SmallTests.class})
public class TestBulkLoadReplicationHFileRefs extends TestReplicationBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBulkLoadReplicationHFileRefs.class);
private static final String PEER1_CLUSTER_ID = "peer1";
private static final String PEER2_CLUSTER_ID = "peer2";
private static final String REPLICATE_NAMESPACE = "replicate_ns";
private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns";
private static final TableName REPLICATE_TABLE =
TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table");
private static final TableName NO_REPLICATE_TABLE =
TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table");
private static final byte[] CF_A = Bytes.toBytes("cfa");
private static final byte[] CF_B = Bytes.toBytes("cfb");
private byte[] row = Bytes.toBytes("r1");
private byte[] qualifier = Bytes.toBytes("q1");
private byte[] value = Bytes.toBytes("v1");
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();
private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
private static Admin admin1;
private static Admin admin2;
private static ReplicationQueueStorage queueStorage;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Enable bulk-load replication on both clusters before the base class starts them.
setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
TestReplicationBase.setUpBeforeClass();
admin1 = UTIL1.getConnection().getAdmin();
admin2 = UTIL2.getConnection().getAdmin();
// Queue storage of the source cluster; the tests assert on its HFile refs.
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(),
UTIL1.getConfiguration());
// Create both namespaces on the source and peer clusters so tables can be
// created on each side.
admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
}
// Configures a cluster for bulk-load replication: turns the feature flag on,
// assigns the given replication cluster id, and writes this cluster's
// configuration under REPLICATION_CONF_DIR so it can be resolved by that id.
protected static void setupBulkLoadConfigsForCluster(Configuration config,
String clusterReplicationId) throws Exception {
config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
// Persist an hbase-site.xml snapshot in a folder named after the cluster id.
File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
config.writeXml(new FileOutputStream(sourceConfigFile));
config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
}
@Before
public void setUp() throws Exception {
// Each test adds its own peer; start from a clean slate on the source cluster.
for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
admin1.removeReplicationPeer(peer.getPeerId());
}
}
@After
public void teardown() throws Exception {
// Remove peers first so dropping tables does not race with replication.
for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
admin1.removeReplicationPeer(peer.getPeerId());
}
// Drop every table on both clusters so tests stay independent of each other.
for (TableName tableName : admin1.listTableNames()) {
UTIL1.deleteTable(tableName);
}
for (TableName tableName : admin2.listTableNames()) {
UTIL2.deleteTable(tableName);
}
}
@Test
public void testWhenExcludeCF() throws Exception {
// Create table in source and remote clusters.
createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B);
// Add peer, setReplicateAllUserTables true, but exclude CF_B.
Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B)));
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL2.getClusterKey())
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(excludeTableCFs)
.build();
admin1.addReplicationPeer(PEER_ID2, peerConfig);
Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
Assert.assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B));
assertEquals(0, queueStorage.getAllHFileRefs().size());
// Bulk load data into the CF that is not replicated.
bulkLoadOnCluster(REPLICATE_TABLE, CF_B);
Threads.sleep(1000);
// Cannot get data from remote cluster
Table table2 = UTIL2.getConnection().getTable(REPLICATE_TABLE);
Result result = table2.get(new Get(row));
assertTrue(Bytes.equals(null, result.getValue(CF_B, qualifier)));
// The extra HFile is never added to the HFileRefs
assertEquals(0, queueStorage.getAllHFileRefs().size());
}
@Test
public void testWhenExcludeTable() throws Exception {
// Create 2 tables in source and remote clusters.
createTableOnClusters(REPLICATE_TABLE, CF_A);
createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
// Add peer, setReplicateAllUserTables true, but exclude one table.
Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
excludeTableCFs.put(NO_REPLICATE_TABLE, null);
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL2.getClusterKey())
.setReplicateAllUserTables(true)
.setExcludeTableCFsMap(excludeTableCFs)
.build();
admin1.addReplicationPeer(PEER_ID2, peerConfig);
assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
assertEquals(0, queueStorage.getAllHFileRefs().size());
// Bulk load data into the table that is not replicated.
bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
Threads.sleep(1000);
// Cannot get data from remote cluster
Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
Result result = table2.get(new Get(row));
assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
// The extra HFile is never added to the HFileRefs
assertEquals(0, queueStorage.getAllHFileRefs().size());
}
@Test
public void testWhenExcludeNamespace() throws Exception {
// Create 2 tables in source and remote clusters.
createTableOnClusters(REPLICATE_TABLE, CF_A);
createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
// Add peer, setReplicateAllUserTables true, but exclude one namespace.
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL2.getClusterKey())
.setReplicateAllUserTables(true)
.setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE))
.build();
admin1.addReplicationPeer(PEER_ID2, peerConfig);
assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
assertEquals(0, queueStorage.getAllHFileRefs().size());
// Bulk load data into the table of the namespace that is not replicated.
byte[] row = Bytes.toBytes("001");
byte[] value = Bytes.toBytes("v1");
bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
Threads.sleep(1000);
// Cannot get data from remote cluster
Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
Result result = table2.get(new Get(row));
assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
// The extra HFile is never added to the HFileRefs
assertEquals(0, queueStorage.getAllHFileRefs().size());
}
protected void bulkLoadOnCluster(TableName tableName, byte[] family)
throws Exception {
String bulkLoadFilePath = createHFileForFamilies(family);
copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster());
BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration());
bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
}
private String createHFileForFamilies(byte[] family) throws IOException {
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
cellBuilder.setRow(row)
.setFamily(family)
.setQualifier(qualifier)
.setValue(value)
.setType(Cell.Type.Put);
HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration());
File hFileLocation = testFolder.newFile();
FSDataOutputStream out =
new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
try {
hFileFactory.withOutputStream(out);
hFileFactory.withFileContext(new HFileContextBuilder().build());
HFile.Writer writer = hFileFactory.create();
try {
writer.append(new KeyValue(cellBuilder.build()));
} finally {
writer.close();
}
} finally {
out.close();
}
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster)
throws Exception {
Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family));
cluster.getFileSystem().mkdirs(bulkLoadDir);
cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
}
private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
for (byte[] cf : cfs) {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
}
TableDescriptor td = builder.build();
admin1.createTable(td);
admin2.createTable(td);
}
}