HBASE-18358 Backport HBASE-18099 'FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish'

This commit is contained in:
tedyu 2017-07-11 17:26:22 -07:00
parent cc4301ca08
commit c0f743e44f
1 changed files with 25 additions and 5 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver.snapshot; package org.apache.hadoop.hbase.regionserver.snapshot;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -24,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.ProcedureMember;
@ -52,6 +54,9 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
private final SnapshotSubprocedurePool taskManager; private final SnapshotSubprocedurePool taskManager;
private boolean snapshotSkipFlush = false; private boolean snapshotSkipFlush = false;
// the maximum number of attempts we flush
final static int MAX_RETRIES = 3;
public FlushSnapshotSubprocedure(ProcedureMember member, public FlushSnapshotSubprocedure(ProcedureMember member,
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
List<Region> regions, SnapshotDescription snapshot, List<Region> regions, SnapshotDescription snapshot,
@ -96,11 +101,26 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
LOG.debug("take snapshot without flush memstore first"); LOG.debug("take snapshot without flush memstore first");
} else { } else {
LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
boolean succeeded = false;
long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
for (int i = 0; i < MAX_RETRIES; i++) {
FlushResult res = region.flush(true); FlushResult res = region.flush(true);
if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) { if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
// CANNOT_FLUSH may mean that a flush is already on-going // CANNOT_FLUSH may mean that a flush is already on-going
// we need to wait for that flush to complete // we need to wait for that flush to complete
region.waitForFlushes(); region.waitForFlushes();
if (region.getMaxFlushedSeqId() >= readPt) {
// writes at the start of the snapshot have been persisted
succeeded = true;
break;
}
} else {
succeeded = true;
break;
}
}
if (!succeeded) {
throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
} }
} }
((HRegion)region).addRegionToSnapshot(snapshot, monitor); ((HRegion)region).addRegionToSnapshot(snapshot, monitor);