diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java
new file mode 100644
index 00000000000..19fd0c77e7e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A {@link ChainWALEntryFilter} for providing more flexible options
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public class ChainWALEmptyEntryFilter extends ChainWALEntryFilter {
+
+ private boolean filterEmptyEntry = false;
+
+ public ChainWALEmptyEntryFilter(final WALEntryFilter... filters) {
+ super(filters);
+ }
+
+ public ChainWALEmptyEntryFilter(final List filters) {
+ super(filters);
+ }
+
+ @Override
+ public WAL.Entry filter(WAL.Entry entry) {
+ entry = super.filter(entry);
+ if (filterEmptyEntry && entry != null && entry.getEdit().isEmpty()) {
+ return null;
+ }
+ return entry;
+ }
+
+ /**
+ * To allow the empty entries to get filtered, we want to set this optional flag to decide
+ * if we want to filter the entries which have no cells or all cells got filtered
+ * though {@link WALCellFilter}.
+ *
+ * @param filterEmptyEntry flag
+ */
+ @VisibleForTesting
+ public void setFilterEmptyEntry(final boolean filterEmptyEntry) {
+ this.filterEmptyEntry = filterEmptyEntry;
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index 2bb981119af..ae3c74ad475 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -68,17 +68,26 @@ public class ChainWALEntryFilter implements WALEntryFilter {
@Override
public Entry filter(Entry entry) {
+ entry = filterEntry(entry);
+ if (entry == null) {
+ return null;
+ }
+
+ filterCells(entry);
+ return entry;
+ }
+
+ protected Entry filterEntry(Entry entry) {
for (WALEntryFilter filter : filters) {
if (entry == null) {
return null;
}
entry = filter.filter(entry);
}
- filterCells(entry);
return entry;
}
- private void filterCells(Entry entry) {
+ protected void filterCells(Entry entry) {
if (entry == null || cellFilters.length == 0) {
return;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 10c6f72610e..3a6cfd45d32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -155,6 +155,74 @@ public class TestReplicationWALEntryFilters {
}
};
+ public static class FilterSomeCellsWALCellFilter implements WALEntryFilter, WALCellFilter {
+ @Override
+ public Entry filter(Entry entry) {
+ return entry;
+ }
+
+ @Override
+ public Cell filterCell(Entry entry, Cell cell) {
+ if (Bytes.toString(
+ cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).equals("a")) {
+ return null;
+ } else {
+ return cell;
+ }
+ }
+ }
+
+ public static class FilterAllCellsWALCellFilter implements WALEntryFilter, WALCellFilter {
+ @Override
+ public Entry filter(Entry entry) {
+ return entry;
+ }
+
+ @Override
+ public Cell filterCell(Entry entry, Cell cell) {
+ return null;
+ }
+ }
+
+ @Test
+ public void testChainWALEntryWithCellFilter() {
+ Entry userEntry = createEntry(null, a, b, c);
+ ChainWALEntryFilter filterSomeCells =
+ new ChainWALEntryFilter(new FilterSomeCellsWALCellFilter());
+ // since WALCellFilter filter cells with rowkey 'a'
+ assertEquals(createEntry(null, b,c), filterSomeCells.filter(userEntry));
+
+ Entry userEntry2 = createEntry(null, b, c, d);
+ // since there is no cell to get filtered, nothing should get filtered
+ assertEquals(userEntry2, filterSomeCells.filter(userEntry2));
+
+ // since we filter all the cells, we should get empty entry
+ ChainWALEntryFilter filterAllCells =
+ new ChainWALEntryFilter(new FilterAllCellsWALCellFilter());
+ assertEquals(createEntry(null), filterAllCells.filter(userEntry));
+ }
+
+ @Test
+ public void testChainWALEmptyEntryWithCellFilter() {
+ Entry userEntry = createEntry(null, a, b, c);
+ ChainWALEmptyEntryFilter filterSomeCells =
+ new ChainWALEmptyEntryFilter(new FilterSomeCellsWALCellFilter());
+ // since WALCellFilter filter cells with rowkey 'a'
+ assertEquals(createEntry(null, b,c), filterSomeCells.filter(userEntry));
+
+ Entry userEntry2 = createEntry(null, b, c, d);
+ // since there is no cell to get filtered, nothing should get filtered
+ assertEquals(userEntry2, filterSomeCells.filter(userEntry2));
+
+ ChainWALEmptyEntryFilter filterAllCells =
+ new ChainWALEmptyEntryFilter(new FilterAllCellsWALCellFilter());
+ assertEquals(createEntry(null), filterAllCells.filter(userEntry));
+ // let's set the filter empty entry flag to true now for the above case
+ filterAllCells.setFilterEmptyEntry(true);
+ // since WALCellFilter filter all cells, whole entry should be filtered
+ assertEquals(null, filterAllCells.filter(userEntry));
+ }
+
@Test
public void testChainWALEntryFilter() {
Entry userEntry = createEntry(null, a, b, c);