From 0dcbba156359298d21cf0aca36f37a989606e0cb Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Sun, 31 Dec 2017 11:58:24 +0100 Subject: [PATCH] HBASE-19486: Ensure threadsafe WriteBufferPeriodicFlush operations Signed-off-by: Chia-Ping Tsai --- .../hbase/client/BufferedMutatorImpl.java | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 13b1a811039..b171fc404d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -78,8 +78,9 @@ public class BufferedMutatorImpl implements BufferedMutator { private final AtomicInteger undealtMutationCount = new AtomicInteger(0); private final long writeBufferSize; - private long writeBufferPeriodicFlushTimeoutMs; - private long writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS; + private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0); + private final AtomicLong writeBufferPeriodicFlushTimerTickMs = + new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); private Timer writeBufferPeriodicFlushTimer = null; private final int maxKeyValueSize; @@ -188,7 +189,7 @@ public class BufferedMutatorImpl implements BufferedMutator { } if (currentWriteBufferSize.get() == 0) { - firstRecordInBufferTimestamp = System.currentTimeMillis(); + firstRecordInBufferTimestamp.set(System.currentTimeMillis()); } // This behavior is highly non-intuitive... it does not protect us against @@ -214,23 +215,23 @@ public class BufferedMutatorImpl implements BufferedMutator { @VisibleForTesting protected long getExecutedWriteBufferPeriodicFlushes() { - return executedWriteBufferPeriodicFlushes; + return executedWriteBufferPeriodicFlushes.get(); } - private long firstRecordInBufferTimestamp = 0; - private long executedWriteBufferPeriodicFlushes = 0; + private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0); + private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0); private void timerCallbackForWriteBufferPeriodicFlush() { if (currentWriteBufferSize.get() == 0) { return; // Nothing to flush } long now = System.currentTimeMillis(); - if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) { + if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) { return; // No need to flush yet } // The first record in the writebuffer has been in there too long --> flush try { - executedWriteBufferPeriodicFlushes++; + executedWriteBufferPeriodicFlushes.incrementAndGet(); flush(); } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage()); @@ -370,18 +371,18 @@ public class BufferedMutatorImpl implements BufferedMutator { } @Override - public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) { - long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs; - long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs; + public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) { + long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get(); + long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get(); // Both parameters have minimal values. - this.writeBufferPeriodicFlushTimeoutMs = Math.max(0, timeoutMs); - this.writeBufferPeriodicFlushTimerTickMs = - Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs); + writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs)); + writeBufferPeriodicFlushTimerTickMs.set( + Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs)); // If something changed we stop the old Timer. - if (this.writeBufferPeriodicFlushTimeoutMs != originalTimeoutMs || - this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) { + if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs || + writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) { if (writeBufferPeriodicFlushTimer != null) { writeBufferPeriodicFlushTimer.cancel(); writeBufferPeriodicFlushTimer = null; @@ -390,25 +391,26 @@ public class BufferedMutatorImpl implements BufferedMutator { // If we have the need for a timer and there is none we start it if (writeBufferPeriodicFlushTimer == null && - writeBufferPeriodicFlushTimeoutMs > 0) { + writeBufferPeriodicFlushTimeoutMs.get() > 0) { writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon. writeBufferPeriodicFlushTimer.schedule(new TimerTask() { @Override public void run() { BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush(); } - }, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs); + }, writeBufferPeriodicFlushTimerTickMs.get(), + writeBufferPeriodicFlushTimerTickMs.get()); } } @Override public long getWriteBufferPeriodicFlushTimeoutMs() { - return this.writeBufferPeriodicFlushTimeoutMs; + return writeBufferPeriodicFlushTimeoutMs.get(); } @Override public long getWriteBufferPeriodicFlushTimerTickMs() { - return this.writeBufferPeriodicFlushTimerTickMs; + return writeBufferPeriodicFlushTimerTickMs.get(); } @Override