HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator
This commit is contained in:
parent
af72916c4c
commit
9981e3f843
|
@ -18,13 +18,16 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
|
||||
/**
|
||||
* Used to communicate with a single HBase table in batches. Obtain an instance from a
|
||||
* {@link AsyncConnection} and call {@link #close()} afterwards.
|
||||
|
@ -52,7 +55,9 @@ public interface AsyncBufferedMutator extends Closeable {
|
|||
* part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
|
||||
* @param mutation The data to send.
|
||||
*/
|
||||
CompletableFuture<Void> mutate(Mutation mutation);
|
||||
default CompletableFuture<Void> mutate(Mutation mutation) {
|
||||
return Iterables.getOnlyElement(mutate(Collections.singletonList(mutation)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the wire
|
||||
|
@ -81,4 +86,11 @@ public interface AsyncBufferedMutator extends Closeable {
|
|||
* @return The size of the write buffer in bytes.
|
||||
*/
|
||||
long getWriteBufferSize();
|
||||
|
||||
/**
|
||||
* Returns the periodical flush interval, 0 means disabled.
|
||||
*/
|
||||
default long getPeriodicalFlushTimeout(TimeUnit unit) {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,25 @@ public interface AsyncBufferedMutatorBuilder {
|
|||
*/
|
||||
AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
|
||||
|
||||
/**
|
||||
* Set the periodical flush interval. If the data in the buffer has not been flush for a long
|
||||
* time, i.e, reach this timeout limit, we will flush it automatically.
|
||||
* <p/>
|
||||
* Notice that, set the timeout to 0 or a negative value means disable periodical flush, not
|
||||
* 'flush immediately'. If you want to flush immediately then you should not use this class, as it
|
||||
* is designed to be 'buffered'.
|
||||
*/
|
||||
default AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Disable the periodical flush, i.e, set the timeout to 0.
|
||||
*/
|
||||
default AsyncBufferedMutatorBuilder disableWriteBufferPeriodicFlush() {
|
||||
return setWriteBufferPeriodicFlush(0, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
|
||||
* <p>
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||
|
||||
/**
|
||||
* The implementation of {@link AsyncBufferedMutatorBuilder}.
|
||||
|
@ -28,14 +29,20 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
|||
@InterfaceAudience.Private
|
||||
class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
|
||||
|
||||
private final HashedWheelTimer periodicalFlushTimer;
|
||||
|
||||
private final AsyncTableBuilder<?> tableBuilder;
|
||||
|
||||
private long writeBufferSize;
|
||||
|
||||
private long periodicFlushTimeoutNs;
|
||||
|
||||
public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
|
||||
AsyncTableBuilder<?> tableBuilder) {
|
||||
AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
|
||||
this.tableBuilder = tableBuilder;
|
||||
this.writeBufferSize = connConf.getWriteBufferSize();
|
||||
this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
|
||||
this.periodicalFlushTimer = periodicalFlushTimer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,8 +84,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AsyncBufferedMutator build() {
|
||||
return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
|
||||
public AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
|
||||
this.periodicFlushTimeoutNs = unit.toNanos(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncBufferedMutator build() {
|
||||
return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
|
||||
periodicFlushTimeoutNs);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -29,16 +30,24 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
|
||||
|
||||
/**
|
||||
* The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
|
||||
|
||||
private final HashedWheelTimer periodicalFlushTimer;
|
||||
|
||||
private final AsyncTable<?> table;
|
||||
|
||||
private final long writeBufferSize;
|
||||
|
||||
private final long periodicFlushTimeoutNs;
|
||||
|
||||
private List<Mutation> mutations = new ArrayList<>();
|
||||
|
||||
private List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||
|
@ -47,9 +56,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
|
|||
|
||||
private boolean closed;
|
||||
|
||||
AsyncBufferedMutatorImpl(AsyncTable<?> table, long writeBufferSize) {
|
||||
@VisibleForTesting
|
||||
Timeout periodicFlushTask;
|
||||
|
||||
AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
|
||||
long writeBufferSize, long periodicFlushTimeoutNs) {
|
||||
this.periodicalFlushTimer = periodicalFlushTimer;
|
||||
this.table = table;
|
||||
this.writeBufferSize = writeBufferSize;
|
||||
this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -62,7 +77,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
|
|||
return table.getConfiguration();
|
||||
}
|
||||
|
||||
private void internalFlush() {
|
||||
// will be overridden in test
|
||||
@VisibleForTesting
|
||||
protected void internalFlush() {
|
||||
if (periodicFlushTask != null) {
|
||||
periodicFlushTask.cancel();
|
||||
periodicFlushTask = null;
|
||||
}
|
||||
List<Mutation> toSend = this.mutations;
|
||||
if (toSend.isEmpty()) {
|
||||
return;
|
||||
|
@ -85,30 +106,11 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> mutate(Mutation mutation) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<Void>();
|
||||
long heapSize = mutation.heapSize();
|
||||
synchronized (this) {
|
||||
if (closed) {
|
||||
future.completeExceptionally(new IOException("Already closed"));
|
||||
return future;
|
||||
}
|
||||
mutations.add(mutation);
|
||||
futures.add(future);
|
||||
bufferedSize += heapSize;
|
||||
if (bufferedSize >= writeBufferSize) {
|
||||
internalFlush();
|
||||
}
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
|
||||
List<CompletableFuture<Void>> futures =
|
||||
Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
|
||||
.collect(Collectors.toList());
|
||||
Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
|
||||
.collect(Collectors.toList());
|
||||
long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
|
||||
synchronized (this) {
|
||||
if (closed) {
|
||||
|
@ -116,6 +118,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
|
|||
futures.forEach(f -> f.completeExceptionally(ioe));
|
||||
return futures;
|
||||
}
|
||||
if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
|
||||
periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
|
||||
synchronized (AsyncBufferedMutatorImpl.this) {
|
||||
// confirm that we are still valid, if there is already an internalFlush call before us,
|
||||
// then we should not execute any more. And in internalFlush we will set periodicFlush
|
||||
// to null, and since we may schedule a new one, so here we check whether the references
|
||||
// are equal.
|
||||
if (timeout == periodicFlushTask) {
|
||||
periodicFlushTask = null;
|
||||
internalFlush();
|
||||
}
|
||||
}
|
||||
}, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
this.mutations.addAll(mutations);
|
||||
this.futures.addAll(futures);
|
||||
bufferedSize += heapSize;
|
||||
|
@ -141,4 +157,9 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
|
|||
public long getWriteBufferSize() {
|
||||
return writeBufferSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPeriodicalFlushTimeout(TimeUnit unit) {
|
||||
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,11 +39,12 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
|
|||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
|
||||
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -91,32 +92,38 @@ class AsyncConnectionConfiguration {
|
|||
|
||||
private final long writeBufferSize;
|
||||
|
||||
private final long writeBufferPeriodicFlushTimeoutNs;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
AsyncConnectionConfiguration(Configuration conf) {
|
||||
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
|
||||
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
|
||||
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
|
||||
this.rpcTimeoutNs = TimeUnit.MILLISECONDS
|
||||
.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.rpcTimeoutNs =
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
this.readRpcTimeoutNs =
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
|
||||
this.writeRpcTimeoutNs =
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
|
||||
this.pauseNs =
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
|
||||
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
||||
this.startLogErrorsCnt =
|
||||
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
this.scanTimeoutNs = TimeUnit.MILLISECONDS
|
||||
.toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
|
||||
.toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
|
||||
this.scannerCaching =
|
||||
conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
|
||||
this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
|
||||
conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
|
||||
this.metaScannerCaching =
|
||||
conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
|
||||
this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||
DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
|
||||
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
|
||||
this.writeBufferPeriodicFlushTimeoutNs =
|
||||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
|
||||
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
|
||||
}
|
||||
|
||||
long getMetaOperationTimeoutNs() {
|
||||
|
@ -159,7 +166,7 @@ class AsyncConnectionConfiguration {
|
|||
return scannerCaching;
|
||||
}
|
||||
|
||||
int getMetaScannerCaching(){
|
||||
int getMetaScannerCaching() {
|
||||
return metaScannerCaching;
|
||||
}
|
||||
|
||||
|
@ -170,4 +177,8 @@ class AsyncConnectionConfiguration {
|
|||
long getWriteBufferSize() {
|
||||
return writeBufferSize;
|
||||
}
|
||||
|
||||
long getWriteBufferPeriodicFlushTimeoutNs() {
|
||||
return writeBufferPeriodicFlushTimeoutNs;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
|
||||
@VisibleForTesting
|
||||
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
|
||||
Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
|
||||
Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
|
||||
|
||||
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
|
||||
|
||||
|
@ -193,7 +193,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
String msg = "ZooKeeper available but no active master location found";
|
||||
LOG.info(msg);
|
||||
this.masterStubMakeFuture.getAndSet(null)
|
||||
.completeExceptionally(new MasterNotRunningException(msg));
|
||||
.completeExceptionally(new MasterNotRunningException(msg));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
@ -216,7 +216,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
});
|
||||
} catch (IOException e) {
|
||||
this.masterStubMakeFuture.getAndSet(null)
|
||||
.completeExceptionally(new IOException("Failed to create async master stub", e));
|
||||
.completeExceptionally(new IOException("Failed to create async master stub", e));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -317,12 +317,13 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
|
||||
@Override
|
||||
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
|
||||
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName));
|
||||
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
|
||||
ExecutorService pool) {
|
||||
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
|
||||
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
|
||||
RETRY_TIMER);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -31,6 +33,7 @@ import java.util.List;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -45,12 +48,15 @@ import org.junit.ClassRule;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
|
||||
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestAsyncBufferMutator {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
|
||||
HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
|
@ -96,10 +102,10 @@ public class TestAsyncBufferMutator {
|
|||
private void test(TableName tableName) throws InterruptedException {
|
||||
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||
try (AsyncBufferedMutator mutator =
|
||||
CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
|
||||
CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
|
||||
List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
|
||||
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
|
||||
.collect(Collectors.toList()));
|
||||
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
|
||||
.collect(Collectors.toList()));
|
||||
// exceeded the write buffer size, a flush will be called directly
|
||||
fs.forEach(f -> f.join());
|
||||
IntStream.range(COUNT / 2, COUNT).forEach(i -> {
|
||||
|
@ -115,9 +121,9 @@ public class TestAsyncBufferMutator {
|
|||
futures.forEach(f -> f.join());
|
||||
AsyncTable<?> table = CONN.getTable(tableName);
|
||||
IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
|
||||
.forEach(r -> {
|
||||
assertArrayEquals(VALUE, r.getValue(CF, CQ));
|
||||
});
|
||||
.forEach(r -> {
|
||||
assertArrayEquals(VALUE, r.getValue(CF, CQ));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -142,4 +148,145 @@ public class TestAsyncBufferMutator {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
|
||||
try (AsyncBufferedMutator mutator =
|
||||
CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
|
||||
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
|
||||
CompletableFuture<?> future = mutator.mutate(put);
|
||||
Thread.sleep(2000);
|
||||
// assert that we have not flushed it out
|
||||
assertFalse(future.isDone());
|
||||
mutator.flush();
|
||||
future.get();
|
||||
}
|
||||
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
|
||||
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPeriodicFlush() throws InterruptedException, ExecutionException {
|
||||
AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
|
||||
.setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
|
||||
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
|
||||
CompletableFuture<?> future = mutator.mutate(put);
|
||||
future.get();
|
||||
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
|
||||
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
|
||||
}
|
||||
|
||||
// a bit deep into the implementation
|
||||
@Test
|
||||
public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
|
||||
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
|
||||
try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
|
||||
.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
|
||||
.setWriteBufferSize(10 * put.heapSize()).build()) {
|
||||
List<CompletableFuture<?>> futures = new ArrayList<>();
|
||||
futures.add(mutator.mutate(put));
|
||||
Timeout task = mutator.periodicFlushTask;
|
||||
// we should have scheduled a periodic flush task
|
||||
assertNotNull(task);
|
||||
for (int i = 1;; i++) {
|
||||
futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
|
||||
if (mutator.periodicFlushTask == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertTrue(task.isCancelled());
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
|
||||
for (int i = 0; i < futures.size(); i++) {
|
||||
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelPeriodicFlushByManuallyFlush()
|
||||
throws InterruptedException, ExecutionException {
|
||||
try (AsyncBufferedMutatorImpl mutator =
|
||||
(AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
|
||||
.setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
|
||||
CompletableFuture<?> future =
|
||||
mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
|
||||
Timeout task = mutator.periodicFlushTask;
|
||||
// we should have scheduled a periodic flush task
|
||||
assertNotNull(task);
|
||||
mutator.flush();
|
||||
assertTrue(task.isCancelled());
|
||||
future.get();
|
||||
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
|
||||
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
|
||||
CompletableFuture<?> future;
|
||||
Timeout task;
|
||||
try (AsyncBufferedMutatorImpl mutator =
|
||||
(AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
|
||||
.setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
|
||||
future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
|
||||
task = mutator.periodicFlushTask;
|
||||
// we should have scheduled a periodic flush task
|
||||
assertNotNull(task);
|
||||
}
|
||||
assertTrue(task.isCancelled());
|
||||
future.get();
|
||||
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
|
||||
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
|
||||
}
|
||||
|
||||
private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
|
||||
|
||||
private int flushCount;
|
||||
|
||||
AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
|
||||
long writeBufferSize, long periodicFlushTimeoutNs) {
|
||||
super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void internalFlush() {
|
||||
flushCount++;
|
||||
super.internalFlush();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRaceBetweenNormalFlushAndPeriodicFlush()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
|
||||
try (AsyncBufferMutatorForTest mutator =
|
||||
new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
|
||||
10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
|
||||
CompletableFuture<?> future = mutator.mutate(put);
|
||||
Timeout task = mutator.periodicFlushTask;
|
||||
// we should have scheduled a periodic flush task
|
||||
assertNotNull(task);
|
||||
synchronized (mutator) {
|
||||
// synchronized on mutator to prevent periodic flush to be executed
|
||||
Thread.sleep(500);
|
||||
// the timeout should be issued
|
||||
assertTrue(task.isExpired());
|
||||
// but no flush is issued as we hold the lock
|
||||
assertEquals(0, mutator.flushCount);
|
||||
assertFalse(future.isDone());
|
||||
// manually flush, then release the lock
|
||||
mutator.flush();
|
||||
}
|
||||
// this is a bit deep into the implementation in netty but anyway let's add a check here to
|
||||
// confirm that an issued timeout can not be canceled by netty framework.
|
||||
assertFalse(task.isCancelled());
|
||||
// and the mutation is done
|
||||
future.get();
|
||||
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
|
||||
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
|
||||
// only the manual flush, the periodic flush should have been canceled by us
|
||||
assertEquals(1, mutator.flushCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue