diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index da1ca4afa1f..aa696966ca6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; -import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf; import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; +import static org.apache.hadoop.hbase.util.FutureUtils.allOf; import com.google.protobuf.RpcChannel; import java.util.List; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 4baa48ce55e..40d16fec49a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -330,11 +330,6 @@ public final class ConnectionUtils { return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; } - static CompletableFuture> allOf(List> futures) { - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList())); - } - public static ScanResultCache createScanResultCache(Scan scan) { if (scan.getAllowPartialResults()) { return new AllowPartialScanResultCache(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 9c96a0a1f9a..abe8fc3e89f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hbase.util; +import static java.util.stream.Collectors.toList; + import java.io.IOException; import java.io.InterruptedIOException; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -197,4 +200,16 @@ public final class FutureUtils { future.completeExceptionally(e); return future; } + + /** + * Returns a new CompletableFuture that is completed when all of the given CompletableFutures + * complete. If any of the given CompletableFutures complete exceptionally, then the returned + * CompletableFuture also does so, with a CompletionException holding this exception as its cause. + * Otherwise, the results of all given CompletableFutures could be obtained by the new returned + * CompletableFuture. + */ + public static CompletableFuture> allOf(List> futures) { + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList())); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 616dcfeb8cc..86780142c92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -28,7 +27,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -39,15 +40,17 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.yetus.audience.InterfaceAudience; @@ -83,10 +86,18 @@ public class ReplicationSink { private final Configuration conf; // Volatile because of note in here -- look for double-checked locking: // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html - private volatile Connection sharedHtableCon; + /** + * This shared {@link Connection} is used for handling bulk load hfiles replication. + */ + private volatile Connection sharedConnection; + /** + * This shared {@link AsyncConnection} is used for handling wal replication. + */ + private volatile AsyncConnection sharedAsyncConnection; private final MetricsSink metrics; private final AtomicLong totalReplicatedEdits = new AtomicLong(); - private final Object sharedHtableConLock = new Object(); + private final Object sharedConnectionLock = new Object(); + private final Object sharedAsyncConnectionLock = new Object(); // Number of hfiles that we successfully replicated private long hfilesReplicated = 0; private SourceFSConfigurationProvider provider; @@ -375,16 +386,31 @@ public class ReplicationSink { */ public void stopReplicationSinkServices() { try { - if (this.sharedHtableCon != null) { - synchronized (sharedHtableConLock) { - if (this.sharedHtableCon != null) { - this.sharedHtableCon.close(); - this.sharedHtableCon = null; + if (this.sharedConnection != null) { + synchronized (sharedConnectionLock) { + if (this.sharedConnection != null) { + this.sharedConnection.close(); + this.sharedConnection = null; } } } } catch (IOException e) { - LOG.warn("IOException while closing the connection", e); // ignoring as we are closing. + // ignoring as we are closing. + LOG.warn("IOException while closing the sharedConnection", e); + } + + try { + if (this.sharedAsyncConnection != null) { + synchronized (sharedAsyncConnectionLock) { + if (this.sharedAsyncConnection != null) { + this.sharedAsyncConnection.close(); + this.sharedAsyncConnection = null; + } + } + } + } catch (IOException e) { + // ignoring as we are closing. + LOG.warn("IOException while closing the sharedAsyncConnection", e); } } @@ -399,51 +425,69 @@ public class ReplicationSink { if (allRows.isEmpty()) { return; } - Table table = null; - try { - Connection connection = getConnection(); - table = connection.getTable(tableName); - for (List rows : allRows) { - List> batchRows; - if (rows.size() > batchRowSizeThreshold) { - batchRows = Lists.partition(rows, batchRowSizeThreshold); - } else { - batchRows = Collections.singletonList(rows); - } - for (List rowList : batchRows) { - table.batch(rowList, null); - } + AsyncTable table = getAsyncConnection().getTable(tableName); + List> futures = new ArrayList<>(); + for (List rows : allRows) { + List> batchRows; + if (rows.size() > batchRowSizeThreshold) { + batchRows = Lists.partition(rows, batchRowSizeThreshold); + } else { + batchRows = Collections.singletonList(rows); } - } catch (RetriesExhaustedWithDetailsException rewde) { - for (Throwable ex : rewde.getCauses()) { - if (ex instanceof TableNotFoundException) { + futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList())); + } + + for (Future future : futures) { + try { + FutureUtils.get(future); + } catch (RetriesExhaustedException e) { + if (e.getCause() instanceof TableNotFoundException) { throw new TableNotFoundException("'" + tableName + "'"); } - } - throw rewde; - } catch (InterruptedException ix) { - throw (InterruptedIOException) new InterruptedIOException().initCause(ix); - } finally { - if (table != null) { - table.close(); + throw e; } } } + /** + * Return the shared {@link Connection} which is used for handling bulk load hfiles replication. + */ private Connection getConnection() throws IOException { // See https://en.wikipedia.org/wiki/Double-checked_locking - Connection connection = sharedHtableCon; + Connection connection = sharedConnection; if (connection == null) { - synchronized (sharedHtableConLock) { - connection = sharedHtableCon; + synchronized (sharedConnectionLock) { + connection = sharedConnection; if (connection == null) { - connection = sharedHtableCon = ConnectionFactory.createConnection(conf); + connection = ConnectionFactory.createConnection(conf); + sharedConnection = connection; } } } return connection; } + /** + * Return the shared {@link AsyncConnection} which is used for handling wal replication. + */ + private AsyncConnection getAsyncConnection() throws IOException { + // See https://en.wikipedia.org/wiki/Double-checked_locking + AsyncConnection asyncConnection = sharedAsyncConnection; + if (asyncConnection == null) { + synchronized (sharedAsyncConnectionLock) { + asyncConnection = sharedAsyncConnection; + if (asyncConnection == null) { + /** + * Get the AsyncConnection immediately. + */ + asyncConnection = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf)); + sharedAsyncConnection = asyncConnection; + } + } + } + return asyncConnection; + } + /** * Get a string representation of this sink's metrics * @return string with the total replicated edits count and the date of the last edit that was diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java new file mode 100644 index 00000000000..3aa99575a9e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.Filter; + +/** + * Can be overridden in UT if you only want to implement part of the methods in {@link AsyncTable}. + */ +public class DummyAsyncTable implements AsyncTable { + + @Override + public TableName getName() { + return null; + } + + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public CompletableFuture getDescriptor() { + return null; + } + + @Override + public AsyncTableRegionLocator getRegionLocator() { + return null; + } + + @Override + public long getRpcTimeout(TimeUnit unit) { + return 0; + } + + @Override + public long getReadRpcTimeout(TimeUnit unit) { + return 0; + } + + @Override + public long getWriteRpcTimeout(TimeUnit unit) { + return 0; + } + + @Override + public long getOperationTimeout(TimeUnit unit) { + return 0; + } + + @Override + public long getScanTimeout(TimeUnit unit) { + return 0; + } + + @Override + public CompletableFuture get(Get get) { + return null; + } + + @Override + public CompletableFuture put(Put put) { + return null; + } + + @Override + public CompletableFuture delete(Delete delete) { + return null; + } + + @Override + public CompletableFuture append(Append append) { + return null; + } + + @Override + public CompletableFuture increment(Increment increment) { + return null; + } + + @Override + public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { + return null; + } + + @Override + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + return null; + } + + @Override + public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + return null; + } + + @Override + public List> + checkAndMutate(List checkAndMutates) { + return null; + } + + @Override + public CompletableFuture mutateRow(RowMutations mutation) { + return null; + } + + @Override + public void scan(Scan scan, C consumer) { + } + + @Override + public ResultScanner getScanner(Scan scan) { + return null; + } + + @Override + public CompletableFuture> scanAll(Scan scan) { + return null; + } + + @Override + public List> get(List gets) { + return null; + } + + @Override + public List> put(List puts) { + return null; + } + + @Override + public List> delete(List deletes) { + return null; + } + + @Override + public List> batch(List actions) { + return null; + } + + @Override + public CompletableFuture coprocessorService( + Function stubMaker, ServiceCaller callable, + byte[] row) { + return null; + } + + @Override + public CoprocessorServiceBuilder coprocessorService( + Function stubMaker, ServiceCaller callable, + CoprocessorCallback callback) { + return null; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java new file mode 100644 index 00000000000..cc2e9493d03 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; + +/** + * Can be overridden in UT if you only want to implement part of the methods in + * {@link ConnectionRegistry}. + */ +public class DummyConnectionRegistry implements ConnectionRegistry { + + public static final String REGISTRY_IMPL_CONF_KEY = + HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; + + @Override + public CompletableFuture getMetaRegionLocations() { + return null; + } + + @Override + public CompletableFuture getClusterId() { + return null; + } + + @Override + public CompletableFuture getActiveMaster() { + return null; + } + + @Override + public String getConnectionString() { + return null; + } + + @Override + public void close() { + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index e9643b52997..73e9f0de6a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -330,8 +330,8 @@ public class TestReplicationSink { try { SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - Assert.fail("Should re-throw RetriesExhaustedWithDetailsException."); - } catch (RetriesExhaustedWithDetailsException e) { + Assert.fail("Should re-throw RetriesExhaustedException."); + } catch (RetriesExhaustedException e) { } finally { admin.enableTable(TABLE_NAME1); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java index 57646018bed..d957d9957f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java @@ -23,9 +23,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -34,34 +33,26 @@ import org.apache.hadoop.hbase.CellBuilder; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncAdminBuilder; +import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.AsyncTableBuilder; +import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionRegistry; +import org.apache.hadoop.hbase.client.DummyAsyncTable; +import org.apache.hadoop.hbase.client.DummyConnectionRegistry; +import org.apache.hadoop.hbase.client.Hbck; import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableBuilder; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.client.ScanResultConsumer; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -123,9 +114,12 @@ public class TestWALEntrySinkFilter { public void testWALEntryFilter() throws IOException { Configuration conf = HBaseConfiguration.create(); // Make it so our filter is instantiated on construction of ReplicationSink. + conf.setClass(DummyConnectionRegistry.REGISTRY_IMPL_CONF_KEY, DevNullConnectionRegistry.class, + DummyConnectionRegistry.class); conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); - conf.setClass("hbase.client.connection.impl", DevNullConnection.class, Connection.class); + conf.setClass(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL, + DevNullAsyncConnection.class, AsyncConnection.class); ReplicationSink sink = new ReplicationSink(conf); // Create some dumb walentries. List entries = @@ -203,55 +197,79 @@ public class TestWALEntrySinkFilter { } } + public static class DevNullConnectionRegistry extends DummyConnectionRegistry { + + public DevNullConnectionRegistry(Configuration conf) { + } + + @Override + public CompletableFuture getClusterId() { + return CompletableFuture.completedFuture("test"); + } + } + /** - * A DevNull Connection whose only purpose is checking what edits made it through. See down in - * {@link Table#batch(List, Object[])}. + * A DevNull AsyncConnection whose only purpose is checking what edits made it through. See down + * in {@link AsyncTable#batchAll}. */ - public static class DevNullConnection implements Connection { - private final Configuration configuration; + public static class DevNullAsyncConnection implements AsyncConnection { - DevNullConnection(Configuration configuration, ExecutorService es, User user) { - this.configuration = configuration; + private final Configuration conf; + + public DevNullAsyncConnection(Configuration conf, ConnectionRegistry registry, String clusterId, + User user) { + this.conf = conf; } @Override - public void abort(String why, Throwable e) { - - } - - @Override - public boolean isAborted() { - return false; - } - - @Override - public Configuration getConfiguration() { - return this.configuration; - } - - @Override - public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + public AsyncTableRegionLocator getRegionLocator(TableName tableName) { return null; } @Override - public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { + public void clearRegionLocationCache() { + } + + @Override + public AsyncTableBuilder getTableBuilder(TableName tableName) { return null; } @Override - public RegionLocator getRegionLocator(TableName tableName) throws IOException { + public AsyncTableBuilder getTableBuilder(TableName tableName, + ExecutorService pool) { return null; } @Override - public Admin getAdmin() throws IOException { + public AsyncAdminBuilder getAdminBuilder() { return null; } @Override - public void close() throws IOException { + public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { + return null; + } + @Override + public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { + return null; + } + + @Override + public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, + ExecutorService pool) { + return null; + } + + @Override + public CompletableFuture getHbck() { + return null; + } + + @Override + public Hbck getHbck(ServerName masterServer) throws IOException { + return null; } @Override @@ -260,320 +278,31 @@ public class TestWALEntrySinkFilter { } @Override - public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) { - return new TableBuilder() { - @Override - public TableBuilder setOperationTimeout(int timeout) { - return this; - } + public void close() throws IOException { + } + + @Override + public AsyncTable getTable(TableName tableName) { + return new DummyAsyncTable() { @Override - public TableBuilder setRpcTimeout(int timeout) { - return this; - } - - @Override - public TableBuilder setReadRpcTimeout(int timeout) { - return this; - } - - @Override - public TableBuilder setWriteRpcTimeout(int timeout) { - return this; - } - - @Override - public Table build() { - return new Table() { - @Override - public TableName getName() { - return tableName; - } - - @Override - public Configuration getConfiguration() { - return configuration; - } - - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - return null; - } - - @Override - public TableDescriptor getDescriptor() throws IOException { - return null; - } - - @Override - public boolean exists(Get get) throws IOException { - return false; - } - - @Override - public boolean[] exists(List gets) throws IOException { - return new boolean[0]; - } - - @Override - public void batch(List actions, Object[] results) - throws IOException, InterruptedException { - for (Row action : actions) { - // Row is the index of the loop above where we make WALEntry and Cells. - int row = Bytes.toInt(action.getRow()); - assertTrue("" + row, row > BOUNDARY); - UNFILTERED.incrementAndGet(); - } - } - - @Override - public void batchCallback(List actions, Object[] results, - Batch.Callback callback) throws IOException, InterruptedException { - - } - - @Override - public Result get(Get get) throws IOException { - return null; - } - - @Override - public Result[] get(List gets) throws IOException { - return new Result[0]; - } - - @Override - public ResultScanner getScanner(Scan scan) throws IOException { - return null; - } - - @Override - public ResultScanner getScanner(byte[] family) throws IOException { - return null; - } - - @Override - public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { - return null; - } - - @Override - public void put(Put put) throws IOException { - - } - - @Override - public void put(List puts) throws IOException { - - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, - Put put) throws IOException { - return false; - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException { - return false; - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, Put put) throws IOException { - return false; - } - - @Override - public void delete(Delete delete) throws IOException { - - } - - @Override - public void delete(List deletes) throws IOException { - - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, - Delete delete) throws IOException { - return false; - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException { - return false; - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, Delete delete) throws IOException { - return false; - } - - @Override - public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { - return null; - } - - @Override - public Result mutateRow(RowMutations rm) throws IOException { - return null; - } - - @Override - public Result append(Append append) throws IOException { - return null; - } - - @Override - public Result increment(Increment increment) throws IOException { - return null; - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount) throws IOException { - return 0; - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount, Durability durability) throws IOException { - return 0; - } - - @Override - public void close() throws IOException { - - } - - @Override - public CoprocessorRpcChannel coprocessorService(byte[] row) { - return null; - } - - @Override - public Map coprocessorService( - Class service, byte[] startKey, byte[] endKey, Batch.Call callable) - throws com.google.protobuf.ServiceException, Throwable { - return null; - } - - @Override - public void coprocessorService( - Class service, byte[] startKey, byte[] endKey, Batch.Call callable, - Batch.Callback callback) throws com.google.protobuf.ServiceException, Throwable { - - } - - @Override - public Map batchCoprocessorService( - com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, - com.google.protobuf.Message request, byte[] startKey, byte[] endKey, - R responsePrototype) throws com.google.protobuf.ServiceException, Throwable { - return null; - } - - @Override - public void batchCoprocessorService( - com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, - com.google.protobuf.Message request, byte[] startKey, byte[] endKey, - R responsePrototype, Batch.Callback callback) - throws com.google.protobuf.ServiceException, Throwable { - - } - - @Override - public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) - throws IOException { - return false; - } - - @Override - public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, RowMutations mutation) throws IOException { - return false; - } - - @Override - public long getRpcTimeout(TimeUnit unit) { - return 0; - } - - @Override - public int getRpcTimeout() { - return 0; - } - - @Override - public void setRpcTimeout(int rpcTimeout) { - - } - - @Override - public long getReadRpcTimeout(TimeUnit unit) { - return 0; - } - - @Override - public int getReadRpcTimeout() { - return 0; - } - - @Override - public void setReadRpcTimeout(int readRpcTimeout) { - - } - - @Override - public long getWriteRpcTimeout(TimeUnit unit) { - return 0; - } - - @Override - public int getWriteRpcTimeout() { - return 0; - } - - @Override - public void setWriteRpcTimeout(int writeRpcTimeout) { - - } - - @Override - public long getOperationTimeout(TimeUnit unit) { - return 0; - } - - @Override - public int getOperationTimeout() { - return 0; - } - - @Override - public void setOperationTimeout(int operationTimeout) { - } - - @Override - public RegionLocator getRegionLocator() throws IOException { - return null; - } - }; + public CompletableFuture> batchAll(List actions) { + List list = new ArrayList<>(actions.size()); + for (Row action : actions) { + // Row is the index of the loop above where we make WALEntry and Cells. + int row = Bytes.toInt(action.getRow()); + assertTrue("" + row, row > BOUNDARY); + UNFILTERED.incrementAndGet(); + list.add(null); + } + return CompletableFuture.completedFuture(list); } }; } @Override - public void clearRegionLocationCache() { - } - - @Override - public String getClusterId() { - return null; + public Configuration getConfiguration() { + return conf; } } }