HBASE-13877 Interrupt to flush from TableFlushProcedure causes dataloss in ITBLL

This commit is contained in:
Enis Soztutar 2015-06-14 11:22:07 -07:00
parent 682b8ab8a5
commit c6dd3f965b
11 changed files with 104 additions and 26 deletions

View File

@ -219,7 +219,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
private static final int MISSING_ROWS_TO_LOG = 2; // YARN complains when too many counters
private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
private static final int WIDTH_DEFAULT = 1000000;
private static final int WRAP_DEFAULT = 25;
@ -665,6 +665,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
*/
public static class WALMapperSearcher extends WALMapper {
private SortedSet<byte []> keysToFind;
private AtomicInteger rows = new AtomicInteger(0);
@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
@ -686,8 +687,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
boolean b = this.keysToFind.contains(row);
if (b) {
String keyStr = Bytes.toStringBinary(row);
LOG.info("Found cell=" + cell);
context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
try {
LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
} catch (IOException|InterruptedException e) {
LOG.warn(e);
}
if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
}
context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
}
return b;
}

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
/**
* This flush region implementation uses the distributed procedure framework to flush

View File

@ -33,7 +33,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
@ -157,7 +159,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
FlushTableSubprocedurePool taskManager =
new FlushTableSubprocedurePool(rss.getServerName().toString(), conf);
new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, table, taskManager);
}
@ -195,13 +197,15 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
* failures.
*/
static class FlushTableSubprocedurePool {
private final Abortable abortable;
private final ExecutorCompletionService<Void> taskPool;
private final ThreadPoolExecutor executor;
private volatile boolean stopped;
private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
private final String name;
FlushTableSubprocedurePool(String name, Configuration conf) {
FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
this.abortable = abortable;
// configure the executor service
long keepAlive = conf.getLong(
RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
@ -259,9 +263,13 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
}
// we are stopped so we can just exit.
} catch (ExecutionException e) {
if (e.getCause() instanceof ForeignException) {
Throwable cause = e.getCause();
if (cause instanceof ForeignException) {
LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
throw (ForeignException)e.getCause();
} else if (cause instanceof DroppedSnapshotException) {
// we have to abort the region server according to contract of flush
abortable.abort("Received DroppedSnapshotException, aborting", cause);
}
LOG.warn("Got Exception in FlushSubprocedurePool", e);
throw new ForeignException(name, e.getCause());
@ -272,7 +280,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
}
/**
* This attempts to cancel out all pending and in progress tasks (interruptions issues)
* This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
* tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
* @throws InterruptedException
*/
void cancelTasks() throws InterruptedException {
@ -289,13 +298,14 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
}
/**
* Abruptly shutdown the thread pool. Call when exiting a region server.
* Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be
* interrupted (see HBASE-13877)
*/
void stop() {
if (this.stopped) return;
this.stopped = true;
this.executor.shutdownNow();
this.executor.shutdown();
}
}

View File

@ -475,6 +475,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED.
* @return true if the memstores were flushed, else false.
*/
@Override
public boolean isFlushSucceeded() {
return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
.FLUSHED_COMPACTION_NEEDED;
@ -484,6 +485,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
* @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
*/
@Override
public boolean isCompactionNeeded() {
return result == Result.FLUSHED_COMPACTION_NEEDED;
}
@ -1272,6 +1274,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* vector if already closed and null if judged that it should not close.
*
* @throws IOException e
* @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this.
*/
public Map<byte[], List<StoreFile>> close() throws IOException {
return close(false);
@ -1309,6 +1314,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* we are not to close at this time or we are already closed.
*
* @throws IOException e
* @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this.
*/
public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
// Only allow one thread to close at a time. Serialize them so dual
@ -1327,6 +1335,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
/**
* Exposed for some very specific unit tests.
*/
@VisibleForTesting
public void setClosing(boolean closing) {
this.closing.set(closing);
}
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
throws IOException {
if (isClosed()) {
@ -1826,7 +1842,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this.
*/
public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
throws IOException {
@ -2337,6 +2354,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Bytes.toStringBinary(getRegionInfo().getRegionName()));
dse.initCause(t);
status.abort("Flush failed: " + StringUtils.stringifyException(t));
// Callers for flushcache() should catch DroppedSnapshotException and abort the region server.
// However, since we may have the region read lock, we cannot call close(true) here since
// we cannot promote to a write lock. Instead we are setting closing so that all other region
// operations except for close will be rejected.
this.closing.set(true);
if (rsServices != null) {
// This is a safeguard against the case where the caller fails to explicitly handle aborting
rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
}
throw dse;
}
@ -6407,6 +6436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return results;
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
// Don't need nonces here - RowMutations only supports puts and deletes
mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
@ -6433,6 +6463,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* <code>rowsToLock</code> is sorted in order to avoid deadlocks.
* @throws IOException
*/
@Override
public void mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
@ -7436,6 +7467,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** @return the coprocessor host */
@Override
public RegionCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
}

View File

@ -1329,6 +1329,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
return MergeRegionsResponse.newBuilder().build();
} catch (DroppedSnapshotException ex) {
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
}
@ -1741,6 +1744,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
((HRegion)region).forceSplit(splitPoint);
regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit());
return SplitRegionResponse.newBuilder().build();
} catch (DroppedSnapshotException ex) {
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
throw new ServiceException(ex);
} catch (IOException ie) {
throw new ServiceException(ie);
}

View File

@ -147,7 +147,7 @@ public interface Region extends ConfigurationObserver {
*/
long getOldestHfileTs(boolean majorCompactioOnly) throws IOException;
/**
/**
* @return map of column family names to max sequence id that was read from storage when this
* region was opened
*/
@ -168,7 +168,7 @@ public interface Region extends ConfigurationObserver {
///////////////////////////////////////////////////////////////////////////
// Metrics
/** @return read requests count for this region */
long getReadRequestsCount();
@ -192,7 +192,7 @@ public interface Region extends ConfigurationObserver {
/** @return the number of mutations processed bypassing the WAL */
long getNumMutationsWithoutWAL();
/** @return the size of data processed bypassing the WAL, in bytes */
long getDataInMemoryWithoutWAL();
@ -227,7 +227,7 @@ public interface Region extends ConfigurationObserver {
/**
* This method needs to be called before any public call that reads or
* modifies data.
* modifies data.
* Acquires a read lock and checks if the region is closing or closed.
* <p>{@link #closeRegionOperation} MUST then always be called after
* the operation has completed, whether it succeeded or failed.
@ -237,7 +237,7 @@ public interface Region extends ConfigurationObserver {
/**
* This method needs to be called before any public call that reads or
* modifies data.
* modifies data.
* Acquires a read lock and checks if the region is closing or closed.
* <p>{@link #closeRegionOperation} MUST then always be called after
* the operation has completed, whether it succeeded or failed.
@ -427,7 +427,7 @@ public interface Region extends ConfigurationObserver {
/**
* Perform atomic mutations within the region.
*
*
* @param mutations The list of mutations to perform.
* <code>mutations</code> can contain operations for multiple rows.
* Caller has to ensure that all rows are contained in this region.
@ -623,13 +623,13 @@ public interface Region extends ConfigurationObserver {
CANNOT_FLUSH_MEMSTORE_EMPTY,
CANNOT_FLUSH
}
/** @return the detailed result code */
Result getResult();
/** @return true if the memstores were flushed, else false */
boolean isFlushSucceeded();
/** @return True if the flush requested a compaction, else false */
boolean isCompactionNeeded();
}
@ -653,6 +653,8 @@ public interface Region extends ConfigurationObserver {
*
* @throws IOException general io exceptions
* because a snapshot was not properly persisted.
* @throws DroppedSnapshotException Thrown when abort is required. The caller MUST catch this
* exception and MUST abort. Any further operation to the region may cause data loss.
*/
FlushResult flush(boolean force) throws IOException;
@ -660,7 +662,7 @@ public interface Region extends ConfigurationObserver {
* Synchronously compact all stores in the region.
* <p>This operation could block for a long time, so don't call it from a
* time-sensitive thread.
* <p>Note that no locks are taken to prevent possible conflicts between
* <p>Note that no locks are taken to prevent possible conflicts between
* compaction and splitting activities. The regionserver does not normally compact
* and split in parallel. However by calling this method you may introduce
* unexpected and unhandled concurrency. Don't do this unless you know what

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
@ -93,6 +94,10 @@ class RegionMergeRequest implements Runnable {
+ (this.server.isStopping() ? " stopping" : " stopped"), e);
return;
}
if (e instanceof DroppedSnapshotException) {
server.abort("Replay of WAL required. Forcing server shutdown", e);
return;
}
try {
LOG.warn("Running rollback/cleanup of failed merge of "
+ region_a +" and "+ region_b + "; " + e.getMessage(), e);

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -91,6 +92,10 @@ class SplitRequest implements Runnable {
+ (this.server.isStopping() ? " stopping" : " stopped"), e);
return;
}
if (e instanceof DroppedSnapshotException) {
server.abort("Replay of WAL required. Forcing server shutdown", e);
return;
}
try {
LOG.info("Running rollback/cleanup of failed split of " +
parent.getRegionInfo().getRegionNameAsString() + "; " + e.getMessage(), e);
@ -148,7 +153,7 @@ class SplitRequest implements Runnable {
try {
this.tableLock.release();
} catch (IOException ex) {
LOG.error("Could not release the table lock (something is really wrong). "
LOG.error("Could not release the table lock (something is really wrong). "
+ "Aborting this server to avoid holding the lock forever.");
this.server.abort("Abort; we got an error when releasing the table lock "
+ "on " + parent.getRegionInfo().getRegionNameAsString());

View File

@ -35,7 +35,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@ -184,7 +186,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
switch (snapshot.getType()) {
case FLUSH:
SnapshotSubprocedurePool taskManager =
new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, snapshot, taskManager);
case SKIPFLUSH:
@ -196,7 +198,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
* To minimized the code change, class name is not changed.
*/
SnapshotSubprocedurePool taskManager2 =
new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, snapshot, taskManager2);
@ -265,13 +267,15 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
* operations such as compactions and replication sinks.
*/
static class SnapshotSubprocedurePool {
private final Abortable abortable;
private final ExecutorCompletionService<Void> taskPool;
private final ThreadPoolExecutor executor;
private volatile boolean stopped;
private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
private final String name;
SnapshotSubprocedurePool(String name, Configuration conf) {
SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
this.abortable = abortable;
// configure the executor service
long keepAlive = conf.getLong(
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
@ -331,9 +335,13 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
}
// we are stopped so we can just exit.
} catch (ExecutionException e) {
if (e.getCause() instanceof ForeignException) {
Throwable cause = e.getCause();
if (cause instanceof ForeignException) {
LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
throw (ForeignException)e.getCause();
} else if (cause instanceof DroppedSnapshotException) {
// we have to abort the region server according to contract of flush
abortable.abort("Received DroppedSnapshotException, aborting", cause);
}
LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
throw new ForeignException(name, e.getCause());
@ -371,7 +379,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
if (this.stopped) return;
this.stopped = true;
this.executor.shutdownNow();
this.executor.shutdown();
}
}

View File

@ -412,6 +412,7 @@ public class TestHRegion {
Assert.fail("Didn't bubble up IOE!");
} catch (DroppedSnapshotException dse) {
// What we are expecting
region.closing.set(false); // this is needed for the rest of the test to work
}
// Make it so all writes succeed from here on out
ffs.fault.set(false);

View File

@ -714,6 +714,8 @@ public class TestWALReplay {
+ t.getMessage());
// simulated to abort server
Mockito.doReturn(true).when(rsServices).isAborted();
region.setClosing(false); // region normally does not accept writes after
// DroppedSnapshotException. We mock around it for this test.
}
// writing more data
int moreRow = 10;