From 15b32460f9484fc0c635172c81b6a6315283775a Mon Sep 17 00:00:00 2001 From: anoopsamjohn Date: Fri, 27 Oct 2017 11:53:25 +0530 Subject: [PATCH] HBASE-18906 Provide Region#waitForFlushes API. --- .../hadoop/hbase/regionserver/HRegion.java | 26 +++++++++++++------ .../hadoop/hbase/regionserver/Region.java | 9 +++++++ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f0c9ec28f53..3cb0e549051 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1768,30 +1768,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - /** Wait for all current flushes of the region to complete + /** + * Wait for all current flushes of the region to complete */ - // TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for - // Phoenix needs. public void waitForFlushes() { + waitForFlushes(0);// Unbound wait + } + + @Override + public boolean waitForFlushes(long timeout) { synchronized (writestate) { if (this.writestate.readOnly) { // we should not wait for replayed flushed if we are read only (for example in case the // region is a secondary replica). - return; + return true; } - if (!writestate.flushing) return; + if (!writestate.flushing) return true; long start = System.currentTimeMillis(); + long duration = 0; boolean interrupted = false; + LOG.debug("waiting for cache flush to complete for region " + this); try { while (writestate.flushing) { - LOG.debug("waiting for cache flush to complete for region " + this); + if (timeout > 0 && duration >= timeout) break; try { - writestate.wait(); + long toWait = timeout == 0 ? 0 : (timeout - duration); + writestate.wait(toWait); } catch (InterruptedException iex) { // essentially ignore and propagate the interrupt back up LOG.warn("Interrupted while waiting"); interrupted = true; break; + } finally { + duration = System.currentTimeMillis() - start; } } } finally { @@ -1799,10 +1808,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Thread.currentThread().interrupt(); } } - long duration = System.currentTimeMillis() - start; LOG.debug("Waited " + duration + " ms for flush to complete"); + return !(writestate.flushing); } } + protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index c0827cbf04e..a5bad1acb80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -473,4 +473,13 @@ public interface Region extends ConfigurationObserver { * Request flush on this region. */ void requestFlush(FlushLifeCycleTracker tracker) throws IOException; + + /** + * Wait for all current flushes of the region to complete + * + * @param timeout The maximum time to wait in milliseconds. + * @return False when timeout elapsed but flushes are not over. True when flushes are over within + * max wait time period. + */ + boolean waitForFlushes(long timeout); }