diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index 4c6a0ef685e..7805f77e30e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -18,13 +18,12 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; - import java.io.Closeable; import java.io.IOException; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; /** *

Used to communicate with a single HBase table similar to {@link Table} but meant for @@ -64,7 +63,13 @@ public interface BufferedMutator extends Closeable { /** * Key to use setting non-default BufferedMutator implementation in Configuration. */ - public static final String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname"; + String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname"; + + /** + * Having the timer tick run more often that once every 100ms is needless and will + * probably cause too many timer events firing having a negative impact on performance. + */ + long MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS = 100; /** * Gets the fully qualified table name instance of the table that this BufferedMutator writes to. @@ -114,6 +119,56 @@ public interface BufferedMutator extends Closeable { */ void flush() throws IOException; + /** + * Sets the maximum time before the buffer is automatically flushed checking once per second. + * @param timeoutMs The maximum number of milliseconds how long records may be buffered + * before they are flushed. Set to 0 to disable. + */ + default void setWriteBufferPeriodicFlush(long timeoutMs) { + setWriteBufferPeriodicFlush(timeoutMs, 1000L); + } + + /** + * Sets the maximum time before the buffer is automatically flushed. + * @param timeoutMs The maximum number of milliseconds how long records may be buffered + * before they are flushed. Set to 0 to disable. + * @param timerTickMs The number of milliseconds between each check if the + * timeout has been exceeded. Must be 100ms (as defined in + * {@link #MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS}) + * or larger to avoid performance problems. + */ + default void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) { + throw new UnsupportedOperationException( + "The BufferedMutator::setWriteBufferPeriodicFlush has not been implemented"); + } + + /** + * Disable periodic flushing of the write buffer. + */ + default void disableWriteBufferPeriodicFlush() { + setWriteBufferPeriodicFlush(0, MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); + } + + /** + * Returns the current periodic flush timeout value in milliseconds. + * @return The maximum number of milliseconds how long records may be buffered before they + * are flushed. The value 0 means this is disabled. + */ + default long getWriteBufferPeriodicFlushTimeoutMs() { + throw new UnsupportedOperationException( + "The BufferedMutator::getWriteBufferPeriodicFlushTimeoutMs has not been implemented"); + } + + /** + * Returns the current periodic flush timertick interval in milliseconds. + * @return The number of milliseconds between each check if the timeout has been exceeded. + * This value only has a real meaning if the timeout has been set to > 0 + */ + default long getWriteBufferPeriodicFlushTimerTickMs() { + throw new UnsupportedOperationException( + "The BufferedMutator::getWriteBufferPeriodicFlushTimerTickMs has not been implemented"); + } + /** * Returns the maximum size in bytes of the write buffer for this HTable. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index cf85f5790b5..13b1a811039 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -15,18 +15,20 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -75,6 +77,11 @@ public class BufferedMutatorImpl implements BufferedMutator { */ private final AtomicInteger undealtMutationCount = new AtomicInteger(0); private final long writeBufferSize; + + private long writeBufferPeriodicFlushTimeoutMs; + private long writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS; + private Timer writeBufferPeriodicFlushTimer = null; + private final int maxKeyValueSize; private final ExecutorService pool; private final AtomicInteger rpcTimeout; @@ -99,15 +106,34 @@ public class BufferedMutatorImpl implements BufferedMutator { cleanupPoolOnClose = false; } ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); - this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? - params.getWriteBufferSize() : tableConf.getWriteBufferSize(); - this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? - params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + this.writeBufferSize = + params.getWriteBufferSize() != UNSET ? + params.getWriteBufferSize() : tableConf.getWriteBufferSize(); - this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ? - params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout()); - this.operationTimeout = new AtomicInteger(params.getOperationTimeout()!= BufferedMutatorParams.UNSET ? - params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout()); + // Set via the setter because it does value validation and starts/stops the TimerTask + long newWriteBufferPeriodicFlushTimeoutMs = + params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET + ? params.getWriteBufferPeriodicFlushTimeoutMs() + : tableConf.getWriteBufferPeriodicFlushTimeoutMs(); + long newWriteBufferPeriodicFlushTimerTickMs = + params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET + ? params.getWriteBufferPeriodicFlushTimerTickMs() + : tableConf.getWriteBufferPeriodicFlushTimerTickMs(); + this.setWriteBufferPeriodicFlush( + newWriteBufferPeriodicFlushTimeoutMs, + newWriteBufferPeriodicFlushTimerTickMs); + + this.maxKeyValueSize = + params.getMaxKeyValueSize() != UNSET ? + params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + + this.rpcTimeout = new AtomicInteger( + params.getRpcTimeout() != UNSET ? + params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout()); + + this.operationTimeout = new AtomicInteger( + params.getOperationTimeout() != UNSET ? + params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout()); this.ap = ap; } BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, @@ -161,6 +187,10 @@ public class BufferedMutatorImpl implements BufferedMutator { ++toAddCount; } + if (currentWriteBufferSize.get() == 0) { + firstRecordInBufferTimestamp = System.currentTimeMillis(); + } + // This behavior is highly non-intuitive... it does not protect us against // 94-incompatible behavior, which is a timing issue because hasError, the below code // and setter of hasError are not synchronized. Perhaps it should be removed. @@ -182,6 +212,31 @@ public class BufferedMutatorImpl implements BufferedMutator { } } + @VisibleForTesting + protected long getExecutedWriteBufferPeriodicFlushes() { + return executedWriteBufferPeriodicFlushes; + } + + private long firstRecordInBufferTimestamp = 0; + private long executedWriteBufferPeriodicFlushes = 0; + + private void timerCallbackForWriteBufferPeriodicFlush() { + if (currentWriteBufferSize.get() == 0) { + return; // Nothing to flush + } + long now = System.currentTimeMillis(); + if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) { + return; // No need to flush yet + } + // The first record in the writebuffer has been in there too long --> flush + try { + executedWriteBufferPeriodicFlushes++; + flush(); + } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { + LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage()); + } + } + // validate for well-formedness public void validatePut(final Put put) throws IllegalArgumentException { HTable.validatePut(put, maxKeyValueSize); @@ -193,6 +248,10 @@ public class BufferedMutatorImpl implements BufferedMutator { if (this.closed) { return; } + + // Stop any running Periodic Flush timer. + disableWriteBufferPeriodicFlush(); + // As we can have an operation in progress even if the buffer is empty, we call // backgroundFlushCommits at least one time. backgroundFlushCommits(true); @@ -277,7 +336,8 @@ public class BufferedMutatorImpl implements BufferedMutator { } /** - * Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}. + * Reuse the AsyncProcessTask when calling + * {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}. * @param taker access the inner buffer. * @return An AsyncProcessTask which always returns the latest rpc and operation timeout. */ @@ -309,6 +369,48 @@ public class BufferedMutatorImpl implements BufferedMutator { return this.writeBufferSize; } + @Override + public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) { + long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs; + long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs; + + // Both parameters have minimal values. + this.writeBufferPeriodicFlushTimeoutMs = Math.max(0, timeoutMs); + this.writeBufferPeriodicFlushTimerTickMs = + Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs); + + // If something changed we stop the old Timer. + if (this.writeBufferPeriodicFlushTimeoutMs != originalTimeoutMs || + this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) { + if (writeBufferPeriodicFlushTimer != null) { + writeBufferPeriodicFlushTimer.cancel(); + writeBufferPeriodicFlushTimer = null; + } + } + + // If we have the need for a timer and there is none we start it + if (writeBufferPeriodicFlushTimer == null && + writeBufferPeriodicFlushTimeoutMs > 0) { + writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon. + writeBufferPeriodicFlushTimer.schedule(new TimerTask() { + @Override + public void run() { + BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush(); + } + }, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs); + } + } + + @Override + public long getWriteBufferPeriodicFlushTimeoutMs() { + return this.writeBufferPeriodicFlushTimeoutMs; + } + + @Override + public long getWriteBufferPeriodicFlushTimerTickMs() { + return this.writeBufferPeriodicFlushTimerTickMs; + } + @Override public void setRpcTimeout(int rpcTimeout) { this.rpcTimeout.set(rpcTimeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index 0648501b2de..3f6c56570e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client; import java.util.concurrent.ExecutorService; - import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -34,6 +33,8 @@ public class BufferedMutatorParams implements Cloneable { private final TableName tableName; private long writeBufferSize = UNSET; + private long writeBufferPeriodicFlushTimeoutMs = UNSET; + private long writeBufferPeriodicFlushTimerTickMs = UNSET; private int maxKeyValueSize = UNSET; private ExecutorService pool = null; private String implementationClassName = null; @@ -88,6 +89,30 @@ public class BufferedMutatorParams implements Cloneable { return this; } + public long getWriteBufferPeriodicFlushTimeoutMs() { + return writeBufferPeriodicFlushTimeoutMs; + } + + /** + * Set the max timeout before the buffer is automatically flushed. + */ + public BufferedMutatorParams setWriteBufferPeriodicFlushTimeoutMs(long timeoutMs) { + this.writeBufferPeriodicFlushTimeoutMs = timeoutMs; + return this; + } + + public long getWriteBufferPeriodicFlushTimerTickMs() { + return writeBufferPeriodicFlushTimerTickMs; + } + + /** + * Set the TimerTick how often the buffer timeout if checked. + */ + public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) { + this.writeBufferPeriodicFlushTimerTickMs = timerTickMs; + return this; + } + public int getMaxKeyValueSize() { return maxKeyValueSize; } @@ -154,11 +179,13 @@ public class BufferedMutatorParams implements Cloneable { @Override public BufferedMutatorParams clone() { BufferedMutatorParams clone = new BufferedMutatorParams(this.tableName); - clone.writeBufferSize = this.writeBufferSize; - clone.maxKeyValueSize = maxKeyValueSize; - clone.pool = this.pool; - clone.listener = this.listener; - clone.implementationClassName = this.implementationClassName; + clone.writeBufferSize = this.writeBufferSize; + clone.writeBufferPeriodicFlushTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs; + clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs; + clone.maxKeyValueSize = this.maxKeyValueSize; + clone.pool = this.pool; + clone.listener = this.listener; + clone.implementationClassName = this.implementationClassName; return clone; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index a58240579d3..d99600426d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -30,10 +30,18 @@ public class ConnectionConfiguration { public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; + public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS = + "hbase.client.write.buffer.periodicflush.timeout.ms"; + public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS = + "hbase.client.write.buffer.periodicflush.timertick.ms"; + public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT = 0; // 0 == Disabled + public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760; private final long writeBufferSize; + private final long writeBufferPeriodicFlushTimeoutMs; + private final long writeBufferPeriodicFlushTimerTickMs; private final int metaOperationTimeout; private final int operationTimeout; private final int scannerCaching; @@ -56,6 +64,14 @@ public class ConnectionConfiguration { ConnectionConfiguration(Configuration conf) { this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); + this.writeBufferPeriodicFlushTimeoutMs = conf.getLong( + WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, + WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT); + + this.writeBufferPeriodicFlushTimerTickMs = conf.getLong( + WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, + WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT); + this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); @@ -105,6 +121,8 @@ public class ConnectionConfiguration { @VisibleForTesting protected ConnectionConfiguration() { this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; + this.writeBufferPeriodicFlushTimeoutMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; + this.writeBufferPeriodicFlushTimerTickMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT; this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; @@ -133,6 +151,14 @@ public class ConnectionConfiguration { return writeBufferSize; } + public long getWriteBufferPeriodicFlushTimeoutMs() { + return writeBufferPeriodicFlushTimeoutMs; + } + + public long getWriteBufferPeriodicFlushTimerTickMs() { + return writeBufferPeriodicFlushTimerTickMs; + } + public int getMetaOperationTimeout() { return metaOperationTimeout; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 653fbfdb4eb..562630f38e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -363,6 +363,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { params.writeBufferSize(connectionConfig.getWriteBufferSize()); } + if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) { + params.setWriteBufferPeriodicFlushTimeoutMs( + connectionConfig.getWriteBufferPeriodicFlushTimeoutMs()); + } + if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) { + params.setWriteBufferPeriodicFlushTimerTickMs( + connectionConfig.getWriteBufferPeriodicFlushTimerTickMs()); + } if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index ba6756b670d..fc5979390de 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CategoryBasedTimeout; @@ -1101,6 +1100,119 @@ public class TestAsyncProcess { Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); } + @Test + public void testSettingWriteBufferPeriodicFlushParameters() throws Exception { + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + + checkPeriodicFlushParameters(conn, ap, + 1234, 1234, + 1234, 1234); + checkPeriodicFlushParameters(conn, ap, + 0, 0, + 0, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); + checkPeriodicFlushParameters(conn, ap, + -1234, 0, + -1234, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); + checkPeriodicFlushParameters(conn, ap, + 1, 1, + 1, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); + } + + private void checkPeriodicFlushParameters(ClusterConnection conn, + MyAsyncProcess ap, + long setTO, long expectTO, + long setTT, long expectTT + ) { + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + + // The BufferedMutatorParams does nothing with the value + bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO); + bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT); + Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs()); + Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs()); + + // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams) + BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap); + Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs()); + Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs()); + + // The BufferedMutatorImpl corrects illegal values (direct via setter) + BufferedMutatorImpl ht2 = + new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap); + ht2.setWriteBufferPeriodicFlush(setTO, setTT); + Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs()); + Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs()); + + } + + @Test + public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception { + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + + bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP + bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms + bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record + + BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); + + // Verify if BufferedMutator has the right settings. + Assert.assertEquals(10000, ht.getWriteBufferSize()); + Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs()); + Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, + ht.getWriteBufferPeriodicFlushTimerTickMs()); + + Put put = createPut(1, true); + + Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + + // ----- Insert, flush immediately, MUST NOT flush automatically + ht.mutate(put); + ht.flush(); + + Thread.sleep(1000); + Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + + // ----- Insert, NO flush, MUST flush automatically + ht.mutate(put); + Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); + Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); + + // The timerTick should fire every 100ms, so after twice that we must have + // seen at least 1 tick and we should see an automatic flush + Thread.sleep(200); + Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + + // Ensure it does not flush twice + Thread.sleep(200); + Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + + // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically + ht.disableWriteBufferPeriodicFlush(); + ht.mutate(put); + Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); + Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); + + // Wait for at least 1 timerTick, we should see NO flushes. + Thread.sleep(200); + Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); + Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); + + // Reenable periodic flushing, a flush seems to take about 1 second + // so we wait for 2 seconds and it should have finished the flush. + ht.setWriteBufferPeriodicFlush(1, 100); + Thread.sleep(2000); + Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + } + + @Test public void testBufferedMutatorImplWithSharedPool() throws Exception { ClusterConnection conn = createHConnection(); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java index 0646f41b6de..2c2935e63f1 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; @@ -30,7 +29,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -136,13 +134,21 @@ public class TestBufferedMutatorParams { BufferedMutatorParams bmp = new BufferedMutatorParams(TableName.valueOf(tableName)); BufferedMutator.ExceptionListener listener = new MockExceptionListener(); - bmp.writeBufferSize(17).maxKeyValueSize(13).pool(pool).listener(listener); + bmp + .writeBufferSize(17) + .setWriteBufferPeriodicFlushTimeoutMs(123) + .setWriteBufferPeriodicFlushTimerTickMs(456) + .maxKeyValueSize(13) + .pool(pool) + .listener(listener); bmp.implementationClassName("someClassName"); BufferedMutatorParams clone = bmp.clone(); // Confirm some literals assertEquals(tableName, clone.getTableName().toString()); assertEquals(17, clone.getWriteBufferSize()); + assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs()); + assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs()); assertEquals(13, clone.getMaxKeyValueSize()); assertEquals("someClassName", clone.getImplementationClassName()); @@ -168,6 +174,10 @@ public class TestBufferedMutatorParams { assertEquals(some.getTableName().toString(), clone.getTableName().toString()); assertEquals(some.getWriteBufferSize(), clone.getWriteBufferSize()); + assertEquals(some.getWriteBufferPeriodicFlushTimeoutMs(), + clone.getWriteBufferPeriodicFlushTimeoutMs()); + assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(), + clone.getWriteBufferPeriodicFlushTimerTickMs()); assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize()); assertTrue(some.getListener() == clone.getListener()); assertTrue(some.getPool() == clone.getPool());