From 9981e3f84318eb3e00cff3e3238c49f103fd3d15 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 11 Dec 2018 08:39:43 +0800 Subject: [PATCH] HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator --- .../hbase/client/AsyncBufferedMutator.java | 16 +- .../client/AsyncBufferedMutatorBuilder.java | 19 +++ .../AsyncBufferedMutatorBuilderImpl.java | 19 ++- .../client/AsyncBufferedMutatorImpl.java | 67 +++++--- .../client/AsyncConnectionConfiguration.java | 37 ++-- .../hbase/client/AsyncConnectionImpl.java | 11 +- .../hbase/client/TestAsyncBufferMutator.java | 161 +++++++++++++++++- 7 files changed, 277 insertions(+), 53 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java index 6fe4b9a8602..7b21eb5fa13 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java @@ -18,13 +18,16 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; - +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; + /** * Used to communicate with a single HBase table in batches. Obtain an instance from a * {@link AsyncConnection} and call {@link #close()} afterwards. @@ -52,7 +55,9 @@ public interface AsyncBufferedMutator extends Closeable { * part of a batch. Currently only supports {@link Put} and {@link Delete} mutations. * @param mutation The data to send. */ - CompletableFuture mutate(Mutation mutation); + default CompletableFuture mutate(Mutation mutation) { + return Iterables.getOnlyElement(mutate(Collections.singletonList(mutation))); + } /** * Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the wire @@ -81,4 +86,11 @@ public interface AsyncBufferedMutator extends Closeable { * @return The size of the write buffer in bytes. */ long getWriteBufferSize(); + + /** + * Returns the periodical flush interval, 0 means disabled. + */ + default long getPeriodicalFlushTimeout(TimeUnit unit) { + throw new UnsupportedOperationException("Not implemented"); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java index 45959bb4ce7..c617c8e1e8d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java @@ -45,6 +45,25 @@ public interface AsyncBufferedMutatorBuilder { */ AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit); + /** + * Set the periodical flush interval. If the data in the buffer has not been flush for a long + * time, i.e, reach this timeout limit, we will flush it automatically. + *

+ * Notice that, set the timeout to 0 or a negative value means disable periodical flush, not + * 'flush immediately'. If you want to flush immediately then you should not use this class, as it + * is designed to be 'buffered'. + */ + default AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException("Not implemented"); + } + + /** + * Disable the periodical flush, i.e, set the timeout to 0. + */ + default AsyncBufferedMutatorBuilder disableWriteBufferPeriodicFlush() { + return setWriteBufferPeriodicFlush(0, TimeUnit.NANOSECONDS); + } + /** * Set the max retry times for an operation. Usually it is the max attempt times minus 1. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java index 227d02b83c9..eb8af175fbd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; /** * The implementation of {@link AsyncBufferedMutatorBuilder}. @@ -28,14 +29,20 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; @InterfaceAudience.Private class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder { + private final HashedWheelTimer periodicalFlushTimer; + private final AsyncTableBuilder tableBuilder; private long writeBufferSize; + private long periodicFlushTimeoutNs; + public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf, - AsyncTableBuilder tableBuilder) { + AsyncTableBuilder tableBuilder, HashedWheelTimer periodicalFlushTimer) { this.tableBuilder = tableBuilder; this.writeBufferSize = connConf.getWriteBufferSize(); + this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs(); + this.periodicalFlushTimer = periodicalFlushTimer; } @Override @@ -77,8 +84,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder { } @Override - public AsyncBufferedMutator build() { - return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize); + public AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) { + this.periodicFlushTimeoutNs = unit.toNanos(timeout); + return this; } + @Override + public AsyncBufferedMutator build() { + return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize, + periodicFlushTimeoutNs); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java index 5a92acef8d5..318c6c94c4d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -29,16 +30,24 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.Timeout; + /** * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}. */ @InterfaceAudience.Private class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { + private final HashedWheelTimer periodicalFlushTimer; + private final AsyncTable table; private final long writeBufferSize; + private final long periodicFlushTimeoutNs; + private List mutations = new ArrayList<>(); private List> futures = new ArrayList<>(); @@ -47,9 +56,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { private boolean closed; - AsyncBufferedMutatorImpl(AsyncTable table, long writeBufferSize) { + @VisibleForTesting + Timeout periodicFlushTask; + + AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable table, + long writeBufferSize, long periodicFlushTimeoutNs) { + this.periodicalFlushTimer = periodicalFlushTimer; this.table = table; this.writeBufferSize = writeBufferSize; + this.periodicFlushTimeoutNs = periodicFlushTimeoutNs; } @Override @@ -62,7 +77,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { return table.getConfiguration(); } - private void internalFlush() { + // will be overridden in test + @VisibleForTesting + protected void internalFlush() { + if (periodicFlushTask != null) { + periodicFlushTask.cancel(); + periodicFlushTask = null; + } List toSend = this.mutations; if (toSend.isEmpty()) { return; @@ -85,30 +106,11 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { } } - @Override - public CompletableFuture mutate(Mutation mutation) { - CompletableFuture future = new CompletableFuture(); - long heapSize = mutation.heapSize(); - synchronized (this) { - if (closed) { - future.completeExceptionally(new IOException("Already closed")); - return future; - } - mutations.add(mutation); - futures.add(future); - bufferedSize += heapSize; - if (bufferedSize >= writeBufferSize) { - internalFlush(); - } - } - return future; - } - @Override public List> mutate(List mutations) { List> futures = - Stream.> generate(CompletableFuture::new).limit(mutations.size()) - .collect(Collectors.toList()); + Stream.> generate(CompletableFuture::new).limit(mutations.size()) + .collect(Collectors.toList()); long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum(); synchronized (this) { if (closed) { @@ -116,6 +118,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { futures.forEach(f -> f.completeExceptionally(ioe)); return futures; } + if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) { + periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> { + synchronized (AsyncBufferedMutatorImpl.this) { + // confirm that we are still valid, if there is already an internalFlush call before us, + // then we should not execute any more. And in internalFlush we will set periodicFlush + // to null, and since we may schedule a new one, so here we check whether the references + // are equal. + if (timeout == periodicFlushTask) { + periodicFlushTask = null; + internalFlush(); + } + } + }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS); + } this.mutations.addAll(mutations); this.futures.addAll(futures); bufferedSize += heapSize; @@ -141,4 +157,9 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { public long getWriteBufferSize() { return writeBufferSize; } + + @Override + public long getPeriodicalFlushTimeout(TimeUnit unit) { + return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index bd2add8ec59..915e9dd46c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -39,11 +39,12 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.yetus.audience.InterfaceAudience; @@ -91,32 +92,38 @@ class AsyncConnectionConfiguration { private final long writeBufferSize; + private final long writeBufferPeriodicFlushTimeoutNs; + @SuppressWarnings("deprecation") AsyncConnectionConfiguration(Configuration conf) { this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); - this.rpcTimeoutNs = TimeUnit.MILLISECONDS - .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)); + this.rpcTimeoutNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)); this.readRpcTimeoutNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs)); + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs)); this.writeRpcTimeoutNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs)); + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs)); this.pauseNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.startLogErrorsCnt = - conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.scanTimeoutNs = TimeUnit.MILLISECONDS - .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); + .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); this.scannerCaching = - conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); - this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING); + conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.metaScannerCaching = + conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING); this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + this.writeBufferPeriodicFlushTimeoutNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, + WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT)); } long getMetaOperationTimeoutNs() { @@ -159,7 +166,7 @@ class AsyncConnectionConfiguration { return scannerCaching; } - int getMetaScannerCaching(){ + int getMetaScannerCaching() { return metaScannerCaching; } @@ -170,4 +177,8 @@ class AsyncConnectionConfiguration { long getWriteBufferSize() { return writeBufferSize; } + + long getWriteBufferPeriodicFlushTimeoutNs() { + return writeBufferPeriodicFlushTimeoutNs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index a05764ee9bf..078395ba067 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -68,7 +68,7 @@ class AsyncConnectionImpl implements AsyncConnection { @VisibleForTesting static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( - Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); + Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; @@ -193,7 +193,7 @@ class AsyncConnectionImpl implements AsyncConnection { String msg = "ZooKeeper available but no active master location found"; LOG.info(msg); this.masterStubMakeFuture.getAndSet(null) - .completeExceptionally(new MasterNotRunningException(msg)); + .completeExceptionally(new MasterNotRunningException(msg)); return; } try { @@ -216,7 +216,7 @@ class AsyncConnectionImpl implements AsyncConnection { }); } catch (IOException e) { this.masterStubMakeFuture.getAndSet(null) - .completeExceptionally(new IOException("Failed to create async master stub", e)); + .completeExceptionally(new IOException("Failed to create async master stub", e)); } }); } @@ -317,12 +317,13 @@ class AsyncConnectionImpl implements AsyncConnection { @Override public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { - return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName)); + return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER); } @Override public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool) { - return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool)); + return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), + RETRY_TIMER); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java index 9fe4ca74f16..6eed326f2d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -31,6 +33,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -45,12 +48,15 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.Timeout; + @Category({ MediumTests.class, ClientTests.class }) public class TestAsyncBufferMutator { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncBufferMutator.class); + HBaseClassTestRule.forClass(TestAsyncBufferMutator.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -96,10 +102,10 @@ public class TestAsyncBufferMutator { private void test(TableName tableName) throws InterruptedException { List> futures = new ArrayList<>(); try (AsyncBufferedMutator mutator = - CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) { + CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) { List> fs = mutator.mutate(IntStream.range(0, COUNT / 2) - .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) - .collect(Collectors.toList())); + .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) + .collect(Collectors.toList())); // exceeded the write buffer size, a flush will be called directly fs.forEach(f -> f.join()); IntStream.range(COUNT / 2, COUNT).forEach(i -> { @@ -115,9 +121,9 @@ public class TestAsyncBufferMutator { futures.forEach(f -> f.join()); AsyncTable table = CONN.getTable(tableName); IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join()) - .forEach(r -> { - assertArrayEquals(VALUE, r.getValue(CF, CQ)); - }); + .forEach(r -> { + assertArrayEquals(VALUE, r.getValue(CF, CQ)); + }); } @Test @@ -142,4 +148,145 @@ public class TestAsyncBufferMutator { } } } + + @Test + public void testNoPeriodicFlush() throws InterruptedException, ExecutionException { + try (AsyncBufferedMutator mutator = + CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) { + Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); + CompletableFuture future = mutator.mutate(put); + Thread.sleep(2000); + // assert that we have not flushed it out + assertFalse(future.isDone()); + mutator.flush(); + future.get(); + } + AsyncTable table = CONN.getTable(TABLE_NAME); + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); + } + + @Test + public void testPeriodicFlush() throws InterruptedException, ExecutionException { + AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME) + .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build(); + Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); + CompletableFuture future = mutator.mutate(put); + future.get(); + AsyncTable table = CONN.getTable(TABLE_NAME); + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); + } + + // a bit deep into the implementation + @Test + public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException { + Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); + try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN + .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS) + .setWriteBufferSize(10 * put.heapSize()).build()) { + List> futures = new ArrayList<>(); + futures.add(mutator.mutate(put)); + Timeout task = mutator.periodicFlushTask; + // we should have scheduled a periodic flush task + assertNotNull(task); + for (int i = 1;; i++) { + futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))); + if (mutator.periodicFlushTask == null) { + break; + } + } + assertTrue(task.isCancelled()); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + AsyncTable table = CONN.getTable(TABLE_NAME); + for (int i = 0; i < futures.size(); i++) { + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ)); + } + } + } + + @Test + public void testCancelPeriodicFlushByManuallyFlush() + throws InterruptedException, ExecutionException { + try (AsyncBufferedMutatorImpl mutator = + (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME) + .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) { + CompletableFuture future = + mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE)); + Timeout task = mutator.periodicFlushTask; + // we should have scheduled a periodic flush task + assertNotNull(task); + mutator.flush(); + assertTrue(task.isCancelled()); + future.get(); + AsyncTable table = CONN.getTable(TABLE_NAME); + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); + } + } + + @Test + public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException { + CompletableFuture future; + Timeout task; + try (AsyncBufferedMutatorImpl mutator = + (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME) + .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) { + future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE)); + task = mutator.periodicFlushTask; + // we should have scheduled a periodic flush task + assertNotNull(task); + } + assertTrue(task.isCancelled()); + future.get(); + AsyncTable table = CONN.getTable(TABLE_NAME); + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); + } + + private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl { + + private int flushCount; + + AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable table, + long writeBufferSize, long periodicFlushTimeoutNs) { + super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs); + } + + @Override + protected void internalFlush() { + flushCount++; + super.internalFlush(); + } + } + + @Test + public void testRaceBetweenNormalFlushAndPeriodicFlush() + throws InterruptedException, ExecutionException { + Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); + try (AsyncBufferMutatorForTest mutator = + new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME), + 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) { + CompletableFuture future = mutator.mutate(put); + Timeout task = mutator.periodicFlushTask; + // we should have scheduled a periodic flush task + assertNotNull(task); + synchronized (mutator) { + // synchronized on mutator to prevent periodic flush to be executed + Thread.sleep(500); + // the timeout should be issued + assertTrue(task.isExpired()); + // but no flush is issued as we hold the lock + assertEquals(0, mutator.flushCount); + assertFalse(future.isDone()); + // manually flush, then release the lock + mutator.flush(); + } + // this is a bit deep into the implementation in netty but anyway let's add a check here to + // confirm that an issued timeout can not be canceled by netty framework. + assertFalse(task.isCancelled()); + // and the mutation is done + future.get(); + AsyncTable table = CONN.getTable(TABLE_NAME); + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); + // only the manual flush, the periodic flush should have been canceled by us + assertEquals(1, mutator.flushCount); + } + } }