HBASE-19486: Ensure threadsafe WriteBufferPeriodicFlush operations

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Niels Basjes 2017-12-31 11:58:24 +01:00 committed by Chia-Ping Tsai
parent 99399cdeef
commit 0dcbba1563
1 changed files with 22 additions and 20 deletions

View File

@ -78,8 +78,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
private final AtomicInteger undealtMutationCount = new AtomicInteger(0); private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
private final long writeBufferSize; private final long writeBufferSize;
private long writeBufferPeriodicFlushTimeoutMs; private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
private long writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS; private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
private Timer writeBufferPeriodicFlushTimer = null; private Timer writeBufferPeriodicFlushTimer = null;
private final int maxKeyValueSize; private final int maxKeyValueSize;
@ -188,7 +189,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
} }
if (currentWriteBufferSize.get() == 0) { if (currentWriteBufferSize.get() == 0) {
firstRecordInBufferTimestamp = System.currentTimeMillis(); firstRecordInBufferTimestamp.set(System.currentTimeMillis());
} }
// This behavior is highly non-intuitive... it does not protect us against // This behavior is highly non-intuitive... it does not protect us against
@ -214,23 +215,23 @@ public class BufferedMutatorImpl implements BufferedMutator {
@VisibleForTesting @VisibleForTesting
protected long getExecutedWriteBufferPeriodicFlushes() { protected long getExecutedWriteBufferPeriodicFlushes() {
return executedWriteBufferPeriodicFlushes; return executedWriteBufferPeriodicFlushes.get();
} }
private long firstRecordInBufferTimestamp = 0; private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
private long executedWriteBufferPeriodicFlushes = 0; private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);
private void timerCallbackForWriteBufferPeriodicFlush() { private void timerCallbackForWriteBufferPeriodicFlush() {
if (currentWriteBufferSize.get() == 0) { if (currentWriteBufferSize.get() == 0) {
return; // Nothing to flush return; // Nothing to flush
} }
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) { if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) {
return; // No need to flush yet return; // No need to flush yet
} }
// The first record in the writebuffer has been in there too long --> flush // The first record in the writebuffer has been in there too long --> flush
try { try {
executedWriteBufferPeriodicFlushes++; executedWriteBufferPeriodicFlushes.incrementAndGet();
flush(); flush();
} catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage()); LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
@ -370,18 +371,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
} }
@Override @Override
public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) { public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs; long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get();
long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs; long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
// Both parameters have minimal values. // Both parameters have minimal values.
this.writeBufferPeriodicFlushTimeoutMs = Math.max(0, timeoutMs); writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
this.writeBufferPeriodicFlushTimerTickMs = writeBufferPeriodicFlushTimerTickMs.set(
Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs); Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));
// If something changed we stop the old Timer. // If something changed we stop the old Timer.
if (this.writeBufferPeriodicFlushTimeoutMs != originalTimeoutMs || if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) { writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
if (writeBufferPeriodicFlushTimer != null) { if (writeBufferPeriodicFlushTimer != null) {
writeBufferPeriodicFlushTimer.cancel(); writeBufferPeriodicFlushTimer.cancel();
writeBufferPeriodicFlushTimer = null; writeBufferPeriodicFlushTimer = null;
@ -390,25 +391,26 @@ public class BufferedMutatorImpl implements BufferedMutator {
// If we have the need for a timer and there is none we start it // If we have the need for a timer and there is none we start it
if (writeBufferPeriodicFlushTimer == null && if (writeBufferPeriodicFlushTimer == null &&
writeBufferPeriodicFlushTimeoutMs > 0) { writeBufferPeriodicFlushTimeoutMs.get() > 0) {
writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon. writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
writeBufferPeriodicFlushTimer.schedule(new TimerTask() { writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush(); BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
} }
}, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs); }, writeBufferPeriodicFlushTimerTickMs.get(),
writeBufferPeriodicFlushTimerTickMs.get());
} }
} }
@Override @Override
public long getWriteBufferPeriodicFlushTimeoutMs() { public long getWriteBufferPeriodicFlushTimeoutMs() {
return this.writeBufferPeriodicFlushTimeoutMs; return writeBufferPeriodicFlushTimeoutMs.get();
} }
@Override @Override
public long getWriteBufferPeriodicFlushTimerTickMs() { public long getWriteBufferPeriodicFlushTimerTickMs() {
return this.writeBufferPeriodicFlushTimerTickMs; return writeBufferPeriodicFlushTimerTickMs.get();
} }
@Override @Override