HBASE-19486: Ensure threadsafe WriteBufferPeriodicFlush operations
Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
6a0e6fefd3
commit
a6081d30f9
|
@ -78,8 +78,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
|
||||
private final long writeBufferSize;
|
||||
|
||||
private long writeBufferPeriodicFlushTimeoutMs;
|
||||
private long writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
|
||||
private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0);
|
||||
private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
|
||||
new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
|
||||
private Timer writeBufferPeriodicFlushTimer = null;
|
||||
|
||||
private final int maxKeyValueSize;
|
||||
|
@ -188,7 +189,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
}
|
||||
|
||||
if (currentWriteBufferSize.get() == 0) {
|
||||
firstRecordInBufferTimestamp = System.currentTimeMillis();
|
||||
firstRecordInBufferTimestamp.set(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
// This behavior is highly non-intuitive... it does not protect us against
|
||||
|
@ -214,23 +215,23 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
|
||||
@VisibleForTesting
|
||||
protected long getExecutedWriteBufferPeriodicFlushes() {
|
||||
return executedWriteBufferPeriodicFlushes;
|
||||
return executedWriteBufferPeriodicFlushes.get();
|
||||
}
|
||||
|
||||
private long firstRecordInBufferTimestamp = 0;
|
||||
private long executedWriteBufferPeriodicFlushes = 0;
|
||||
private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
|
||||
private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);
|
||||
|
||||
private void timerCallbackForWriteBufferPeriodicFlush() {
|
||||
if (currentWriteBufferSize.get() == 0) {
|
||||
return; // Nothing to flush
|
||||
}
|
||||
long now = System.currentTimeMillis();
|
||||
if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) {
|
||||
if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) {
|
||||
return; // No need to flush yet
|
||||
}
|
||||
// The first record in the writebuffer has been in there too long --> flush
|
||||
try {
|
||||
executedWriteBufferPeriodicFlushes++;
|
||||
executedWriteBufferPeriodicFlushes.incrementAndGet();
|
||||
flush();
|
||||
} catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
|
||||
LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
|
||||
|
@ -370,18 +371,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
|
||||
long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
|
||||
long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
|
||||
public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
|
||||
long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get();
|
||||
long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
|
||||
|
||||
// Both parameters have minimal values.
|
||||
this.writeBufferPeriodicFlushTimeoutMs = Math.max(0, timeoutMs);
|
||||
this.writeBufferPeriodicFlushTimerTickMs =
|
||||
Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs);
|
||||
writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
|
||||
writeBufferPeriodicFlushTimerTickMs.set(
|
||||
Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));
|
||||
|
||||
// If something changed we stop the old Timer.
|
||||
if (this.writeBufferPeriodicFlushTimeoutMs != originalTimeoutMs ||
|
||||
this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) {
|
||||
if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
|
||||
writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
|
||||
if (writeBufferPeriodicFlushTimer != null) {
|
||||
writeBufferPeriodicFlushTimer.cancel();
|
||||
writeBufferPeriodicFlushTimer = null;
|
||||
|
@ -390,25 +391,26 @@ public class BufferedMutatorImpl implements BufferedMutator {
|
|||
|
||||
// If we have the need for a timer and there is none we start it
|
||||
if (writeBufferPeriodicFlushTimer == null &&
|
||||
writeBufferPeriodicFlushTimeoutMs > 0) {
|
||||
writeBufferPeriodicFlushTimeoutMs.get() > 0) {
|
||||
writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
|
||||
writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
|
||||
}
|
||||
}, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs);
|
||||
}, writeBufferPeriodicFlushTimerTickMs.get(),
|
||||
writeBufferPeriodicFlushTimerTickMs.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferPeriodicFlushTimeoutMs() {
|
||||
return this.writeBufferPeriodicFlushTimeoutMs;
|
||||
return writeBufferPeriodicFlushTimeoutMs.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getWriteBufferPeriodicFlushTimerTickMs() {
|
||||
return this.writeBufferPeriodicFlushTimerTickMs;
|
||||
return writeBufferPeriodicFlushTimerTickMs.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue