HBASE-19494 Create simple WALKey filter that can be plugged in on the Replication Sink
Implement new WALEntrySinkFilter (as opposed to WALEntryFilter) and specify the implmentation (with a no-param constructor) in config using property hbase.replication.sink.walentrysinkfilter Signed-off-by: wolfgang hoschek whoscheck@cloudera.com
This commit is contained in:
parent
7348ca2bd2
commit
31ebd24b7d
|
@ -25,10 +25,16 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
/**
|
/**
|
||||||
* A Filter for WAL entries before being sent over to replication. Multiple
|
* A Filter for WAL entries before being sent over to replication. Multiple
|
||||||
* filters might be chained together using {@link ChainWALEntryFilter}.
|
* filters might be chained together using {@link ChainWALEntryFilter}.
|
||||||
|
* Applied on the replication source side.
|
||||||
|
* <p>There is also a filter that can be installed on the sink end of a replication stream.
|
||||||
|
* See {@link org.apache.hadoop.hbase.replication.regionserver.WALEntrySinkFilter}. Certain
|
||||||
|
* use-cases may need such a facility but better to filter here on the source side rather
|
||||||
|
* than later, after the edit arrives at the sink.</p>
|
||||||
|
* @see org.apache.hadoop.hbase.replication.regionserver.WALEntrySinkFilter for filtering
|
||||||
|
* replication on the sink-side.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||||
public interface WALEntryFilter {
|
public interface WALEntryFilter {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Applies the filter, possibly returning a different Entry instance.
|
* Applies the filter, possibly returning a different Entry instance.
|
||||||
* If null is returned, the entry will be skipped.
|
* If null is returned, the entry will be skipped.
|
||||||
|
@ -37,5 +43,4 @@ public interface WALEntryFilter {
|
||||||
* no cells will cause the entry to be skipped for replication.
|
* no cells will cause the entry to be skipped for replication.
|
||||||
*/
|
*/
|
||||||
public Entry filter(Entry entry);
|
public Entry filter(Entry entry);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,6 +89,7 @@ public class ReplicationSink {
|
||||||
// Number of hfiles that we successfully replicated
|
// Number of hfiles that we successfully replicated
|
||||||
private long hfilesReplicated = 0;
|
private long hfilesReplicated = 0;
|
||||||
private SourceFSConfigurationProvider provider;
|
private SourceFSConfigurationProvider provider;
|
||||||
|
private WALEntrySinkFilter walEntrySinkFilter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a sink for replication
|
* Create a sink for replication
|
||||||
|
@ -102,7 +103,7 @@ public class ReplicationSink {
|
||||||
this.conf = HBaseConfiguration.create(conf);
|
this.conf = HBaseConfiguration.create(conf);
|
||||||
decorateConf();
|
decorateConf();
|
||||||
this.metrics = new MetricsSink();
|
this.metrics = new MetricsSink();
|
||||||
|
this.walEntrySinkFilter = setupWALEntrySinkFilter();
|
||||||
String className =
|
String className =
|
||||||
conf.get("hbase.replication.source.fs.conf.provider",
|
conf.get("hbase.replication.source.fs.conf.provider",
|
||||||
DefaultSourceFSConfigurationProvider.class.getCanonicalName());
|
DefaultSourceFSConfigurationProvider.class.getCanonicalName());
|
||||||
|
@ -116,6 +117,22 @@ public class ReplicationSink {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
|
||||||
|
Class<?> walEntryFilterClass =
|
||||||
|
this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
|
||||||
|
WALEntrySinkFilter filter = null;
|
||||||
|
try {
|
||||||
|
filter = walEntryFilterClass == null? null:
|
||||||
|
(WALEntrySinkFilter)walEntryFilterClass.newInstance();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to instantiate " + walEntryFilterClass);
|
||||||
|
}
|
||||||
|
if (filter != null) {
|
||||||
|
filter.init(getConnection());
|
||||||
|
}
|
||||||
|
return filter;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* decorate the Configuration object to make replication more receptive to delays:
|
* decorate the Configuration object to make replication more receptive to delays:
|
||||||
* lessen the timeout and numTries.
|
* lessen the timeout and numTries.
|
||||||
|
@ -134,8 +151,6 @@ public class ReplicationSink {
|
||||||
/**
|
/**
|
||||||
* Replicate this array of entries directly into the local cluster using the native client. Only
|
* Replicate this array of entries directly into the local cluster using the native client. Only
|
||||||
* operates against raw protobuf type saving on a conversion from pb to pojo.
|
* operates against raw protobuf type saving on a conversion from pb to pojo.
|
||||||
* @param entries
|
|
||||||
* @param cells
|
|
||||||
* @param replicationClusterId Id which will uniquely identify source cluster FS client
|
* @param replicationClusterId Id which will uniquely identify source cluster FS client
|
||||||
* configurations in the replication configuration directory
|
* configurations in the replication configuration directory
|
||||||
* @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
|
* @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
|
||||||
|
@ -147,7 +162,6 @@ public class ReplicationSink {
|
||||||
String replicationClusterId, String sourceBaseNamespaceDirPath,
|
String replicationClusterId, String sourceBaseNamespaceDirPath,
|
||||||
String sourceHFileArchiveDirPath) throws IOException {
|
String sourceHFileArchiveDirPath) throws IOException {
|
||||||
if (entries.isEmpty()) return;
|
if (entries.isEmpty()) return;
|
||||||
if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
|
|
||||||
// Very simple optimization where we batch sequences of rows going
|
// Very simple optimization where we batch sequences of rows going
|
||||||
// to the same table.
|
// to the same table.
|
||||||
try {
|
try {
|
||||||
|
@ -162,8 +176,21 @@ public class ReplicationSink {
|
||||||
for (WALEntry entry : entries) {
|
for (WALEntry entry : entries) {
|
||||||
TableName table =
|
TableName table =
|
||||||
TableName.valueOf(entry.getKey().getTableName().toByteArray());
|
TableName.valueOf(entry.getKey().getTableName().toByteArray());
|
||||||
|
if (this.walEntrySinkFilter != null) {
|
||||||
|
if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
|
||||||
|
// Skip Cells in CellScanner associated with this entry.
|
||||||
|
int count = entry.getAssociatedCellCount();
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
// Throw index out of bounds if our cell count is off
|
||||||
|
if (!cells.advance()) {
|
||||||
|
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
Cell previousCell = null;
|
Cell previousCell = null;
|
||||||
Mutation m = null;
|
Mutation mutation = null;
|
||||||
int count = entry.getAssociatedCellCount();
|
int count = entry.getAssociatedCellCount();
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
// Throw index out of bounds if our cell count is off
|
// Throw index out of bounds if our cell count is off
|
||||||
|
@ -181,7 +208,7 @@ public class ReplicationSink {
|
||||||
// Handle wal replication
|
// Handle wal replication
|
||||||
if (isNewRowOrType(previousCell, cell)) {
|
if (isNewRowOrType(previousCell, cell)) {
|
||||||
// Create new mutation
|
// Create new mutation
|
||||||
m =
|
mutation =
|
||||||
CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
|
CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
|
||||||
cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
|
cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
|
||||||
cell.getRowLength());
|
cell.getRowLength());
|
||||||
|
@ -189,13 +216,13 @@ public class ReplicationSink {
|
||||||
for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
|
for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
|
||||||
clusterIds.add(toUUID(clusterId));
|
clusterIds.add(toUUID(clusterId));
|
||||||
}
|
}
|
||||||
m.setClusterIds(clusterIds);
|
mutation.setClusterIds(clusterIds);
|
||||||
addToHashMultiMap(rowMap, table, clusterIds, m);
|
addToHashMultiMap(rowMap, table, clusterIds, mutation);
|
||||||
}
|
}
|
||||||
if (CellUtil.isDelete(cell)) {
|
if (CellUtil.isDelete(cell)) {
|
||||||
((Delete) m).add(cell);
|
((Delete) mutation).add(cell);
|
||||||
} else {
|
} else {
|
||||||
((Put) m).add(cell);
|
((Put) mutation).add(cell);
|
||||||
}
|
}
|
||||||
previousCell = cell;
|
previousCell = cell;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementations are installed on a Replication Sink called from inside
|
||||||
|
* ReplicationSink#replicateEntries to filter replicated WALEntries based off WALEntry attributes.
|
||||||
|
* Currently only table name and replication write time are exposed (WALEntry is a private,
|
||||||
|
* internal class so we cannot pass it here). To install, set
|
||||||
|
* <code>hbase.replication.sink.walentryfilter</code> to the name of the implementing
|
||||||
|
* class. Implementing class must have a no-param Constructor.
|
||||||
|
* <p>This filter is of limited use. It is better to filter on the replication source rather than
|
||||||
|
* here after the edits have been shipped on the replication sink. That said, applications such
|
||||||
|
* as the hbase-indexer want to filter out any edits that were made before replication was enabled.
|
||||||
|
* @see org.apache.hadoop.hbase.replication.WALEntryFilter for filtering on the replication
|
||||||
|
* source-side.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||||
|
public interface WALEntrySinkFilter {
|
||||||
|
/**
|
||||||
|
* Name of configuration to set with name of implementing WALEntrySinkFilter class.
|
||||||
|
*/
|
||||||
|
public static final String WAL_ENTRY_FILTER_KEY = "hbase.replication.sink.walentrysinkfilter";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after Construction.
|
||||||
|
* Use passed Connection to keep any context the filter might need.
|
||||||
|
*/
|
||||||
|
void init(Connection connection);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param table Table edit is destined for.
|
||||||
|
* @param writeTime Time at which the edit was created on the source.
|
||||||
|
* @return True if we are to filter out the edit.
|
||||||
|
*/
|
||||||
|
boolean filter(TableName table, long writeTime);
|
||||||
|
}
|
|
@ -0,0 +1,549 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilder;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilderFactory;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilderType;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
import org.apache.hadoop.hbase.CompareOperator;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
|
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||||
|
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Row;
|
||||||
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
|
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||||
|
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.rules.TestRule;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple test of sink-side wal entry filter facility.
|
||||||
|
*/
|
||||||
|
@Category({ReplicationTests.class, SmallTests.class})
|
||||||
|
public class TestWALEntrySinkFilter {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
|
||||||
|
@Rule public TestName name = new TestName();
|
||||||
|
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().
|
||||||
|
withTimeout(this.getClass()).
|
||||||
|
withLookingForStuckThread(true).
|
||||||
|
build();
|
||||||
|
static final int BOUNDARY = 5;
|
||||||
|
static final AtomicInteger UNFILTERED = new AtomicInteger();
|
||||||
|
static final AtomicInteger FILTERED = new AtomicInteger();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implemetentation of Stoppable to pass into ReplicationSink.
|
||||||
|
*/
|
||||||
|
private static Stoppable STOPPABLE = new Stoppable() {
|
||||||
|
private final AtomicBoolean stop = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isStopped() {
|
||||||
|
return this.stop.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(String why) {
|
||||||
|
LOG.info("STOPPING BECAUSE: " + why);
|
||||||
|
this.stop.set(true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test filter.
|
||||||
|
* Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many items we
|
||||||
|
* filter out and we count how many cells make it through for distribution way down below in the
|
||||||
|
* Table#batch implementation. Puts in place a custom DevNullConnection so we can insert our
|
||||||
|
* counting Table.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWALEntryFilter() throws IOException {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
// Make it so our filter is instantiated on construction of ReplicationSink.
|
||||||
|
conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
|
||||||
|
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
|
||||||
|
conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
|
||||||
|
Connection.class);
|
||||||
|
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
|
||||||
|
// Create some dumb walentries.
|
||||||
|
List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
|
||||||
|
new ArrayList<>();
|
||||||
|
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
|
||||||
|
// Need a tablename.
|
||||||
|
ByteString tableName =
|
||||||
|
ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
|
||||||
|
// Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
|
||||||
|
// describing the edit with all Cells from all edits aggregated in a single CellScanner.
|
||||||
|
final List<Cell> cells = new ArrayList<>();
|
||||||
|
int count = BOUNDARY * 2;
|
||||||
|
for(int i = 0; i < count; i++) {
|
||||||
|
byte [] bytes = Bytes.toBytes(i);
|
||||||
|
// Create a wal entry. Everything is set to the current index as bytes or int/long.
|
||||||
|
entryBuilder.clear();
|
||||||
|
entryBuilder.setKey(entryBuilder.getKeyBuilder().
|
||||||
|
setLogSequenceNumber(i).
|
||||||
|
setEncodedRegionName(ByteString.copyFrom(bytes)).
|
||||||
|
setWriteTime(i).
|
||||||
|
setTableName(tableName).build());
|
||||||
|
// Lets have one Cell associated with each WALEdit.
|
||||||
|
entryBuilder.setAssociatedCellCount(1);
|
||||||
|
entries.add(entryBuilder.build());
|
||||||
|
// We need to add a Cell per WALEdit to the cells array.
|
||||||
|
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
|
||||||
|
// Make cells whose row, family, cell, value, and ts are == 'i'.
|
||||||
|
Cell cell = cellBuilder.
|
||||||
|
setRow(bytes).
|
||||||
|
setFamily(bytes).
|
||||||
|
setQualifier(bytes).
|
||||||
|
setType(Cell.DataType.Put).
|
||||||
|
setTimestamp(i).
|
||||||
|
setValue(bytes).build();
|
||||||
|
cells.add(cell);
|
||||||
|
}
|
||||||
|
// Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
|
||||||
|
// all Cells from all the WALEntries made above.
|
||||||
|
CellScanner cellScanner = new CellScanner() {
|
||||||
|
// Set to -1 because advance gets called before current.
|
||||||
|
int index = -1;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Cell current() {
|
||||||
|
return cells.get(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean advance() throws IOException {
|
||||||
|
index++;
|
||||||
|
return index < cells.size();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Call our sink.
|
||||||
|
sink.replicateEntries(entries, cellScanner, null, null, null);
|
||||||
|
// Check what made it through and what was filtered.
|
||||||
|
assertTrue(FILTERED.get() > 0);
|
||||||
|
assertTrue(UNFILTERED.get() > 0);
|
||||||
|
assertEquals(count, FILTERED.get() + UNFILTERED.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple filter that will filter out any entry wholse writeTime is <= 5.
|
||||||
|
*/
|
||||||
|
public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl implements WALEntrySinkFilter {
|
||||||
|
public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Connection connection) {
|
||||||
|
// Do nothing.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean filter(TableName table, long writeTime) {
|
||||||
|
boolean b = writeTime <= BOUNDARY;
|
||||||
|
if (b) {
|
||||||
|
FILTERED.incrementAndGet();
|
||||||
|
}
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A DevNull Connection whose only purpose is checking what edits made it through. See down in
|
||||||
|
* {@link Table#batch(List, Object[])}.
|
||||||
|
*/
|
||||||
|
public static class DevNullConnection implements Connection {
|
||||||
|
private final Configuration configuration;
|
||||||
|
|
||||||
|
DevNullConnection(Configuration configuration, ExecutorService es, User user) {
|
||||||
|
this.configuration = configuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAborted() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConfiguration() {
|
||||||
|
return this.configuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Admin getAdmin() throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isClosed() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) {
|
||||||
|
return new TableBuilder() {
|
||||||
|
@Override
|
||||||
|
public TableBuilder setOperationTimeout(int timeout) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableBuilder setRpcTimeout(int timeout) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableBuilder setReadRpcTimeout(int timeout) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableBuilder setWriteRpcTimeout(int timeout) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Table build() {
|
||||||
|
return new Table() {
|
||||||
|
@Override
|
||||||
|
public TableName getName() {
|
||||||
|
return tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConfiguration() {
|
||||||
|
return configuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HTableDescriptor getTableDescriptor() throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableDescriptor getDescriptor() throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean exists(Get get) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean[] exists(List<Get> gets) throws IOException {
|
||||||
|
return new boolean[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
|
||||||
|
for (Row action: actions) {
|
||||||
|
// Row is the index of the loop above where we make WALEntry and Cells.
|
||||||
|
int row = Bytes.toInt(action.getRow());
|
||||||
|
assertTrue("" + row, row> BOUNDARY);
|
||||||
|
UNFILTERED.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result get(Get get) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result[] get(List<Get> gets) throws IOException {
|
||||||
|
return new Result[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResultScanner getScanner(Scan scan) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResultScanner getScanner(byte[] family) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(Put put) throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(List<Put> puts) throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete(Delete delete) throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete(List<Delete> deletes) throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void mutateRow(RowMutations rm) throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result append(Append append) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result increment(Increment increment) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CoprocessorRpcChannel coprocessorService(byte[] row) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends com.google.protobuf.Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws com.google.protobuf.ServiceException, Throwable {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends com.google.protobuf.Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R extends com.google.protobuf.Message> Map<byte[], R> batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws com.google.protobuf.ServiceException, Throwable {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R extends com.google.protobuf.Message> void batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getRpcTimeout(TimeUnit unit) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getRpcTimeout() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRpcTimeout(int rpcTimeout) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getReadRpcTimeout(TimeUnit unit) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getReadRpcTimeout() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setReadRpcTimeout(int readRpcTimeout) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getWriteRpcTimeout(TimeUnit unit) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getWriteRpcTimeout() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setWriteRpcTimeout(int writeRpcTimeout) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getOperationTimeout(TimeUnit unit) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getOperationTimeout() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setOperationTimeout(int operationTimeout) {
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue