HBASE-18906 Provide Region#waitForFlushes API.

This commit is contained in:
anoopsamjohn 2017-10-27 11:53:25 +05:30
parent bdcdb5cf9a
commit 8e6d116ae3
2 changed files with 27 additions and 8 deletions

View File

@ -1768,30 +1768,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
/** Wait for all current flushes of the region to complete /**
* Wait for all current flushes of the region to complete
*/ */
// TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for
// Phoenix needs.
public void waitForFlushes() { public void waitForFlushes() {
waitForFlushes(0);// Unbound wait
}
@Override
public boolean waitForFlushes(long timeout) {
synchronized (writestate) { synchronized (writestate) {
if (this.writestate.readOnly) { if (this.writestate.readOnly) {
// we should not wait for replayed flushed if we are read only (for example in case the // we should not wait for replayed flushed if we are read only (for example in case the
// region is a secondary replica). // region is a secondary replica).
return; return true;
} }
if (!writestate.flushing) return; if (!writestate.flushing) return true;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
long duration = 0;
boolean interrupted = false; boolean interrupted = false;
LOG.debug("waiting for cache flush to complete for region " + this);
try { try {
while (writestate.flushing) { while (writestate.flushing) {
LOG.debug("waiting for cache flush to complete for region " + this); if (timeout > 0 && duration >= timeout) break;
try { try {
writestate.wait(); long toWait = timeout == 0 ? 0 : (timeout - duration);
writestate.wait(toWait);
} catch (InterruptedException iex) { } catch (InterruptedException iex) {
// essentially ignore and propagate the interrupt back up // essentially ignore and propagate the interrupt back up
LOG.warn("Interrupted while waiting"); LOG.warn("Interrupted while waiting");
interrupted = true; interrupted = true;
break; break;
} finally {
duration = System.currentTimeMillis() - start;
} }
} }
} finally { } finally {
@ -1799,10 +1808,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
long duration = System.currentTimeMillis() - start;
LOG.debug("Waited " + duration + " ms for flush to complete"); LOG.debug("Waited " + duration + " ms for flush to complete");
return !(writestate.flushing);
} }
} }
protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
final String threadNamePrefix) { final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount()); int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount());

View File

@ -473,4 +473,13 @@ public interface Region extends ConfigurationObserver {
* Request flush on this region. * Request flush on this region.
*/ */
void requestFlush(FlushLifeCycleTracker tracker) throws IOException; void requestFlush(FlushLifeCycleTracker tracker) throws IOException;
/**
* Wait for all current flushes of the region to complete
*
* @param timeout The maximum time to wait in milliseconds.
* @return False when timeout elapsed but flushes are not over. True when flushes are over within
* max wait time period.
*/
boolean waitForFlushes(long timeout);
} }