HBASE-16086 TableCfWALEntryFilter and ScopeWALEntryFilter should not redundantly iterate over cells (Vincent Poon)
Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
This commit is contained in:
parent
fe57fa4daa
commit
94026d0d09
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
|
||||
public class BulkLoadCellFilter {
|
||||
private static final Log LOG = LogFactory.getLog(BulkLoadCellFilter.class);
|
||||
|
||||
/**
|
||||
* Filters the bulk load cell using the supplied predicate.
|
||||
* @param cell The WAL cell to filter.
|
||||
* @param famPredicate Returns true of given family should be removed.
|
||||
* @return The filtered cell.
|
||||
*/
|
||||
public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) {
|
||||
byte[] fam;
|
||||
BulkLoadDescriptor bld = null;
|
||||
try {
|
||||
bld = WALEdit.getBulkLoadDescriptor(cell);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get bulk load events information from the WAL file.", e);
|
||||
return cell;
|
||||
}
|
||||
List<StoreDescriptor> storesList = bld.getStoresList();
|
||||
// Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
|
||||
List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
|
||||
Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
|
||||
boolean anyStoreRemoved = false;
|
||||
while (copiedStoresListIterator.hasNext()) {
|
||||
StoreDescriptor sd = copiedStoresListIterator.next();
|
||||
fam = sd.getFamilyName().toByteArray();
|
||||
if (famPredicate.apply(fam)) {
|
||||
copiedStoresListIterator.remove();
|
||||
anyStoreRemoved = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!anyStoreRemoved) {
|
||||
return cell;
|
||||
} else if (copiedStoresList.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
BulkLoadDescriptor.Builder newDesc =
|
||||
BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
|
||||
.setEncodedRegionName(bld.getEncodedRegionName())
|
||||
.setBulkloadSeqNum(bld.getBulkloadSeqNum());
|
||||
newDesc.addAllStores(copiedStoresList);
|
||||
BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
|
||||
return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
|
||||
cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
|
@ -34,9 +35,11 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
|
|||
public class ChainWALEntryFilter implements WALEntryFilter {
|
||||
|
||||
private final WALEntryFilter[] filters;
|
||||
private WALCellFilter[] cellFilters;
|
||||
|
||||
public ChainWALEntryFilter(WALEntryFilter...filters) {
|
||||
this.filters = filters;
|
||||
initCellFilters();
|
||||
}
|
||||
|
||||
public ChainWALEntryFilter(List<WALEntryFilter> filters) {
|
||||
|
@ -49,8 +52,18 @@ public class ChainWALEntryFilter implements WALEntryFilter {
|
|||
rawFilters.add(filter);
|
||||
}
|
||||
}
|
||||
|
||||
this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]);
|
||||
initCellFilters();
|
||||
}
|
||||
|
||||
public void initCellFilters() {
|
||||
ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
|
||||
for (WALEntryFilter filter : filters) {
|
||||
if (filter instanceof WALCellFilter) {
|
||||
cellFilters.add((WALCellFilter) filter);
|
||||
}
|
||||
}
|
||||
this.cellFilters = cellFilters.toArray(new WALCellFilter[cellFilters.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -61,7 +74,30 @@ public class ChainWALEntryFilter implements WALEntryFilter {
|
|||
}
|
||||
entry = filter.filter(entry);
|
||||
}
|
||||
filterCells(entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
private void filterCells(Entry entry) {
|
||||
if (entry == null || cellFilters.length == 0) {
|
||||
return;
|
||||
}
|
||||
ArrayList<Cell> cells = entry.getEdit().getCells();
|
||||
int size = cells.size();
|
||||
for (int i = size - 1; i >= 0; i--) {
|
||||
Cell cell = cells.get(i);
|
||||
for (WALCellFilter filter : cellFilters) {
|
||||
cell = filter.filterCell(entry, cell);
|
||||
if (cell != null) {
|
||||
cells.set(i, cell);
|
||||
} else {
|
||||
cells.remove(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (cells.size() < size / 2) {
|
||||
cells.trimToSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,29 +18,24 @@
|
|||
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
|
||||
/**
|
||||
* Keeps KVs that are scoped other than local
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ScopeWALEntryFilter implements WALEntryFilter {
|
||||
private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
|
||||
public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
|
||||
|
||||
BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
|
||||
|
||||
@Override
|
||||
public Entry filter(Entry entry) {
|
||||
|
@ -48,72 +43,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
|
|||
if (scopes == null || scopes.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
ArrayList<Cell> cells = entry.getEdit().getCells();
|
||||
int size = cells.size();
|
||||
byte[] fam;
|
||||
for (int i = size - 1; i >= 0; i--) {
|
||||
Cell cell = cells.get(i);
|
||||
// If a bulk load entry has a scope then that means user has enabled replication for
|
||||
// bulk load hfiles.
|
||||
// TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so
|
||||
// cannot refactor into one now, can revisit and see if any way to unify them.
|
||||
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
||||
Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
|
||||
if (filteredBulkLoadEntryCell != null) {
|
||||
cells.set(i, filteredBulkLoadEntryCell);
|
||||
} else {
|
||||
cells.remove(i);
|
||||
}
|
||||
} else {
|
||||
// The scope will be null or empty if
|
||||
// there's nothing to replicate in that WALEdit
|
||||
fam = CellUtil.cloneFamily(cell);
|
||||
if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
|
||||
cells.remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (cells.size() < size / 2) {
|
||||
cells.trimToSize();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
|
||||
byte[] fam;
|
||||
BulkLoadDescriptor bld = null;
|
||||
try {
|
||||
bld = WALEdit.getBulkLoadDescriptor(cell);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get bulk load events information from the WAL file.", e);
|
||||
return cell;
|
||||
@Override
|
||||
public Cell filterCell(Entry entry, Cell cell) {
|
||||
final NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
|
||||
// The scope will be null or empty if
|
||||
// there's nothing to replicate in that WALEdit
|
||||
byte[] fam = CellUtil.cloneFamily(cell);
|
||||
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
||||
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
|
||||
@Override
|
||||
public boolean apply(byte[] fam) {
|
||||
return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
|
||||
}
|
||||
List<StoreDescriptor> storesList = bld.getStoresList();
|
||||
// Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
|
||||
List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
|
||||
Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
|
||||
boolean anyStoreRemoved = false;
|
||||
while (copiedStoresListIterator.hasNext()) {
|
||||
StoreDescriptor sd = copiedStoresListIterator.next();
|
||||
fam = sd.getFamilyName().toByteArray();
|
||||
});
|
||||
} else {
|
||||
if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
|
||||
copiedStoresListIterator.remove();
|
||||
anyStoreRemoved = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!anyStoreRemoved) {
|
||||
return cell;
|
||||
} else if (copiedStoresList.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
BulkLoadDescriptor.Builder newDesc =
|
||||
BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
|
||||
.setEncodedRegionName(bld.getEncodedRegionName())
|
||||
.setBulkloadSeqNum(bld.getBulkloadSeqNum());
|
||||
newDesc.addAllStores(copiedStoresList);
|
||||
BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
|
||||
return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
|
||||
cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
|
||||
}
|
||||
return cell;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -29,16 +26,17 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
public class TableCfWALEntryFilter implements WALEntryFilter {
|
||||
import com.google.common.base.Predicate;
|
||||
|
||||
public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
|
||||
private final ReplicationPeer peer;
|
||||
private ReplicationPeer peer;
|
||||
private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
|
||||
|
||||
public TableCfWALEntryFilter(ReplicationPeer peer) {
|
||||
this.peer = peer;
|
||||
|
@ -47,91 +45,56 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
|
|||
@Override
|
||||
public Entry filter(Entry entry) {
|
||||
TableName tabName = entry.getKey().getTablename();
|
||||
ArrayList<Cell> cells = entry.getEdit().getCells();
|
||||
Map<TableName, List<String>> tableCFs = null;
|
||||
Map<TableName, List<String>> tableCFs = getTableCfs();
|
||||
// If null means user has explicitly not configured any table CFs so all the tables data are
|
||||
// applicable for replication
|
||||
if (tableCFs == null) return entry;
|
||||
|
||||
if (!tableCFs.containsKey(tabName)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell filterCell(final Entry entry, Cell cell) {
|
||||
final Map<TableName, List<String>> tableCfs = getTableCfs();
|
||||
if (tableCfs == null) return cell;
|
||||
TableName tabName = entry.getKey().getTablename();
|
||||
List<String> cfs = tableCfs.get(tabName);
|
||||
// ignore(remove) kv if its cf isn't in the replicable cf list
|
||||
// (empty cfs means all cfs of this table are replicable)
|
||||
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
||||
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
|
||||
@Override
|
||||
public boolean apply(byte[] fam) {
|
||||
if (tableCfs != null) {
|
||||
List<String> cfs = tableCfs.get(entry.getKey().getTablename());
|
||||
if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
});
|
||||
} else {
|
||||
if ((cfs != null) && !cfs.contains(
|
||||
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return cell;
|
||||
}
|
||||
|
||||
Map<TableName, List<String>> getTableCfs() {
|
||||
Map<TableName, List<String>> tableCFs = null;
|
||||
try {
|
||||
tableCFs = this.peer.getTableCFs();
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
|
||||
", degenerate as if it's not configured by keeping tableCFs==null");
|
||||
}
|
||||
int size = cells.size();
|
||||
|
||||
// If null means user has explicitly not configured any table CFs so all the tables data are
|
||||
// applicable for replication
|
||||
if (tableCFs == null) {
|
||||
return entry;
|
||||
}
|
||||
// return null(prevent replicating) if logKey's table isn't in this peer's
|
||||
// replicable table list
|
||||
if (!tableCFs.containsKey(tabName)) {
|
||||
return null;
|
||||
} else {
|
||||
List<String> cfs = tableCFs.get(tabName);
|
||||
for (int i = size - 1; i >= 0; i--) {
|
||||
Cell cell = cells.get(i);
|
||||
// TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so
|
||||
// cannot refactor into one now, can revisit and see if any way to unify them.
|
||||
// Filter bulk load entries separately
|
||||
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
||||
Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
|
||||
if (filteredBulkLoadEntryCell != null) {
|
||||
cells.set(i, filteredBulkLoadEntryCell);
|
||||
} else {
|
||||
cells.remove(i);
|
||||
}
|
||||
} else {
|
||||
// ignore(remove) kv if its cf isn't in the replicable cf list
|
||||
// (empty cfs means all cfs of this table are replicable)
|
||||
if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
|
||||
cell.getFamilyOffset(), cell.getFamilyLength()))) {
|
||||
cells.remove(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (cells.size() < size/2) {
|
||||
cells.trimToSize();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
|
||||
byte[] fam;
|
||||
BulkLoadDescriptor bld = null;
|
||||
try {
|
||||
bld = WALEdit.getBulkLoadDescriptor(cell);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get bulk load events information from the WAL file.", e);
|
||||
return cell;
|
||||
}
|
||||
List<StoreDescriptor> storesList = bld.getStoresList();
|
||||
// Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
|
||||
List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
|
||||
Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
|
||||
boolean anyStoreRemoved = false;
|
||||
while (copiedStoresListIterator.hasNext()) {
|
||||
StoreDescriptor sd = copiedStoresListIterator.next();
|
||||
fam = sd.getFamilyName().toByteArray();
|
||||
if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
|
||||
copiedStoresListIterator.remove();
|
||||
anyStoreRemoved = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!anyStoreRemoved) {
|
||||
return cell;
|
||||
} else if (copiedStoresList.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
BulkLoadDescriptor.Builder newDesc =
|
||||
BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
|
||||
.setEncodedRegionName(bld.getEncodedRegionName())
|
||||
.setBulkloadSeqNum(bld.getBulkloadSeqNum());
|
||||
newDesc.addAllStores(copiedStoresList);
|
||||
BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
|
||||
return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
|
||||
cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
|
||||
return tableCFs;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
||||
/**
|
||||
* A filter for WAL entry cells before being sent over to replication.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||
public interface WALCellFilter {
|
||||
|
||||
/**
|
||||
* Applies the filter, possibly returning a different Cell instance.
|
||||
* If null is returned, the cell will be skipped.
|
||||
* @param entry Entry which contains the cell
|
||||
* @param cell Cell to filter
|
||||
* @return a (possibly modified) Cell to use. Returning null will cause the cell
|
||||
* to be skipped for replication.
|
||||
*/
|
||||
public Cell filterCell(Entry entry, Cell cell);
|
||||
|
||||
}
|
|
@ -76,7 +76,7 @@ public class TestReplicationWALEntryFilters {
|
|||
|
||||
@Test
|
||||
public void testScopeWALEntryFilter() {
|
||||
ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
|
||||
WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());
|
||||
|
||||
Entry userEntry = createEntry(a, b);
|
||||
Entry userEntryA = createEntry(a);
|
||||
|
@ -205,14 +205,14 @@ public class TestReplicationWALEntryFilters {
|
|||
|
||||
when(peer.getTableCFs()).thenReturn(null);
|
||||
Entry userEntry = createEntry(a, b, c);
|
||||
TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
|
||||
WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
|
||||
|
||||
// empty map
|
||||
userEntry = createEntry(a, b, c);
|
||||
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
|
||||
when(peer.getTableCFs()).thenReturn(tableCfs);
|
||||
filter = new TableCfWALEntryFilter(peer);
|
||||
filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
||||
// table bar
|
||||
|
@ -220,7 +220,7 @@ public class TestReplicationWALEntryFilters {
|
|||
tableCfs = new HashMap<TableName, List<String>>();
|
||||
tableCfs.put(TableName.valueOf("bar"), null);
|
||||
when(peer.getTableCFs()).thenReturn(tableCfs);
|
||||
filter = new TableCfWALEntryFilter(peer);
|
||||
filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
|
||||
assertEquals(null, filter.filter(userEntry));
|
||||
|
||||
// table foo:a
|
||||
|
@ -228,15 +228,15 @@ public class TestReplicationWALEntryFilters {
|
|||
tableCfs = new HashMap<TableName, List<String>>();
|
||||
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
|
||||
when(peer.getTableCFs()).thenReturn(tableCfs);
|
||||
filter = new TableCfWALEntryFilter(peer);
|
||||
assertEquals(createEntry(a), filter.filter(userEntry));
|
||||
|
||||
filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(a), filter.filter(userEntry));
|
||||
// table foo:a,c
|
||||
userEntry = createEntry(a, b, c, d);
|
||||
tableCfs = new HashMap<TableName, List<String>>();
|
||||
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
|
||||
when(peer.getTableCFs()).thenReturn(tableCfs);
|
||||
filter = new TableCfWALEntryFilter(peer);
|
||||
filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
|
||||
assertEquals(createEntry(a,c), filter.filter(userEntry));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue