HBASE-19494 Create simple WALKey filter that can be plugged in on the Replication Sink

Implement new WALEntrySinkFilter (as opposed to WALEntryFilter) and
specify the implementation (with a no-param constructor) in config
using property hbase.replication.sink.walentrysinkfilter
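
For example, a minimal sketch of wiring a filter up programmatically
(MyWALEntrySinkFilter is a hypothetical implementing class; the same key
can also be set in hbase-site.xml):

  Configuration conf = HBaseConfiguration.create();
  // WAL_ENTRY_FILTER_KEY == "hbase.replication.sink.walentrysinkfilter"
  conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
      MyWALEntrySinkFilter.class, WALEntrySinkFilter.class);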

Signed-off-by: wolfgang hoschek <whoscheck@cloudera.com>
Michael Stack 2017-12-18 12:57:53 -08:00
parent df351e4035
commit 32f6fd41c2
4 changed files with 650 additions and 12 deletions

org/apache/hadoop/hbase/replication/WALEntryFilter.java

@@ -25,10 +25,16 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* A Filter for WAL entries before being sent over to replication. Multiple
* filters might be chained together using {@link ChainWALEntryFilter}.
* Applied on the replication source side.
* <p>There is also a filter that can be installed on the sink end of a replication stream.
* See {@link org.apache.hadoop.hbase.replication.regionserver.WALEntrySinkFilter}. Certain
* use-cases may need such a facility, but it is better to filter here on the source side
* rather than later, after the edit arrives at the sink.</p>
* @see org.apache.hadoop.hbase.replication.regionserver.WALEntrySinkFilter for filtering
* replication on the sink-side.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface WALEntryFilter {
/**
* Applies the filter, possibly returning a different Entry instance.
* If null is returned, the entry will be skipped.
@@ -37,5 +43,4 @@ public interface WALEntryFilter {
* no cells will cause the entry to be skipped for replication.
*/
public Entry filter(Entry entry);
}
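
For contrast with the sink-side hook referenced above, a minimal sketch of a
source-side filter; SkipTableWALEntryFilter and the "excluded" table name are
hypothetical, and it assumes the WALKey of this codebase era exposes the table
via getTablename(). Returning null drops the entry from replication:

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.replication.WALEntryFilter;
  import org.apache.hadoop.hbase.wal.WAL.Entry;

  public class SkipTableWALEntryFilter implements WALEntryFilter {
    private static final TableName SKIPPED = TableName.valueOf("excluded");
    @Override
    public Entry filter(Entry entry) {
      // Null return means the entry is skipped for replication.
      return entry.getKey().getTablename().equals(SKIPPED) ? null : entry;
    }
  }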

org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

@@ -89,6 +89,7 @@ public class ReplicationSink {
// Number of hfiles that we successfully replicated
private long hfilesReplicated = 0;
private SourceFSConfigurationProvider provider;
private WALEntrySinkFilter walEntrySinkFilter;
/**
* Create a sink for replication
@@ -102,7 +103,7 @@ public class ReplicationSink {
this.conf = HBaseConfiguration.create(conf);
decorateConf();
this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter();
String className =
conf.get("hbase.replication.source.fs.conf.provider",
DefaultSourceFSConfigurationProvider.class.getCanonicalName());
@@ -116,6 +117,22 @@
}
}
private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
Class<?> walEntryFilterClass =
this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
WALEntrySinkFilter filter = null;
try {
filter = walEntryFilterClass == null ? null :
(WALEntrySinkFilter) walEntryFilterClass.newInstance();
} catch (Exception e) {
LOG.warn("Failed to instantiate " + walEntryFilterClass);
}
if (filter != null) {
filter.init(getConnection());
}
return filter;
}
/**
* decorate the Configuration object to make replication more receptive to delays:
* lessen the timeout and numTries.
@@ -134,8 +151,6 @@
/**
* Replicate this array of entries directly into the local cluster using the native client. Only
* operates against raw protobuf type saving on a conversion from pb to pojo.
* @param entries
* @param cells
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
@@ -147,7 +162,6 @@
String replicationClusterId, String sourceBaseNamespaceDirPath,
String sourceHFileArchiveDirPath) throws IOException {
if (entries.isEmpty()) return;
if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
// Very simple optimization where we batch sequences of rows going
// to the same table.
try {
@@ -162,8 +176,21 @@
for (WALEntry entry : entries) {
TableName table =
TableName.valueOf(entry.getKey().getTableName().toByteArray());
if (this.walEntrySinkFilter != null) {
if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
// Skip Cells in CellScanner associated with this entry.
int count = entry.getAssociatedCellCount();
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
}
continue;
}
}
Cell previousCell = null;
Mutation m = null;
Mutation mutation = null;
int count = entry.getAssociatedCellCount();
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
@@ -181,7 +208,7 @@
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
// Create new mutation
m =
mutation =
CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
@@ -189,13 +216,13 @@
for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
clusterIds.add(toUUID(clusterId));
}
m.setClusterIds(clusterIds);
addToHashMultiMap(rowMap, table, clusterIds, m);
mutation.setClusterIds(clusterIds);
addToHashMultiMap(rowMap, table, clusterIds, mutation);
}
if (CellUtil.isDelete(cell)) {
((Delete) m).add(cell);
((Delete) mutation).add(cell);
} else {
((Put) m).add(cell);
((Put) mutation).add(cell);
}
previousCell = cell;
}

org/apache/hadoop/hbase/replication/regionserver/WALEntrySinkFilter.java

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Implementations are installed on a Replication Sink and called from inside
* ReplicationSink#replicateEntries to filter replicated WALEntries based on WALEntry attributes.
* Currently only the table name and replication write time are exposed (WALEntry is a private,
* internal class, so we cannot pass it here). To install, set
* <code>hbase.replication.sink.walentrysinkfilter</code> to the name of the implementing
* class. The implementing class must have a no-param constructor.
* <p>This filter is of limited use. It is better to filter on the replication source rather
* than here on the sink, after the edits have already been shipped. That said, applications
* such as the hbase-indexer want to filter out any edits that were made before replication
* was enabled.
* @see org.apache.hadoop.hbase.replication.WALEntryFilter for filtering on the replication
* source-side.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface WALEntrySinkFilter {
/**
* Name of configuration to set with name of implementing WALEntrySinkFilter class.
*/
public static final String WAL_ENTRY_FILTER_KEY = "hbase.replication.sink.walentrysinkfilter";
/**
* Called after construction.
* Use the passed Connection to set up any context the filter might need.
*/
void init(Connection connection);
/**
* @param table Table edit is destined for.
* @param writeTime Time at which the edit was created on the source.
* @return True if we are to filter out the edit.
*/
boolean filter(TableName table, long writeTime);
}
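
A minimal sketch along the lines of the hbase-indexer use-case above: skip any
edit written before some cutoff. The class name and the configuration key
"my.replication.enabled.ts" are illustrative only; a real filter would source
its cutoff however it sees fit.

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;

  public class SkipOldEditsSinkFilter implements WALEntrySinkFilter {
    private long cutoff;
    @Override
    public void init(Connection connection) {
      // Hypothetical key carrying the time replication was enabled on the sink.
      this.cutoff = connection.getConfiguration().getLong("my.replication.enabled.ts", 0L);
    }
    @Override
    public boolean filter(TableName table, long writeTime) {
      // Return true to filter the edit OUT, i.e. to skip it.
      return writeTime < this.cutoff;
    }
  }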

org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java

@@ -0,0 +1,549 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Simple test of sink-side wal entry filter facility.
*/
@Category({ReplicationTests.class, SmallTests.class})
public class TestWALEntrySinkFilter {
private static final Log LOG = LogFactory.getLog(TestWALEntrySinkFilter.class);
@Rule public TestName name = new TestName();
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().
withTimeout(this.getClass()).
withLookingForStuckThread(true).
build();
static final int BOUNDARY = 5;
static final AtomicInteger UNFILTERED = new AtomicInteger();
static final AtomicInteger FILTERED = new AtomicInteger();
/**
* Implementation of Stoppable to pass into ReplicationSink.
*/
private static Stoppable STOPPABLE = new Stoppable() {
private final AtomicBoolean stop = new AtomicBoolean(false);
@Override
public boolean isStopped() {
return this.stop.get();
}
@Override
public void stop(String why) {
LOG.info("STOPPING BECAUSE: " + why);
this.stop.set(true);
}
};
/**
* Test the filter.
* The filter filters out any entry whose write time is <= BOUNDARY (5). We count how many
* entries we filter out and how many cells make it through, down in the Table#batch
* implementation. Puts in place a custom DevNullConnection so we can insert our
* counting Table.
* @throws IOException
*/
@Test
public void testWALEntryFilter() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Make it so our filter is instantiated on construction of ReplicationSink.
conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
Connection.class);
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
// Create some dumb walentries.
List<AdminProtos.WALEntry> entries = new ArrayList<>();
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
// Need a tablename.
ByteString tableName =
ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
// Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
// describing the edit with all Cells from all edits aggregated in a single CellScanner.
final List<Cell> cells = new ArrayList<>();
int count = BOUNDARY * 2;
for(int i = 0; i < count; i++) {
byte [] bytes = Bytes.toBytes(i);
// Create a wal entry. Everything is set to the current index as bytes or int/long.
entryBuilder.clear();
entryBuilder.setKey(entryBuilder.getKeyBuilder().
setLogSequenceNumber(i).
setEncodedRegionName(ByteString.copyFrom(bytes)).
setWriteTime(i).
setTableName(tableName).build());
// Lets have one Cell associated with each WALEdit.
entryBuilder.setAssociatedCellCount(1);
entries.add(entryBuilder.build());
// We need to add a Cell per WALEdit to the cells array.
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
// Make cells whose row, family, qualifier, value, and timestamp are all == 'i'.
Cell cell = cellBuilder.
setRow(bytes).
setFamily(bytes).
setQualifier(bytes).
setType(Cell.DataType.Put).
setTimestamp(i).
setValue(bytes).build();
cells.add(cell);
}
// Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
// all Cells from all the WALEntries made above.
CellScanner cellScanner = new CellScanner() {
// Set to -1 because advance gets called before current.
int index = -1;
@Override
public Cell current() {
return cells.get(index);
}
@Override
public boolean advance() throws IOException {
index++;
return index < cells.size();
}
};
// Call our sink.
sink.replicateEntries(entries, cellScanner, null, null, null);
// Check what made it through and what was filtered.
assertTrue(FILTERED.get() > 0);
assertTrue(UNFILTERED.get() > 0);
assertEquals(count, FILTERED.get() + UNFILTERED.get());
}
/**
* Simple filter that will filter out any entry whose writeTime is <= 5 (BOUNDARY).
*/
public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl implements WALEntrySinkFilter {
public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {}
@Override
public void init(Connection connection) {
// Do nothing.
}
@Override
public boolean filter(TableName table, long writeTime) {
boolean b = writeTime <= BOUNDARY;
if (b) {
FILTERED.incrementAndGet();
}
return b;
}
}
/**
* A DevNull Connection whose only purpose is checking what edits made it through. See down in
* {@link Table#batch(List, Object[])}.
*/
public static class DevNullConnection implements Connection {
private final Configuration configuration;
DevNullConnection(Configuration configuration, ExecutorService es, User user) {
this.configuration = configuration;
}
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public Configuration getConfiguration() {
return this.configuration;
}
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
return null;
}
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
return null;
}
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return null;
}
@Override
public Admin getAdmin() throws IOException {
return null;
}
@Override
public void close() throws IOException {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) {
return new TableBuilder() {
@Override
public TableBuilder setOperationTimeout(int timeout) {
return this;
}
@Override
public TableBuilder setRpcTimeout(int timeout) {
return this;
}
@Override
public TableBuilder setReadRpcTimeout(int timeout) {
return this;
}
@Override
public TableBuilder setWriteRpcTimeout(int timeout) {
return this;
}
@Override
public Table build() {
return new Table() {
@Override
public TableName getName() {
return tableName;
}
@Override
public Configuration getConfiguration() {
return configuration;
}
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
return null;
}
@Override
public TableDescriptor getDescriptor() throws IOException {
return null;
}
@Override
public boolean exists(Get get) throws IOException {
return false;
}
@Override
public boolean[] exists(List<Get> gets) throws IOException {
return new boolean[0];
}
@Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
for (Row action: actions) {
// Row is the index of the loop above where we make WALEntry and Cells.
int row = Bytes.toInt(action.getRow());
assertTrue("" + row, row> BOUNDARY);
UNFILTERED.incrementAndGet();
}
}
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
}
@Override
public Result get(Get get) throws IOException {
return null;
}
@Override
public Result[] get(List<Get> gets) throws IOException {
return new Result[0];
}
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
return null;
}
@Override
public ResultScanner getScanner(byte[] family) throws IOException {
return null;
}
@Override
public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
return null;
}
@Override
public void put(Put put) throws IOException {
}
@Override
public void put(List<Put> puts) throws IOException {
}
@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
return false;
}
@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
return false;
}
@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException {
return false;
}
@Override
public void delete(Delete delete) throws IOException {
}
@Override
public void delete(List<Delete> deletes) throws IOException {
}
@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
return false;
}
@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
return false;
}
@Override
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException {
return false;
}
@Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return null;
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
}
@Override
public Result append(Append append) throws IOException {
return null;
}
@Override
public Result increment(Increment increment) throws IOException {
return null;
}
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
return 0;
}
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
return 0;
}
@Override
public void close() throws IOException {
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return null;
}
@Override
public <T extends com.google.protobuf.Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws com.google.protobuf.ServiceException, Throwable {
return null;
}
@Override
public <T extends com.google.protobuf.Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
}
@Override
public <R extends com.google.protobuf.Message> Map<byte[], R> batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws com.google.protobuf.ServiceException, Throwable {
return null;
}
@Override
public <R extends com.google.protobuf.Message> void batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
return false;
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException {
return false;
}
@Override
public long getRpcTimeout(TimeUnit unit) {
return 0;
}
@Override
public int getRpcTimeout() {
return 0;
}
@Override
public void setRpcTimeout(int rpcTimeout) {
}
@Override
public long getReadRpcTimeout(TimeUnit unit) {
return 0;
}
@Override
public int getReadRpcTimeout() {
return 0;
}
@Override
public void setReadRpcTimeout(int readRpcTimeout) {
}
@Override
public long getWriteRpcTimeout(TimeUnit unit) {
return 0;
}
@Override
public int getWriteRpcTimeout() {
return 0;
}
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) {
}
@Override
public long getOperationTimeout(TimeUnit unit) {
return 0;
}
@Override
public int getOperationTimeout() {
return 0;
}
@Override
public void setOperationTimeout(int operationTimeout) {
}
};
}
};
}
}
}