HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator

Commit metadata:
Authored by zhangduo on 2018-12-11 08:39:43 +08:00; committed by Duo Zhang.
parent da9508d427
commit b09b87d143
7 changed files with 277 additions and 53 deletions

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
/**
* Used to communicate with a single HBase table in batches. Obtain an instance from a
* {@link AsyncConnection} and call {@link #close()} afterwards.
@ -52,7 +55,9 @@ public interface AsyncBufferedMutator extends Closeable {
* part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
* @param mutation The data to send.
*/
CompletableFuture<Void> mutate(Mutation mutation);
default CompletableFuture<Void> mutate(Mutation mutation) {
  // Delegate to the batch variant with a one-element list. The batch method returns
  // one future per mutation, so the result list contains exactly one element;
  // getOnlyElement enforces that invariant while unwrapping it.
  List<CompletableFuture<Void>> futures = mutate(Collections.singletonList(mutation));
  return Iterables.getOnlyElement(futures);
}
/**
* Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the wire
@ -81,4 +86,11 @@ public interface AsyncBufferedMutator extends Closeable {
* @return The size of the write buffer in bytes.
*/
long getWriteBufferSize();
/**
 * Returns the periodic flush interval expressed in the given time unit; 0 means periodic
 * flush is disabled.
 * @param unit the time unit in which to express the timeout
 * @return the periodic flush timeout converted to {@code unit}, or 0 if disabled
 */
default long getPeriodicalFlushTimeout(TimeUnit unit) {
// Throwing default keeps binary compatibility with implementations written before
// periodic flush support was added; concrete implementations override this.
throw new UnsupportedOperationException("Not implemented");
}
}

View File

@ -45,6 +45,25 @@ public interface AsyncBufferedMutatorBuilder {
*/
AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
/**
 * Set the periodic flush interval. If the data in the buffer has not been flushed for this
 * long, i.e. it reaches the timeout limit, it will be flushed automatically.
 * <p/>
 * Notice that setting the timeout to 0 or a negative value disables periodic flush; it does
 * not mean 'flush immediately'. If you want to flush immediately then you should not use this
 * class, as it is designed to be 'buffered'.
 * @param timeout the flush interval; 0 or a negative value disables periodic flush
 * @param unit the time unit of {@code timeout}
 * @return {@code this} for chained invocation
 */
default AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
// Throwing default keeps binary compatibility with builder implementations that predate
// this method; real builders override it.
throw new UnsupportedOperationException("Not implemented");
}
/**
 * Disable periodic flushing of the write buffer, i.e. set the timeout to 0.
 * @return {@code this} for chained invocation
 */
default AsyncBufferedMutatorBuilder disableWriteBufferPeriodicFlush() {
  // The unit is irrelevant for a zero timeout: converting 0 of any unit yields 0 nanos,
  // which is the sentinel for "periodic flush disabled".
  return setWriteBufferPeriodicFlush(0, TimeUnit.MILLISECONDS);
}
/**
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
* <p>

View File

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
/**
* The implementation of {@link AsyncBufferedMutatorBuilder}.
@ -28,14 +29,20 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@InterfaceAudience.Private
class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
private final HashedWheelTimer periodicalFlushTimer;
private final AsyncTableBuilder<?> tableBuilder;
private long writeBufferSize;
private long periodicFlushTimeoutNs;
public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
AsyncTableBuilder<?> tableBuilder) {
AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
this.tableBuilder = tableBuilder;
this.writeBufferSize = connConf.getWriteBufferSize();
this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
this.periodicalFlushTimer = periodicalFlushTimer;
}
@Override
@ -77,8 +84,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
}
@Override
public AsyncBufferedMutator build() {
return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
public AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
  // Normalize the caller's unit to nanoseconds, the internal representation.
  // Zero or a negative value disables periodic flush.
  long timeoutNs = unit.toNanos(timeout);
  this.periodicFlushTimeoutNs = timeoutNs;
  return this;
}
@Override
public AsyncBufferedMutator build() {
  // Materialize the table first, then hand everything (including the shared timer used
  // for scheduling periodic flushes) to the mutator implementation.
  AsyncTable<?> table = tableBuilder.build();
  return new AsyncBufferedMutatorImpl(periodicalFlushTimer, table, writeBufferSize,
    periodicFlushTimeoutNs);
}
}

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -29,16 +30,24 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
/**
* The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
*/
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
private final HashedWheelTimer periodicalFlushTimer;
private final AsyncTable<?> table;
private final long writeBufferSize;
private final long periodicFlushTimeoutNs;
private List<Mutation> mutations = new ArrayList<>();
private List<CompletableFuture<Void>> futures = new ArrayList<>();
@ -47,9 +56,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
private boolean closed;
AsyncBufferedMutatorImpl(AsyncTable<?> table, long writeBufferSize) {
@VisibleForTesting
Timeout periodicFlushTask;
AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
long writeBufferSize, long periodicFlushTimeoutNs) {
this.periodicalFlushTimer = periodicalFlushTimer;
this.table = table;
this.writeBufferSize = writeBufferSize;
this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
}
@Override
@ -62,7 +77,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
return table.getConfiguration();
}
private void internalFlush() {
// will be overridden in test
@VisibleForTesting
protected void internalFlush() {
if (periodicFlushTask != null) {
periodicFlushTask.cancel();
periodicFlushTask = null;
}
List<Mutation> toSend = this.mutations;
if (toSend.isEmpty()) {
return;
@ -85,30 +106,11 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
}
}
@Override
public CompletableFuture<Void> mutate(Mutation mutation) {
// Buffer a single mutation; the returned future completes when the mutation is flushed
// to the table, or exceptionally if the mutator has already been closed.
CompletableFuture<Void> future = new CompletableFuture<Void>();
// Compute the heap size outside the lock to keep the critical section short.
long heapSize = mutation.heapSize();
synchronized (this) {
if (closed) {
future.completeExceptionally(new IOException("Already closed"));
return future;
}
mutations.add(mutation);
futures.add(future);
bufferedSize += heapSize;
// Flush immediately once the buffered heap size reaches the configured threshold.
if (bufferedSize >= writeBufferSize) {
internalFlush();
}
}
return future;
}
@Override
public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
List<CompletableFuture<Void>> futures =
Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
.collect(Collectors.toList());
Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
.collect(Collectors.toList());
long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
synchronized (this) {
if (closed) {
@ -116,6 +118,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
futures.forEach(f -> f.completeExceptionally(ioe));
return futures;
}
if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
synchronized (AsyncBufferedMutatorImpl.this) {
// confirm that we are still valid, if there is already an internalFlush call before us,
// then we should not execute any more. And in internalFlush we will set periodicFlush
// to null, and since we may schedule a new one, so here we check whether the references
// are equal.
if (timeout == periodicFlushTask) {
periodicFlushTask = null;
internalFlush();
}
}
}, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}
this.mutations.addAll(mutations);
this.futures.addAll(futures);
bufferedSize += heapSize;
@ -141,4 +157,9 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
public long getWriteBufferSize() {
// Heap-size threshold (in bytes) at which buffered mutations are flushed.
return writeBufferSize;
}
@Override
public long getPeriodicalFlushTimeout(TimeUnit unit) {
  // The timeout is stored internally in nanoseconds; convert it to the caller's unit.
  // A stored value of 0 (disabled) converts to 0 in every unit.
  long timeoutNs = periodicFlushTimeoutNs;
  return unit.convert(timeoutNs, TimeUnit.NANOSECONDS);
}
}

View File

@ -39,11 +39,12 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
@ -91,32 +92,38 @@ class AsyncConnectionConfiguration {
private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutNs;
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.rpcTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
this.rpcTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
this.readRpcTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
this.writeRpcTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
this.pauseNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.scanTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
.toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
this.scannerCaching =
conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.metaScannerCaching =
conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
this.writeBufferPeriodicFlushTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
}
long getMetaOperationTimeoutNs() {
@ -159,7 +166,7 @@ class AsyncConnectionConfiguration {
return scannerCaching;
}
int getMetaScannerCaching(){
int getMetaScannerCaching() {
// Number of rows fetched per RPC when scanning the meta table.
return metaScannerCaching;
}
@ -170,4 +177,8 @@ class AsyncConnectionConfiguration {
long getWriteBufferSize() {
return writeBufferSize;
}
long getWriteBufferPeriodicFlushTimeoutNs() {
// Periodic flush timeout for AsyncBufferedMutator, in nanoseconds; 0 means disabled.
return writeBufferPeriodicFlushTimeoutNs;
}
}

View File

@ -68,7 +68,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@VisibleForTesting
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
@ -193,7 +193,7 @@ class AsyncConnectionImpl implements AsyncConnection {
String msg = "ZooKeeper available but no active master location found";
LOG.info(msg);
this.masterStubMakeFuture.getAndSet(null)
.completeExceptionally(new MasterNotRunningException(msg));
.completeExceptionally(new MasterNotRunningException(msg));
return;
}
try {
@ -216,7 +216,7 @@ class AsyncConnectionImpl implements AsyncConnection {
});
} catch (IOException e) {
this.masterStubMakeFuture.getAndSet(null)
.completeExceptionally(new IOException("Failed to create async master stub", e));
.completeExceptionally(new IOException("Failed to create async master stub", e));
}
});
}
@ -317,12 +317,13 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName));
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
}
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
ExecutorService pool) {
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
RETRY_TIMER);
}
}

View File

@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -31,6 +33,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -45,12 +48,15 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBufferMutator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -96,10 +102,10 @@ public class TestAsyncBufferMutator {
private void test(TableName tableName) throws InterruptedException {
List<CompletableFuture<Void>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator =
CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
.collect(Collectors.toList()));
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
.collect(Collectors.toList()));
// exceeded the write buffer size, a flush will be called directly
fs.forEach(f -> f.join());
IntStream.range(COUNT / 2, COUNT).forEach(i -> {
@ -115,9 +121,9 @@ public class TestAsyncBufferMutator {
futures.forEach(f -> f.join());
AsyncTable<?> table = CONN.getTable(tableName);
IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
.forEach(r -> {
assertArrayEquals(VALUE, r.getValue(CF, CQ));
});
.forEach(r -> {
assertArrayEquals(VALUE, r.getValue(CF, CQ));
});
}
@Test
@ -142,4 +148,145 @@ public class TestAsyncBufferMutator {
}
}
}
@Test
public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
// With periodic flush disabled the mutation must stay buffered until flush() is called.
try (AsyncBufferedMutator mutator =
CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
CompletableFuture<?> future = mutator.mutate(put);
// Wait long enough that a periodic flush would have fired if one were scheduled.
Thread.sleep(2000);
// assert that we have not flushed it out
assertFalse(future.isDone());
mutator.flush();
future.get();
}
// After the manual flush the mutation must be visible in the table.
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
}
@Test
public void testPeriodicFlush() throws InterruptedException, ExecutionException {
  // The periodic flush task should push the single buffered mutation out within the
  // configured 1-second interval, completing the future without an explicit flush().
  // Use try-with-resources so the mutator is closed (the original leaked it; every
  // sibling test in this class closes its mutator).
  try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
    .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    CompletableFuture<?> future = mutator.mutate(put);
    // Completion here proves the periodic flush fired on its own.
    future.get();
  }
  // Verify the mutation actually reached the table.
  AsyncTable<?> table = CONN.getTable(TABLE_NAME);
  assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
}
// a bit deep into the implementation
@Test
public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
// Buffer threshold is 10 puts, so the size-triggered flush fires before the 1s timer.
try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
.setWriteBufferSize(10 * put.heapSize()).build()) {
List<CompletableFuture<?>> futures = new ArrayList<>();
futures.add(mutator.mutate(put));
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task
assertNotNull(task);
// Keep adding puts until the buffer-size flush fires, which clears periodicFlushTask.
for (int i = 1;; i++) {
futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
if (mutator.periodicFlushTask == null) {
break;
}
}
// The size-triggered flush must have cancelled the pending periodic flush task.
assertTrue(task.isCancelled());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// Every buffered mutation should have reached the table.
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
for (int i = 0; i < futures.size(); i++) {
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
}
}
}
@Test
public void testCancelPeriodicFlushByManuallyFlush()
throws InterruptedException, ExecutionException {
try (AsyncBufferedMutatorImpl mutator =
(AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
.setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
CompletableFuture<?> future =
mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task
assertNotNull(task);
// An explicit flush must cancel the pending periodic flush task.
mutator.flush();
assertTrue(task.isCancelled());
future.get();
// Confirm the manually flushed mutation is visible in the table.
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
}
}
@Test
public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
CompletableFuture<?> future;
Timeout task;
try (AsyncBufferedMutatorImpl mutator =
(AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
.setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task
assertNotNull(task);
}
// close() flushes the buffer, which must also cancel the pending periodic flush task.
assertTrue(task.isCancelled());
future.get();
// The close-triggered flush must have written the mutation out.
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
}
// Test double that counts how many times internalFlush is invoked, so tests can tell a
// manual/size-triggered flush apart from a periodic one.
private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
// Number of internalFlush invocations observed so far.
private int flushCount;
AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
long writeBufferSize, long periodicFlushTimeoutNs) {
super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
}
@Override
protected void internalFlush() {
// Record the call, then delegate to the real flush logic.
flushCount++;
super.internalFlush();
}
}
@Test
public void testRaceBetweenNormalFlushAndPeriodicFlush()
throws InterruptedException, ExecutionException {
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
// 200ms periodic timeout and a 10-put buffer: the timer fires first, but we hold the
// mutator's monitor so the timer callback cannot run until we release it.
try (AsyncBufferMutatorForTest mutator =
new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
CompletableFuture<?> future = mutator.mutate(put);
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task
assertNotNull(task);
synchronized (mutator) {
// synchronized on mutator to prevent periodic flush to be executed
Thread.sleep(500);
// the timeout should be issued
assertTrue(task.isExpired());
// but no flush is issued as we hold the lock
assertEquals(0, mutator.flushCount);
assertFalse(future.isDone());
// manually flush, then release the lock
mutator.flush();
}
// this is a bit deep into the implementation in netty but anyway let's add a check here to
// confirm that an issued timeout can not be canceled by netty framework.
assertFalse(task.isCancelled());
// and the mutation is done
future.get();
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
// only the manual flush, the periodic flush should have been canceled by us
assertEquals(1, mutator.flushCount);
}
}
}