HBASE-19486 Periodically ensure records are not buffered too long by BufferedMutator

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Niels Basjes 2017-12-29 22:22:34 +08:00 committed by Chia-Ping Tsai
parent e23f7afe57
commit 5a1c36f70a
7 changed files with 365 additions and 25 deletions

View File

@ -18,13 +18,12 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* <p>Used to communicate with a single HBase table similar to {@link Table} but meant for * <p>Used to communicate with a single HBase table similar to {@link Table} but meant for
@ -64,7 +63,13 @@ public interface BufferedMutator extends Closeable {
/** /**
* Key to use setting non-default BufferedMutator implementation in Configuration. * Key to use setting non-default BufferedMutator implementation in Configuration.
*/ */
public static final String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname"; String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
/**
* Having the timer tick run more often that once every 100ms is needless and will
* probably cause too many timer events firing having a negative impact on performance.
*/
long MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS = 100;
/** /**
* Gets the fully qualified table name instance of the table that this BufferedMutator writes to. * Gets the fully qualified table name instance of the table that this BufferedMutator writes to.
@ -114,6 +119,56 @@ public interface BufferedMutator extends Closeable {
*/ */
void flush() throws IOException; void flush() throws IOException;
/**
* Sets the maximum time before the buffer is automatically flushed checking once per second.
* @param timeoutMs The maximum number of milliseconds how long records may be buffered
* before they are flushed. Set to 0 to disable.
*/
default void setWriteBufferPeriodicFlush(long timeoutMs) {
setWriteBufferPeriodicFlush(timeoutMs, 1000L);
}
/**
* Sets the maximum time before the buffer is automatically flushed.
* @param timeoutMs The maximum number of milliseconds how long records may be buffered
* before they are flushed. Set to 0 to disable.
* @param timerTickMs The number of milliseconds between each check if the
* timeout has been exceeded. Must be 100ms (as defined in
* {@link #MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS})
* or larger to avoid performance problems.
*/
default void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
throw new UnsupportedOperationException(
"The BufferedMutator::setWriteBufferPeriodicFlush has not been implemented");
}
/**
* Disable periodic flushing of the write buffer.
*/
default void disableWriteBufferPeriodicFlush() {
setWriteBufferPeriodicFlush(0, MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
}
/**
* Returns the current periodic flush timeout value in milliseconds.
* @return The maximum number of milliseconds how long records may be buffered before they
* are flushed. The value 0 means this is disabled.
*/
default long getWriteBufferPeriodicFlushTimeoutMs() {
throw new UnsupportedOperationException(
"The BufferedMutator::getWriteBufferPeriodicFlushTimeoutMs has not been implemented");
}
/**
* Returns the current periodic flush timertick interval in milliseconds.
* @return The number of milliseconds between each check if the timeout has been exceeded.
* This value only has a real meaning if the timeout has been set to > 0
*/
default long getWriteBufferPeriodicFlushTimerTickMs() {
throw new UnsupportedOperationException(
"The BufferedMutator::getWriteBufferPeriodicFlushTimerTickMs has not been implemented");
}
/** /**
* Returns the maximum size in bytes of the write buffer for this HTable. * Returns the maximum size in bytes of the write buffer for this HTable.
* <p> * <p>

View File

@ -15,18 +15,20 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@ -75,6 +77,11 @@ public class BufferedMutatorImpl implements BufferedMutator {
*/ */
private final AtomicInteger undealtMutationCount = new AtomicInteger(0); private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
private final long writeBufferSize; private final long writeBufferSize;
private long writeBufferPeriodicFlushTimeoutMs;
private long writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
private Timer writeBufferPeriodicFlushTimer = null;
private final int maxKeyValueSize; private final int maxKeyValueSize;
private final ExecutorService pool; private final ExecutorService pool;
private final AtomicInteger rpcTimeout; private final AtomicInteger rpcTimeout;
@ -99,15 +106,34 @@ public class BufferedMutatorImpl implements BufferedMutator {
cleanupPoolOnClose = false; cleanupPoolOnClose = false;
} }
ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? this.writeBufferSize =
params.getWriteBufferSize() : tableConf.getWriteBufferSize(); params.getWriteBufferSize() != UNSET ?
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getWriteBufferSize() : tableConf.getWriteBufferSize();
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ? // Set via the setter because it does value validation and starts/stops the TimerTask
params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout()); long newWriteBufferPeriodicFlushTimeoutMs =
this.operationTimeout = new AtomicInteger(params.getOperationTimeout()!= BufferedMutatorParams.UNSET ? params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout()); ? params.getWriteBufferPeriodicFlushTimeoutMs()
: tableConf.getWriteBufferPeriodicFlushTimeoutMs();
long newWriteBufferPeriodicFlushTimerTickMs =
params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
? params.getWriteBufferPeriodicFlushTimerTickMs()
: tableConf.getWriteBufferPeriodicFlushTimerTickMs();
this.setWriteBufferPeriodicFlush(
newWriteBufferPeriodicFlushTimeoutMs,
newWriteBufferPeriodicFlushTimerTickMs);
this.maxKeyValueSize =
params.getMaxKeyValueSize() != UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
this.rpcTimeout = new AtomicInteger(
params.getRpcTimeout() != UNSET ?
params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
this.operationTimeout = new AtomicInteger(
params.getOperationTimeout() != UNSET ?
params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
this.ap = ap; this.ap = ap;
} }
BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
@ -161,6 +187,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
++toAddCount; ++toAddCount;
} }
if (currentWriteBufferSize.get() == 0) {
firstRecordInBufferTimestamp = System.currentTimeMillis();
}
// This behavior is highly non-intuitive... it does not protect us against // This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code // 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed. // and setter of hasError are not synchronized. Perhaps it should be removed.
@ -182,6 +212,31 @@ public class BufferedMutatorImpl implements BufferedMutator {
} }
} }
@VisibleForTesting
protected long getExecutedWriteBufferPeriodicFlushes() {
return executedWriteBufferPeriodicFlushes;
}
private long firstRecordInBufferTimestamp = 0;
private long executedWriteBufferPeriodicFlushes = 0;
private void timerCallbackForWriteBufferPeriodicFlush() {
if (currentWriteBufferSize.get() == 0) {
return; // Nothing to flush
}
long now = System.currentTimeMillis();
if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) {
return; // No need to flush yet
}
// The first record in the writebuffer has been in there too long --> flush
try {
executedWriteBufferPeriodicFlushes++;
flush();
} catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
}
}
// validate for well-formedness // validate for well-formedness
public void validatePut(final Put put) throws IllegalArgumentException { public void validatePut(final Put put) throws IllegalArgumentException {
HTable.validatePut(put, maxKeyValueSize); HTable.validatePut(put, maxKeyValueSize);
@ -193,6 +248,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (this.closed) { if (this.closed) {
return; return;
} }
// Stop any running Periodic Flush timer.
disableWriteBufferPeriodicFlush();
// As we can have an operation in progress even if the buffer is empty, we call // As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time. // backgroundFlushCommits at least one time.
backgroundFlushCommits(true); backgroundFlushCommits(true);
@ -277,7 +336,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
} }
/** /**
* Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}. * Reuse the AsyncProcessTask when calling
* {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
* @param taker access the inner buffer. * @param taker access the inner buffer.
* @return An AsyncProcessTask which always returns the latest rpc and operation timeout. * @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
*/ */
@ -309,6 +369,48 @@ public class BufferedMutatorImpl implements BufferedMutator {
return this.writeBufferSize; return this.writeBufferSize;
} }
@Override
public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
// Both parameters have minimal values.
this.writeBufferPeriodicFlushTimeoutMs = Math.max(0, timeoutMs);
this.writeBufferPeriodicFlushTimerTickMs =
Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs);
// If something changed we stop the old Timer.
if (this.writeBufferPeriodicFlushTimeoutMs != originalTimeoutMs ||
this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) {
if (writeBufferPeriodicFlushTimer != null) {
writeBufferPeriodicFlushTimer.cancel();
writeBufferPeriodicFlushTimer = null;
}
}
// If we have the need for a timer and there is none we start it
if (writeBufferPeriodicFlushTimer == null &&
writeBufferPeriodicFlushTimeoutMs > 0) {
writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
@Override
public void run() {
BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
}
}, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs);
}
}
@Override
public long getWriteBufferPeriodicFlushTimeoutMs() {
return this.writeBufferPeriodicFlushTimeoutMs;
}
@Override
public long getWriteBufferPeriodicFlushTimerTickMs() {
return this.writeBufferPeriodicFlushTimerTickMs;
}
@Override @Override
public void setRpcTimeout(int rpcTimeout) { public void setRpcTimeout(int rpcTimeout) {
this.rpcTimeout.set(rpcTimeout); this.rpcTimeout.set(rpcTimeout);

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -34,6 +33,8 @@ public class BufferedMutatorParams implements Cloneable {
private final TableName tableName; private final TableName tableName;
private long writeBufferSize = UNSET; private long writeBufferSize = UNSET;
private long writeBufferPeriodicFlushTimeoutMs = UNSET;
private long writeBufferPeriodicFlushTimerTickMs = UNSET;
private int maxKeyValueSize = UNSET; private int maxKeyValueSize = UNSET;
private ExecutorService pool = null; private ExecutorService pool = null;
private String implementationClassName = null; private String implementationClassName = null;
@ -88,6 +89,30 @@ public class BufferedMutatorParams implements Cloneable {
return this; return this;
} }
public long getWriteBufferPeriodicFlushTimeoutMs() {
return writeBufferPeriodicFlushTimeoutMs;
}
/**
* Set the max timeout before the buffer is automatically flushed.
*/
public BufferedMutatorParams setWriteBufferPeriodicFlushTimeoutMs(long timeoutMs) {
this.writeBufferPeriodicFlushTimeoutMs = timeoutMs;
return this;
}
public long getWriteBufferPeriodicFlushTimerTickMs() {
return writeBufferPeriodicFlushTimerTickMs;
}
/**
* Set the TimerTick how often the buffer timeout if checked.
*/
public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) {
this.writeBufferPeriodicFlushTimerTickMs = timerTickMs;
return this;
}
public int getMaxKeyValueSize() { public int getMaxKeyValueSize() {
return maxKeyValueSize; return maxKeyValueSize;
} }
@ -154,11 +179,13 @@ public class BufferedMutatorParams implements Cloneable {
@Override @Override
public BufferedMutatorParams clone() { public BufferedMutatorParams clone() {
BufferedMutatorParams clone = new BufferedMutatorParams(this.tableName); BufferedMutatorParams clone = new BufferedMutatorParams(this.tableName);
clone.writeBufferSize = this.writeBufferSize; clone.writeBufferSize = this.writeBufferSize;
clone.maxKeyValueSize = maxKeyValueSize; clone.writeBufferPeriodicFlushTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
clone.pool = this.pool; clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
clone.listener = this.listener; clone.maxKeyValueSize = this.maxKeyValueSize;
clone.implementationClassName = this.implementationClassName; clone.pool = this.pool;
clone.listener = this.listener;
clone.implementationClassName = this.implementationClassName;
return clone; return clone;
} }
} }

View File

@ -30,10 +30,18 @@ public class ConnectionConfiguration {
public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS =
"hbase.client.write.buffer.periodicflush.timeout.ms";
public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS =
"hbase.client.write.buffer.periodicflush.timertick.ms";
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT = 0; // 0 == Disabled
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760; public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
private final long writeBufferSize; private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
private final long writeBufferPeriodicFlushTimerTickMs;
private final int metaOperationTimeout; private final int metaOperationTimeout;
private final int operationTimeout; private final int operationTimeout;
private final int scannerCaching; private final int scannerCaching;
@ -56,6 +64,14 @@ public class ConnectionConfiguration {
ConnectionConfiguration(Configuration conf) { ConnectionConfiguration(Configuration conf) {
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
this.writeBufferPeriodicFlushTimeoutMs = conf.getLong(
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT);
this.writeBufferPeriodicFlushTimerTickMs = conf.getLong(
WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT);
this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@ -105,6 +121,8 @@ public class ConnectionConfiguration {
@VisibleForTesting @VisibleForTesting
protected ConnectionConfiguration() { protected ConnectionConfiguration() {
this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT;
this.writeBufferPeriodicFlushTimeoutMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
this.writeBufferPeriodicFlushTimerTickMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT;
this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
@ -133,6 +151,14 @@ public class ConnectionConfiguration {
return writeBufferSize; return writeBufferSize;
} }
public long getWriteBufferPeriodicFlushTimeoutMs() {
return writeBufferPeriodicFlushTimeoutMs;
}
public long getWriteBufferPeriodicFlushTimerTickMs() {
return writeBufferPeriodicFlushTimerTickMs;
}
public int getMetaOperationTimeout() { public int getMetaOperationTimeout() {
return metaOperationTimeout; return metaOperationTimeout;
} }

View File

@ -363,6 +363,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
params.writeBufferSize(connectionConfig.getWriteBufferSize()); params.writeBufferSize(connectionConfig.getWriteBufferSize());
} }
if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
params.setWriteBufferPeriodicFlushTimeoutMs(
connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
}
if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
params.setWriteBufferPeriodicFlushTimerTickMs(
connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
}
if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
} }

View File

@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.CategoryBasedTimeout;
@ -1101,6 +1100,119 @@ public class TestAsyncProcess {
Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
} }
@Test
public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
checkPeriodicFlushParameters(conn, ap,
1234, 1234,
1234, 1234);
checkPeriodicFlushParameters(conn, ap,
0, 0,
0, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
checkPeriodicFlushParameters(conn, ap,
-1234, 0,
-1234, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
checkPeriodicFlushParameters(conn, ap,
1, 1,
1, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
}
private void checkPeriodicFlushParameters(ClusterConnection conn,
MyAsyncProcess ap,
long setTO, long expectTO,
long setTT, long expectTT
) {
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
// The BufferedMutatorParams does nothing with the value
bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
// The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap);
Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
// The BufferedMutatorImpl corrects illegal values (direct via setter)
BufferedMutatorImpl ht2 =
new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap);
ht2.setWriteBufferPeriodicFlush(setTO, setTT);
Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
}
@Test
public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP
bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms
bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record
BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
// Verify if BufferedMutator has the right settings.
Assert.assertEquals(10000, ht.getWriteBufferSize());
Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
ht.getWriteBufferPeriodicFlushTimerTickMs());
Put put = createPut(1, true);
Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
// ----- Insert, flush immediately, MUST NOT flush automatically
ht.mutate(put);
ht.flush();
Thread.sleep(1000);
Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
// ----- Insert, NO flush, MUST flush automatically
ht.mutate(put);
Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
// The timerTick should fire every 100ms, so after twice that we must have
// seen at least 1 tick and we should see an automatic flush
Thread.sleep(200);
Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
// Ensure it does not flush twice
Thread.sleep(200);
Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
// ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
ht.disableWriteBufferPeriodicFlush();
ht.mutate(put);
Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
// Wait for at least 1 timerTick, we should see NO flushes.
Thread.sleep(200);
Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
// Reenable periodic flushing, a flush seems to take about 1 second
// so we wait for 2 seconds and it should have finished the flush.
ht.setWriteBufferPeriodicFlush(1, 100);
Thread.sleep(2000);
Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
}
@Test @Test
public void testBufferedMutatorImplWithSharedPool() throws Exception { public void testBufferedMutatorImplWithSharedPool() throws Exception {
ClusterConnection conn = createHConnection(); ClusterConnection conn = createHConnection();

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -30,7 +29,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -136,13 +134,21 @@ public class TestBufferedMutatorParams {
BufferedMutatorParams bmp = new BufferedMutatorParams(TableName.valueOf(tableName)); BufferedMutatorParams bmp = new BufferedMutatorParams(TableName.valueOf(tableName));
BufferedMutator.ExceptionListener listener = new MockExceptionListener(); BufferedMutator.ExceptionListener listener = new MockExceptionListener();
bmp.writeBufferSize(17).maxKeyValueSize(13).pool(pool).listener(listener); bmp
.writeBufferSize(17)
.setWriteBufferPeriodicFlushTimeoutMs(123)
.setWriteBufferPeriodicFlushTimerTickMs(456)
.maxKeyValueSize(13)
.pool(pool)
.listener(listener);
bmp.implementationClassName("someClassName"); bmp.implementationClassName("someClassName");
BufferedMutatorParams clone = bmp.clone(); BufferedMutatorParams clone = bmp.clone();
// Confirm some literals // Confirm some literals
assertEquals(tableName, clone.getTableName().toString()); assertEquals(tableName, clone.getTableName().toString());
assertEquals(17, clone.getWriteBufferSize()); assertEquals(17, clone.getWriteBufferSize());
assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(13, clone.getMaxKeyValueSize()); assertEquals(13, clone.getMaxKeyValueSize());
assertEquals("someClassName", clone.getImplementationClassName()); assertEquals("someClassName", clone.getImplementationClassName());
@ -168,6 +174,10 @@ public class TestBufferedMutatorParams {
assertEquals(some.getTableName().toString(), assertEquals(some.getTableName().toString(),
clone.getTableName().toString()); clone.getTableName().toString());
assertEquals(some.getWriteBufferSize(), clone.getWriteBufferSize()); assertEquals(some.getWriteBufferSize(), clone.getWriteBufferSize());
assertEquals(some.getWriteBufferPeriodicFlushTimeoutMs(),
clone.getWriteBufferPeriodicFlushTimeoutMs());
assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize()); assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
assertTrue(some.getListener() == clone.getListener()); assertTrue(some.getListener() == clone.getListener());
assertTrue(some.getPool() == clone.getPool()); assertTrue(some.getPool() == clone.getPool());