HBASE-26950 Use AsyncConnection in ReplicationSink (#4595)
Signed-off-by: Bryan Beaudreault <bbeaudreault@hubspot.com>
This commit is contained in:
parent
6dd1b58481
commit
e7551c1359
|
@ -18,8 +18,8 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;
|
||||
|
||||
import com.google.protobuf.RpcChannel;
|
||||
import java.util.List;
|
||||
|
|
|
@ -330,11 +330,6 @@ public final class ConnectionUtils {
|
|||
return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
|
||||
}
|
||||
|
||||
static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
|
||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
||||
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
|
||||
}
|
||||
|
||||
public static ScanResultCache createScanResultCache(Scan scan) {
|
||||
if (scan.getAllowPartialResults()) {
|
||||
return new AllowPartialScanResultCache();
|
||||
|
|
|
@ -17,8 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -197,4 +200,16 @@ public final class FutureUtils {
|
|||
future.completeExceptionally(e);
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new CompletableFuture that is completed when all of the given CompletableFutures
|
||||
* complete. If any of the given CompletableFutures complete exceptionally, then the returned
|
||||
* CompletableFuture also does so, with a CompletionException holding this exception as its cause.
|
||||
* Otherwise, the results of all given CompletableFutures could be obtained by the new returned
|
||||
* CompletableFuture.
|
||||
*/
|
||||
public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
|
||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
||||
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -28,7 +27,9 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -39,15 +40,17 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncTable;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.Row;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -83,10 +86,18 @@ public class ReplicationSink {
|
|||
private final Configuration conf;
|
||||
// Volatile because of note in here -- look for double-checked locking:
|
||||
// http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
|
||||
private volatile Connection sharedHtableCon;
|
||||
/**
|
||||
* This shared {@link Connection} is used for handling bulk load hfiles replication.
|
||||
*/
|
||||
private volatile Connection sharedConnection;
|
||||
/**
|
||||
* This shared {@link AsyncConnection} is used for handling wal replication.
|
||||
*/
|
||||
private volatile AsyncConnection sharedAsyncConnection;
|
||||
private final MetricsSink metrics;
|
||||
private final AtomicLong totalReplicatedEdits = new AtomicLong();
|
||||
private final Object sharedHtableConLock = new Object();
|
||||
private final Object sharedConnectionLock = new Object();
|
||||
private final Object sharedAsyncConnectionLock = new Object();
|
||||
// Number of hfiles that we successfully replicated
|
||||
private long hfilesReplicated = 0;
|
||||
private SourceFSConfigurationProvider provider;
|
||||
|
@ -375,16 +386,31 @@ public class ReplicationSink {
|
|||
*/
|
||||
public void stopReplicationSinkServices() {
|
||||
try {
|
||||
if (this.sharedHtableCon != null) {
|
||||
synchronized (sharedHtableConLock) {
|
||||
if (this.sharedHtableCon != null) {
|
||||
this.sharedHtableCon.close();
|
||||
this.sharedHtableCon = null;
|
||||
if (this.sharedConnection != null) {
|
||||
synchronized (sharedConnectionLock) {
|
||||
if (this.sharedConnection != null) {
|
||||
this.sharedConnection.close();
|
||||
this.sharedConnection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
|
||||
// ignoring as we are closing.
|
||||
LOG.warn("IOException while closing the sharedConnection", e);
|
||||
}
|
||||
|
||||
try {
|
||||
if (this.sharedAsyncConnection != null) {
|
||||
synchronized (sharedAsyncConnectionLock) {
|
||||
if (this.sharedAsyncConnection != null) {
|
||||
this.sharedAsyncConnection.close();
|
||||
this.sharedAsyncConnection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// ignoring as we are closing.
|
||||
LOG.warn("IOException while closing the sharedAsyncConnection", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -399,51 +425,69 @@ public class ReplicationSink {
|
|||
if (allRows.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Table table = null;
|
||||
try {
|
||||
Connection connection = getConnection();
|
||||
table = connection.getTable(tableName);
|
||||
for (List<Row> rows : allRows) {
|
||||
List<List<Row>> batchRows;
|
||||
if (rows.size() > batchRowSizeThreshold) {
|
||||
batchRows = Lists.partition(rows, batchRowSizeThreshold);
|
||||
} else {
|
||||
batchRows = Collections.singletonList(rows);
|
||||
}
|
||||
for (List<Row> rowList : batchRows) {
|
||||
table.batch(rowList, null);
|
||||
}
|
||||
AsyncTable<?> table = getAsyncConnection().getTable(tableName);
|
||||
List<Future<?>> futures = new ArrayList<>();
|
||||
for (List<Row> rows : allRows) {
|
||||
List<List<Row>> batchRows;
|
||||
if (rows.size() > batchRowSizeThreshold) {
|
||||
batchRows = Lists.partition(rows, batchRowSizeThreshold);
|
||||
} else {
|
||||
batchRows = Collections.singletonList(rows);
|
||||
}
|
||||
} catch (RetriesExhaustedWithDetailsException rewde) {
|
||||
for (Throwable ex : rewde.getCauses()) {
|
||||
if (ex instanceof TableNotFoundException) {
|
||||
futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
for (Future<?> future : futures) {
|
||||
try {
|
||||
FutureUtils.get(future);
|
||||
} catch (RetriesExhaustedException e) {
|
||||
if (e.getCause() instanceof TableNotFoundException) {
|
||||
throw new TableNotFoundException("'" + tableName + "'");
|
||||
}
|
||||
}
|
||||
throw rewde;
|
||||
} catch (InterruptedException ix) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
|
||||
} finally {
|
||||
if (table != null) {
|
||||
table.close();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the shared {@link Connection} which is used for handling bulk load hfiles replication.
|
||||
*/
|
||||
private Connection getConnection() throws IOException {
|
||||
// See https://en.wikipedia.org/wiki/Double-checked_locking
|
||||
Connection connection = sharedHtableCon;
|
||||
Connection connection = sharedConnection;
|
||||
if (connection == null) {
|
||||
synchronized (sharedHtableConLock) {
|
||||
connection = sharedHtableCon;
|
||||
synchronized (sharedConnectionLock) {
|
||||
connection = sharedConnection;
|
||||
if (connection == null) {
|
||||
connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
|
||||
connection = ConnectionFactory.createConnection(conf);
|
||||
sharedConnection = connection;
|
||||
}
|
||||
}
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the shared {@link AsyncConnection} which is used for handling wal replication.
|
||||
*/
|
||||
private AsyncConnection getAsyncConnection() throws IOException {
|
||||
// See https://en.wikipedia.org/wiki/Double-checked_locking
|
||||
AsyncConnection asyncConnection = sharedAsyncConnection;
|
||||
if (asyncConnection == null) {
|
||||
synchronized (sharedAsyncConnectionLock) {
|
||||
asyncConnection = sharedAsyncConnection;
|
||||
if (asyncConnection == null) {
|
||||
/**
|
||||
* Get the AsyncConnection immediately.
|
||||
*/
|
||||
asyncConnection = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf));
|
||||
sharedAsyncConnection = asyncConnection;
|
||||
}
|
||||
}
|
||||
}
|
||||
return asyncConnection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a string representation of this sink's metrics
|
||||
* @return string with the total replicated edits count and the date of the last edit that was
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
|
||||
/**
|
||||
* Can be overridden in UT if you only want to implement part of the methods in {@link AsyncTable}.
|
||||
*/
|
||||
public class DummyAsyncTable<C extends ScanResultConsumerBase> implements AsyncTable<C> {
|
||||
|
||||
@Override
|
||||
public TableName getName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<TableDescriptor> getDescriptor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncTableRegionLocator getRegionLocator() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRpcTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRpcTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteRpcTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOperationTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getScanTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> get(Get get) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> put(Put put) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> delete(Delete delete) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> append(Append append) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> increment(Increment increment) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<CheckAndMutateResult>>
|
||||
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> mutateRow(RowMutations mutation) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scan(Scan scan, C consumer) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(Scan scan) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<Result>> scanAll(Scan scan) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Result>> get(List<Get> gets) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Void>> put(List<Put> puts) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S, R> CompletableFuture<R> coprocessorService(
|
||||
Function<com.google.protobuf.RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
|
||||
byte[] row) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
|
||||
Function<com.google.protobuf.RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
|
||||
CoprocessorCallback<R> callback) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
||||
/**
|
||||
* Can be overridden in UT if you only want to implement part of the methods in
|
||||
* {@link ConnectionRegistry}.
|
||||
*/
|
||||
public class DummyConnectionRegistry implements ConnectionRegistry {
|
||||
|
||||
public static final String REGISTRY_IMPL_CONF_KEY =
|
||||
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
|
||||
|
||||
@Override
|
||||
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<String> getClusterId() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ServerName> getActiveMaster() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getConnectionString() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
|
@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
|
@ -330,8 +330,8 @@ public class TestReplicationSink {
|
|||
try {
|
||||
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
Assert.fail("Should re-throw RetriesExhaustedWithDetailsException.");
|
||||
} catch (RetriesExhaustedWithDetailsException e) {
|
||||
Assert.fail("Should re-throw RetriesExhaustedException.");
|
||||
} catch (RetriesExhaustedException e) {
|
||||
} finally {
|
||||
admin.enableTable(TABLE_NAME1);
|
||||
}
|
||||
|
|
|
@ -23,9 +23,8 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -34,34 +33,26 @@ import org.apache.hadoop.hbase.CellBuilder;
|
|||
import org.apache.hadoop.hbase.CellBuilderFactory;
|
||||
import org.apache.hadoop.hbase.CellBuilderType;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
|
||||
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
|
||||
import org.apache.hadoop.hbase.client.AsyncAdminBuilder;
|
||||
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncTable;
|
||||
import org.apache.hadoop.hbase.client.AsyncTableBuilder;
|
||||
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.ConnectionRegistry;
|
||||
import org.apache.hadoop.hbase.client.DummyAsyncTable;
|
||||
import org.apache.hadoop.hbase.client.DummyConnectionRegistry;
|
||||
import org.apache.hadoop.hbase.client.Hbck;
|
||||
import org.apache.hadoop.hbase.client.Row;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.client.ScanResultConsumer;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -123,9 +114,12 @@ public class TestWALEntrySinkFilter {
|
|||
public void testWALEntryFilter() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// Make it so our filter is instantiated on construction of ReplicationSink.
|
||||
conf.setClass(DummyConnectionRegistry.REGISTRY_IMPL_CONF_KEY, DevNullConnectionRegistry.class,
|
||||
DummyConnectionRegistry.class);
|
||||
conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
|
||||
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
|
||||
conf.setClass("hbase.client.connection.impl", DevNullConnection.class, Connection.class);
|
||||
conf.setClass(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
|
||||
DevNullAsyncConnection.class, AsyncConnection.class);
|
||||
ReplicationSink sink = new ReplicationSink(conf);
|
||||
// Create some dumb walentries.
|
||||
List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries =
|
||||
|
@ -203,55 +197,79 @@ public class TestWALEntrySinkFilter {
|
|||
}
|
||||
}
|
||||
|
||||
public static class DevNullConnectionRegistry extends DummyConnectionRegistry {
|
||||
|
||||
public DevNullConnectionRegistry(Configuration conf) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<String> getClusterId() {
|
||||
return CompletableFuture.completedFuture("test");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A DevNull Connection whose only purpose is checking what edits made it through. See down in
|
||||
* {@link Table#batch(List, Object[])}.
|
||||
* A DevNull AsyncConnection whose only purpose is checking what edits made it through. See down
|
||||
* in {@link AsyncTable#batchAll}.
|
||||
*/
|
||||
public static class DevNullConnection implements Connection {
|
||||
private final Configuration configuration;
|
||||
public static class DevNullAsyncConnection implements AsyncConnection {
|
||||
|
||||
DevNullConnection(Configuration configuration, ExecutorService es, User user) {
|
||||
this.configuration = configuration;
|
||||
private final Configuration conf;
|
||||
|
||||
public DevNullAsyncConnection(Configuration conf, ConnectionRegistry registry, String clusterId,
|
||||
User user) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return this.configuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
|
||||
public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
|
||||
public void clearRegionLocationCache() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
|
||||
public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
|
||||
ExecutorService pool) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Admin getAdmin() throws IOException {
|
||||
public AsyncAdminBuilder getAdminBuilder() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
|
||||
ExecutorService pool) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Hbck> getHbck() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Hbck getHbck(ServerName masterServer) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -260,320 +278,31 @@ public class TestWALEntrySinkFilter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) {
|
||||
return new TableBuilder() {
|
||||
@Override
|
||||
public TableBuilder setOperationTimeout(int timeout) {
|
||||
return this;
|
||||
}
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) {
|
||||
return new DummyAsyncTable<AdvancedScanResultConsumer>() {
|
||||
|
||||
@Override
|
||||
public TableBuilder setRpcTimeout(int timeout) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableBuilder setReadRpcTimeout(int timeout) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableBuilder setWriteRpcTimeout(int timeout) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table build() {
|
||||
return new Table() {
|
||||
@Override
|
||||
public TableName getName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return configuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor getTableDescriptor() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableDescriptor getDescriptor() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists(Get get) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean[] exists(List<Get> gets) throws IOException {
|
||||
return new boolean[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batch(List<? extends Row> actions, Object[] results)
|
||||
throws IOException, InterruptedException {
|
||||
for (Row action : actions) {
|
||||
// Row is the index of the loop above where we make WALEntry and Cells.
|
||||
int row = Bytes.toInt(action.getRow());
|
||||
assertTrue("" + row, row > BOUNDARY);
|
||||
UNFILTERED.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
|
||||
Batch.Callback<R> callback) throws IOException, InterruptedException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result get(Get get) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result[] get(List<Get> gets) throws IOException {
|
||||
return new Result[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(Scan scan) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(byte[] family) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(Put put) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(List<Put> puts) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value,
|
||||
Put put) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, Put put) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(Delete delete) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(List<Delete> deletes) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value,
|
||||
Delete delete) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, Delete delete) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result mutateRow(RowMutations rm) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result append(Append append) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result increment(Increment increment) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
|
||||
long amount, Durability durability) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public CoprocessorRpcChannel coprocessorService(byte[] row) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends com.google.protobuf.Service, R> Map<byte[], R> coprocessorService(
|
||||
Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
|
||||
throws com.google.protobuf.ServiceException, Throwable {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends com.google.protobuf.Service, R> void coprocessorService(
|
||||
Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable,
|
||||
Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R extends com.google.protobuf.Message> Map<byte[], R> batchCoprocessorService(
|
||||
com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor,
|
||||
com.google.protobuf.Message request, byte[] startKey, byte[] endKey,
|
||||
R responsePrototype) throws com.google.protobuf.ServiceException, Throwable {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R extends com.google.protobuf.Message> void batchCoprocessorService(
|
||||
com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor,
|
||||
com.google.protobuf.Message request, byte[] startKey, byte[] endKey,
|
||||
R responsePrototype, Batch.Callback<R> callback)
|
||||
throws com.google.protobuf.ServiceException, Throwable {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation)
|
||||
throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, RowMutations mutation) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRpcTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRpcTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRpcTimeout(int rpcTimeout) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReadRpcTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReadRpcTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReadRpcTimeout(int readRpcTimeout) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteRpcTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getWriteRpcTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWriteRpcTimeout(int writeRpcTimeout) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOperationTimeout(TimeUnit unit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOperationTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOperationTimeout(int operationTimeout) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionLocator getRegionLocator() throws IOException {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
|
||||
List<T> list = new ArrayList<>(actions.size());
|
||||
for (Row action : actions) {
|
||||
// Row is the index of the loop above where we make WALEntry and Cells.
|
||||
int row = Bytes.toInt(action.getRow());
|
||||
assertTrue("" + row, row > BOUNDARY);
|
||||
UNFILTERED.incrementAndGet();
|
||||
list.add(null);
|
||||
}
|
||||
return CompletableFuture.completedFuture(list);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearRegionLocationCache() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterId() {
|
||||
return null;
|
||||
public Configuration getConfiguration() {
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue