HBASE-18906 Provide Region#waitForFlushes API.

This commit is contained in:
anoopsamjohn 2017-10-27 11:53:25 +05:30
parent 090c10f2a4
commit 15b32460f9
2 changed files with 27 additions and 8 deletions

View File

@ -1768,30 +1768,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
/** Wait for all current flushes of the region to complete
/**
* Wait for all current flushes of the region to complete
*/
// TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for
// Phoenix needs.
public void waitForFlushes() {
waitForFlushes(0);// Unbound wait
}
@Override
public boolean waitForFlushes(long timeout) {
synchronized (writestate) {
if (this.writestate.readOnly) {
// we should not wait for replayed flushed if we are read only (for example in case the
// region is a secondary replica).
return;
return true;
}
if (!writestate.flushing) return;
if (!writestate.flushing) return true;
long start = System.currentTimeMillis();
long duration = 0;
boolean interrupted = false;
LOG.debug("waiting for cache flush to complete for region " + this);
try {
while (writestate.flushing) {
LOG.debug("waiting for cache flush to complete for region " + this);
if (timeout > 0 && duration >= timeout) break;
try {
writestate.wait();
long toWait = timeout == 0 ? 0 : (timeout - duration);
writestate.wait(toWait);
} catch (InterruptedException iex) {
// essentially ignore and propagate the interrupt back up
LOG.warn("Interrupted while waiting");
interrupted = true;
break;
} finally {
duration = System.currentTimeMillis() - start;
}
}
} finally {
@ -1799,10 +1808,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Thread.currentThread().interrupt();
}
}
long duration = System.currentTimeMillis() - start;
LOG.debug("Waited " + duration + " ms for flush to complete");
return !(writestate.flushing);
}
}
protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount());

View File

@ -473,4 +473,13 @@ public interface Region extends ConfigurationObserver {
* Request flush on this region.
*/
void requestFlush(FlushLifeCycleTracker tracker) throws IOException;
/**
* Wait for all current flushes of the region to complete
*
* @param timeout The maximum time to wait in milliseconds.
* @return False when timeout elapsed but flushes are not over. True when flushes are over within
* max wait time period.
*/
boolean waitForFlushes(long timeout);
}