HBASE-23309: Adding the flexibility to ChainWalEntryFilter to filter the whole entry if all cells get filtered (#837)
HBASE-23309: Adding the flexibility to ChainWalEntryFilter to filter the whole entry if all cells get filtered
This commit is contained in:
parent
8548463e74
commit
60d9430195
|
@ -0,0 +1,62 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link ChainWALEntryFilter} for providing more flexible options
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||||
|
public class ChainWALEmptyEntryFilter extends ChainWALEntryFilter {
|
||||||
|
|
||||||
|
private boolean filterEmptyEntry = false;
|
||||||
|
|
||||||
|
public ChainWALEmptyEntryFilter(final WALEntryFilter... filters) {
|
||||||
|
super(filters);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ChainWALEmptyEntryFilter(final List<WALEntryFilter> filters) {
|
||||||
|
super(filters);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WAL.Entry filter(WAL.Entry entry) {
|
||||||
|
entry = super.filter(entry);
|
||||||
|
if (filterEmptyEntry && entry != null && entry.getEdit().isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To allow the empty entries to get filtered, we want to set this optional flag to decide
|
||||||
|
* if we want to filter the entries which have no cells or all cells got filtered
|
||||||
|
* though {@link WALCellFilter}.
|
||||||
|
*
|
||||||
|
* @param filterEmptyEntry flag
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setFilterEmptyEntry(final boolean filterEmptyEntry) {
|
||||||
|
this.filterEmptyEntry = filterEmptyEntry;
|
||||||
|
}
|
||||||
|
}
|
|
@ -68,17 +68,26 @@ public class ChainWALEntryFilter implements WALEntryFilter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Entry filter(Entry entry) {
|
public Entry filter(Entry entry) {
|
||||||
|
entry = filterEntry(entry);
|
||||||
|
if (entry == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
filterCells(entry);
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Entry filterEntry(Entry entry) {
|
||||||
for (WALEntryFilter filter : filters) {
|
for (WALEntryFilter filter : filters) {
|
||||||
if (entry == null) {
|
if (entry == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
entry = filter.filter(entry);
|
entry = filter.filter(entry);
|
||||||
}
|
}
|
||||||
filterCells(entry);
|
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void filterCells(Entry entry) {
|
protected void filterCells(Entry entry) {
|
||||||
if (entry == null || cellFilters.length == 0) {
|
if (entry == null || cellFilters.length == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -155,6 +155,74 @@ public class TestReplicationWALEntryFilters {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
public static class FilterSomeCellsWALCellFilter implements WALEntryFilter, WALCellFilter {
|
||||||
|
@Override
|
||||||
|
public Entry filter(Entry entry) {
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Cell filterCell(Entry entry, Cell cell) {
|
||||||
|
if (Bytes.toString(
|
||||||
|
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).equals("a")) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return cell;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FilterAllCellsWALCellFilter implements WALEntryFilter, WALCellFilter {
|
||||||
|
@Override
|
||||||
|
public Entry filter(Entry entry) {
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Cell filterCell(Entry entry, Cell cell) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChainWALEntryWithCellFilter() {
|
||||||
|
Entry userEntry = createEntry(null, a, b, c);
|
||||||
|
ChainWALEntryFilter filterSomeCells =
|
||||||
|
new ChainWALEntryFilter(new FilterSomeCellsWALCellFilter());
|
||||||
|
// since WALCellFilter filter cells with rowkey 'a'
|
||||||
|
assertEquals(createEntry(null, b,c), filterSomeCells.filter(userEntry));
|
||||||
|
|
||||||
|
Entry userEntry2 = createEntry(null, b, c, d);
|
||||||
|
// since there is no cell to get filtered, nothing should get filtered
|
||||||
|
assertEquals(userEntry2, filterSomeCells.filter(userEntry2));
|
||||||
|
|
||||||
|
// since we filter all the cells, we should get empty entry
|
||||||
|
ChainWALEntryFilter filterAllCells =
|
||||||
|
new ChainWALEntryFilter(new FilterAllCellsWALCellFilter());
|
||||||
|
assertEquals(createEntry(null), filterAllCells.filter(userEntry));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChainWALEmptyEntryWithCellFilter() {
|
||||||
|
Entry userEntry = createEntry(null, a, b, c);
|
||||||
|
ChainWALEmptyEntryFilter filterSomeCells =
|
||||||
|
new ChainWALEmptyEntryFilter(new FilterSomeCellsWALCellFilter());
|
||||||
|
// since WALCellFilter filter cells with rowkey 'a'
|
||||||
|
assertEquals(createEntry(null, b,c), filterSomeCells.filter(userEntry));
|
||||||
|
|
||||||
|
Entry userEntry2 = createEntry(null, b, c, d);
|
||||||
|
// since there is no cell to get filtered, nothing should get filtered
|
||||||
|
assertEquals(userEntry2, filterSomeCells.filter(userEntry2));
|
||||||
|
|
||||||
|
ChainWALEmptyEntryFilter filterAllCells =
|
||||||
|
new ChainWALEmptyEntryFilter(new FilterAllCellsWALCellFilter());
|
||||||
|
assertEquals(createEntry(null), filterAllCells.filter(userEntry));
|
||||||
|
// let's set the filter empty entry flag to true now for the above case
|
||||||
|
filterAllCells.setFilterEmptyEntry(true);
|
||||||
|
// since WALCellFilter filter all cells, whole entry should be filtered
|
||||||
|
assertEquals(null, filterAllCells.filter(userEntry));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChainWALEntryFilter() {
|
public void testChainWALEntryFilter() {
|
||||||
Entry userEntry = createEntry(null, a, b, c);
|
Entry userEntry = createEntry(null, a, b, c);
|
||||||
|
|
Loading…
Reference in New Issue