diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java new file mode 100644 index 00000000000..3599d10a339 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +import com.google.common.base.Predicate; + +public class BulkLoadCellFilter { + private static final Log LOG = LogFactory.getLog(BulkLoadCellFilter.class); + + /** + * Filters the bulk load cell using the supplied predicate. + * @param cell The WAL cell to filter. 
+ * @param famPredicate Returns true if the given family should be removed. + * @return The cell unchanged, a rebuilt cell with the remaining stores, or null if none remain. + */ + public Cell filterCell(Cell cell, Predicate famPredicate) { + byte[] fam; + BulkLoadDescriptor bld = null; + try { + bld = WALEdit.getBulkLoadDescriptor(cell); + } catch (IOException e) { + LOG.warn("Failed to get bulk load events information from the WAL file.", e); + return cell; + } + List storesList = bld.getStoresList(); + // Copy the StoreDescriptor list and update it as storesList is an unmodifiable list + List copiedStoresList = new ArrayList(storesList); + Iterator copiedStoresListIterator = copiedStoresList.iterator(); + boolean anyStoreRemoved = false; + while (copiedStoresListIterator.hasNext()) { + StoreDescriptor sd = copiedStoresListIterator.next(); + fam = sd.getFamilyName().toByteArray(); + if (famPredicate.apply(fam)) { + copiedStoresListIterator.remove(); + anyStoreRemoved = true; + } + } + + if (!anyStoreRemoved) { + return cell; + } else if (copiedStoresList.isEmpty()) { + return null; + } + BulkLoadDescriptor.Builder newDesc = + BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) + .setEncodedRegionName(bld.getEncodedRegionName()) + .setBulkloadSeqNum(bld.getBulkloadSeqNum()); + newDesc.addAllStores(copiedStoresList); + BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); + return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD, + cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index 6a3981ae7cc..1d67faade39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -23,6 +23,7 @@ import java.util.Collections; import
java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -34,9 +35,11 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; public class ChainWALEntryFilter implements WALEntryFilter { private final WALEntryFilter[] filters; + private WALCellFilter[] cellFilters; public ChainWALEntryFilter(WALEntryFilter...filters) { this.filters = filters; + initCellFilters(); } public ChainWALEntryFilter(List filters) { @@ -49,8 +52,18 @@ public class ChainWALEntryFilter implements WALEntryFilter { rawFilters.add(filter); } } - this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]); + initCellFilters(); + } + + public void initCellFilters() { + ArrayList cellFilters = new ArrayList<>(filters.length); + for (WALEntryFilter filter : filters) { + if (filter instanceof WALCellFilter) { + cellFilters.add((WALCellFilter) filter); + } + } + this.cellFilters = cellFilters.toArray(new WALCellFilter[cellFilters.size()]); } @Override @@ -61,7 +74,30 @@ public class ChainWALEntryFilter implements WALEntryFilter { } entry = filter.filter(entry); } + filterCells(entry); return entry; } + private void filterCells(Entry entry) { + if (entry == null || cellFilters.length == 0) { + return; + } + ArrayList cells = entry.getEdit().getCells(); + int size = cells.size(); + for (int i = size - 1; i >= 0; i--) { + Cell cell = cells.get(i); + for (WALCellFilter filter : cellFilters) { + cell = filter.filterCell(entry, cell); + if (cell != null) { + cells.set(i, cell); + } else { + cells.remove(i); + break; + } + } + } + if (cells.size() < size / 2) { + cells.trimToSize(); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java index 516ab8cbb48..cb945e3deb1 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java @@ -18,29 +18,24 @@ package org.apache.hadoop.hbase.replication; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.NavigableMap; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.wal.WAL.Entry; +import com.google.common.base.Predicate; + /** * Keeps KVs that are scoped other than local */ @InterfaceAudience.Private -public class ScopeWALEntryFilter implements WALEntryFilter { - private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class); +public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter { + + BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); @Override public Entry filter(Entry entry) { @@ -48,72 +43,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter { if (scopes == null || scopes.isEmpty()) { return null; } - ArrayList cells = entry.getEdit().getCells(); - int size = cells.size(); - byte[] fam; - for (int i = size - 1; i >= 0; i--) { - Cell cell = cells.get(i); - // If a bulk load entry has a scope then that means user has enabled replication for - // bulk load hfiles. 
- // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so - // cannot refactor into one now, can revisit and see if any way to unify them. - if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell); - if (filteredBulkLoadEntryCell != null) { - cells.set(i, filteredBulkLoadEntryCell); - } else { - cells.remove(i); - } - } else { - // The scope will be null or empty if - // there's nothing to replicate in that WALEdit - fam = CellUtil.cloneFamily(cell); - if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { - cells.remove(i); - } - } - } - if (cells.size() < size / 2) { - cells.trimToSize(); - } return entry; } - private Cell filterBulkLoadEntries(NavigableMap scopes, Cell cell) { - byte[] fam; - BulkLoadDescriptor bld = null; - try { - bld = WALEdit.getBulkLoadDescriptor(cell); - } catch (IOException e) { - LOG.warn("Failed to get bulk load events information from the WAL file.", e); - return cell; - } - List storesList = bld.getStoresList(); - // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList - List copiedStoresList = new ArrayList(storesList); - Iterator copiedStoresListIterator = copiedStoresList.iterator(); - boolean anyStoreRemoved = false; - while (copiedStoresListIterator.hasNext()) { - StoreDescriptor sd = copiedStoresListIterator.next(); - fam = sd.getFamilyName().toByteArray(); - if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { - copiedStoresListIterator.remove(); - anyStoreRemoved = true; + @Override + public Cell filterCell(Entry entry, Cell cell) { + final NavigableMap scopes = entry.getKey().getScopes(); + // The scope will be null or empty if + // there's nothing to replicate in that WALEdit + byte[] fam = CellUtil.cloneFamily(cell); + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + cell 
= bulkLoadFilter.filterCell(cell, new Predicate() { + @Override + public boolean apply(byte[] fam) { + return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL; + } + }); + } else { + if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { + return null; + } } - } - - if (!anyStoreRemoved) { - return cell; - } else if (copiedStoresList.isEmpty()) { - return null; - } - BulkLoadDescriptor.Builder newDesc = - BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) - .setEncodedRegionName(bld.getEncodedRegionName()) - .setBulkloadSeqNum(bld.getBulkloadSeqNum()); - newDesc.addAllStores(copiedStoresList); - BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); - return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD, - cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray()); + return cell; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java index 6c2a752d7cb..1cc70429aa9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hbase.replication; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -29,16 +26,17 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import 
org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.util.Bytes; -public class TableCfWALEntryFilter implements WALEntryFilter { +import com.google.common.base.Predicate; + +public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class); - private final ReplicationPeer peer; + private ReplicationPeer peer; + private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); public TableCfWALEntryFilter(ReplicationPeer peer) { this.peer = peer; @@ -47,91 +45,56 @@ public class TableCfWALEntryFilter implements WALEntryFilter { @Override public Entry filter(Entry entry) { TableName tabName = entry.getKey().getTablename(); - ArrayList cells = entry.getEdit().getCells(); - Map> tableCFs = null; + Map> tableCFs = getTableCfs(); + // A null map means the user has not configured any table CFs, so data from all tables is + // applicable for replication + if (tableCFs == null) return entry; + if (!tableCFs.containsKey(tabName)) { + return null; + } + + return entry; + } + + @Override + public Cell filterCell(final Entry entry, Cell cell) { + final Map> tableCfs = getTableCfs(); + if (tableCfs == null) return cell; + TableName tabName = entry.getKey().getTablename(); + List cfs = tableCfs.get(tabName); + // ignore(remove) kv if its cf isn't in the replicable cf list + // (null cfs means all cfs of this table are replicable) + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + cell = bulkLoadFilter.filterCell(cell, new Predicate() { + @Override + public boolean apply(byte[] fam) { + if (tableCfs != null) { + List cfs = tableCfs.get(entry.getKey().getTablename()); + if (cfs != null && !cfs.contains(Bytes.toString(fam))) { + return true; + } + } + return false; + } + }); + } else { + if ((cfs != null) && !cfs.contains( + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) { + return null; + } + } +
return cell; + } + + Map> getTableCfs() { + Map> tableCFs = null; try { tableCFs = this.peer.getTableCFs(); } catch (IllegalArgumentException e) { LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() + ", degenerate as if it's not configured by keeping tableCFs==null"); } - int size = cells.size(); - - // If null means user has explicitly not configured any table CFs so all the tables data are - // applicable for replication - if (tableCFs == null) { - return entry; - } - // return null(prevent replicating) if logKey's table isn't in this peer's - // replicable table list - if (!tableCFs.containsKey(tabName)) { - return null; - } else { - List cfs = tableCFs.get(tabName); - for (int i = size - 1; i >= 0; i--) { - Cell cell = cells.get(i); - // TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so - // cannot refactor into one now, can revisit and see if any way to unify them. - // Filter bulk load entries separately - if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell); - if (filteredBulkLoadEntryCell != null) { - cells.set(i, filteredBulkLoadEntryCell); - } else { - cells.remove(i); - } - } else { - // ignore(remove) kv if its cf isn't in the replicable cf list - // (empty cfs means all cfs of this table are replicable) - if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(), - cell.getFamilyOffset(), cell.getFamilyLength()))) { - cells.remove(i); - } - } - } - } - if (cells.size() < size/2) { - cells.trimToSize(); - } - return entry; - } - - private Cell filterBulkLoadEntries(List cfs, Cell cell) { - byte[] fam; - BulkLoadDescriptor bld = null; - try { - bld = WALEdit.getBulkLoadDescriptor(cell); - } catch (IOException e) { - LOG.warn("Failed to get bulk load events information from the WAL file.", e); - return cell; - } - List storesList = bld.getStoresList(); - // Copy the StoreDescriptor list 
and update it as storesList is a unmodifiableList - List copiedStoresList = new ArrayList(storesList); - Iterator copiedStoresListIterator = copiedStoresList.iterator(); - boolean anyStoreRemoved = false; - while (copiedStoresListIterator.hasNext()) { - StoreDescriptor sd = copiedStoresListIterator.next(); - fam = sd.getFamilyName().toByteArray(); - if (cfs != null && !cfs.contains(Bytes.toString(fam))) { - copiedStoresListIterator.remove(); - anyStoreRemoved = true; - } - } - - if (!anyStoreRemoved) { - return cell; - } else if (copiedStoresList.isEmpty()) { - return null; - } - BulkLoadDescriptor.Builder newDesc = - BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName()) - .setEncodedRegionName(bld.getEncodedRegionName()) - .setBulkloadSeqNum(bld.getBulkloadSeqNum()); - newDesc.addAllStores(copiedStoresList); - BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build(); - return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD, - cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray()); + return tableCFs; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java new file mode 100644 index 00000000000..78b3ed45a61 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.wal.WAL.Entry; + +/** + * A filter for WAL entry cells before being sent over to replication. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public interface WALCellFilter { + + /** + * Applies the filter, possibly returning a different Cell instance. + * If null is returned, the cell will be skipped. + * @param entry Entry which contains the cell + * @param cell Cell to filter + * @return a (possibly modified) Cell to use. Returning null will cause the cell + * to be skipped for replication. 
+ */ + public Cell filterCell(Entry entry, Cell cell); + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 30fc60364de..acb6a76cbfa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -76,7 +76,7 @@ public class TestReplicationWALEntryFilters { @Test public void testScopeWALEntryFilter() { - ScopeWALEntryFilter filter = new ScopeWALEntryFilter(); + WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter()); Entry userEntry = createEntry(a, b); Entry userEntryA = createEntry(a); @@ -205,14 +205,14 @@ public class TestReplicationWALEntryFilters { when(peer.getTableCFs()).thenReturn(null); Entry userEntry = createEntry(a, b, c); - TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer); + WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(createEntry(a,b,c), filter.filter(userEntry)); // empty map userEntry = createEntry(a, b, c); Map> tableCfs = new HashMap>(); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new TableCfWALEntryFilter(peer); + filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); // table bar @@ -220,7 +220,7 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap>(); tableCfs.put(TableName.valueOf("bar"), null); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new TableCfWALEntryFilter(peer); + filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); // table foo:a @@ -228,15 +228,15 @@ public class TestReplicationWALEntryFilters { tableCfs = new HashMap>(); tableCfs.put(TableName.valueOf("foo"), 
Lists.newArrayList("a")); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new TableCfWALEntryFilter(peer); - assertEquals(createEntry(a), filter.filter(userEntry)); + filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); + assertEquals(createEntry(a), filter.filter(userEntry)); // table foo:a,c userEntry = createEntry(a, b, c, d); tableCfs = new HashMap>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); when(peer.getTableCFs()).thenReturn(tableCfs); - filter = new TableCfWALEntryFilter(peer); + filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer)); assertEquals(createEntry(a,c), filter.filter(userEntry)); }