HBASE-18451 PeriodicMemstoreFlusher should inspect the queue before adding a delayed flush request, fix logging
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
8dea600795
commit
e26a6e0e10
|
@ -32,8 +32,9 @@ public interface FlushRequester {
|
||||||
* @param region the Region requesting the cache flush
|
* @param region the Region requesting the cache flush
|
||||||
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
|
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
|
||||||
* rolling.
|
* rolling.
|
||||||
|
* @return true if our region is added into the queue, false otherwise
|
||||||
*/
|
*/
|
||||||
void requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);
|
boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tell the listener the cache needs to be flushed after a delay
|
* Tell the listener the cache needs to be flushed after a delay
|
||||||
|
@ -42,8 +43,9 @@ public interface FlushRequester {
|
||||||
* @param delay after how much time should the flush happen
|
* @param delay after how much time should the flush happen
|
||||||
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
|
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
|
||||||
* rolling.
|
* rolling.
|
||||||
|
* @return true if our region is added into the queue, false otherwise
|
||||||
*/
|
*/
|
||||||
void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
|
boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a FlushRequestListener
|
* Register a FlushRequestListener
|
||||||
|
|
|
@ -1753,14 +1753,14 @@ public class HRegionServer extends HasThread implements
|
||||||
FlushRequester requester = server.getFlushRequester();
|
FlushRequester requester = server.getFlushRequester();
|
||||||
if (requester != null) {
|
if (requester != null) {
|
||||||
long randomDelay = (long) RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME;
|
long randomDelay = (long) RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME;
|
||||||
LOG.info(getName() + " requesting flush of " +
|
|
||||||
r.getRegionInfo().getRegionNameAsString() + " because " +
|
|
||||||
whyFlush.toString() +
|
|
||||||
" after random delay " + randomDelay + "ms");
|
|
||||||
//Throttle the flushes by putting a delay. If we don't throttle, and there
|
//Throttle the flushes by putting a delay. If we don't throttle, and there
|
||||||
//is a balanced write-load on the regions in a table, we might end up
|
//is a balanced write-load on the regions in a table, we might end up
|
||||||
//overwhelming the filesystem with too many flushes at once.
|
//overwhelming the filesystem with too many flushes at once.
|
||||||
requester.requestDelayedFlush(r, randomDelay, false);
|
if (requester.requestDelayedFlush(r, randomDelay, false)) {
|
||||||
|
LOG.info("{} requesting flush of {} because {} after random delay {} ms",
|
||||||
|
getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(),
|
||||||
|
randomDelay);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -455,8 +455,8 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestFlush(HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
|
public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
|
||||||
r.incrementFlushesQueuedCount();
|
FlushLifeCycleTracker tracker) {
|
||||||
synchronized (regionsInQueue) {
|
synchronized (regionsInQueue) {
|
||||||
if (!regionsInQueue.containsKey(r)) {
|
if (!regionsInQueue.containsKey(r)) {
|
||||||
// This entry has no delay so it will be added at the top of the flush
|
// This entry has no delay so it will be added at the top of the flush
|
||||||
|
@ -464,15 +464,17 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
|
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
|
||||||
this.regionsInQueue.put(r, fqe);
|
this.regionsInQueue.put(r, fqe);
|
||||||
this.flushQueue.add(fqe);
|
this.flushQueue.add(fqe);
|
||||||
|
r.incrementFlushesQueuedCount();
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
tracker.notExecuted("Flush already requested on " + r);
|
tracker.notExecuted("Flush already requested on " + r);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
|
public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
|
||||||
r.incrementFlushesQueuedCount();
|
|
||||||
synchronized (regionsInQueue) {
|
synchronized (regionsInQueue) {
|
||||||
if (!regionsInQueue.containsKey(r)) {
|
if (!regionsInQueue.containsKey(r)) {
|
||||||
// This entry has some delay
|
// This entry has some delay
|
||||||
|
@ -481,7 +483,10 @@ class MemStoreFlusher implements FlushRequester {
|
||||||
fqe.requeue(delay);
|
fqe.requeue(delay);
|
||||||
this.regionsInQueue.put(r, fqe);
|
this.regionsInQueue.put(r, fqe);
|
||||||
this.flushQueue.add(fqe);
|
this.flushQueue.add(fqe);
|
||||||
|
r.incrementFlushesQueuedCount();
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -758,14 +758,15 @@ public class TestHeapMemoryManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestFlush(HRegion region, boolean forceFlushAllStores,
|
public boolean requestFlush(HRegion region, boolean forceFlushAllStores,
|
||||||
FlushLifeCycleTracker tracker) {
|
FlushLifeCycleTracker tracker) {
|
||||||
this.listener.flushRequested(flushType, region);
|
this.listener.flushRequested(flushType, region);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) {
|
public boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1109,16 +1109,18 @@ public abstract class AbstractTestWALReplay {
|
||||||
private HRegion r;
|
private HRegion r;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) {
|
public boolean requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) {
|
||||||
try {
|
try {
|
||||||
r.flush(force);
|
r.flush(force);
|
||||||
|
return true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException("Exception flushing", e);
|
throw new RuntimeException("Exception flushing", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
|
public boolean requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue