HBASE-7651 RegionServerSnapshotManager fails with CancellationException if previous snapshot fails in per region task

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445862 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2013-02-13 19:05:55 +00:00
parent 69523a5ef8
commit 6522e34e10
2 changed files with 69 additions and 56 deletions

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
/**
* This online snapshot implementation forces uses the distributed procedure framework to force a
@ -74,6 +75,7 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
// snapshots that involve multiple regions and regionservers. It is still possible to have
// an interleaving such that globally regions are missing, so we still need the verification
// step.
LOG.debug("Starting region operation on " + region);
region.startRegionOperation();
try {
LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
@ -81,6 +83,7 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
region.addRegionToSnapshot(snapshot, monitor);
LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
} finally {
LOG.debug("Closing region operation on " + region);
region.closeRegionOperation();
}
return null;
@ -95,6 +98,13 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
monitor.rethrowException();
// assert that the taskManager is empty.
if (taskManager.hasTasks()) {
throw new IllegalStateException("Attempting to take snapshot "
+ SnapshotDescriptionUtils.toString(snapshot)
+ " but we have currently have outstanding tasks");
}
// Add all hfiles already existing in region.
for (HRegion region : regions) {
// submit one task per region for parallelize by region.
@ -104,7 +114,11 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
// wait for everything to complete.
LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
try {
taskManager.waitForOutstandingTasks();
} catch (InterruptedException e) {
throw new ForeignException(getMemberName(), e);
}
}
/**
@ -128,9 +142,13 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
*/
@Override
public void cleanup(Exception e) {
LOG.info("Aborting all log roll online snapshot subprocedure task threads for '"
LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
+ snapshot.getName() + "' due to error", e);
try {
taskManager.cancelTasks();
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
}
/**

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.snapshot;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@ -34,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
@ -66,7 +66,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
* <p>
* On startup, requires {@link #start()} to be called.
* <p>
* On shutdown, requires {@link #stop(boolean)} to be called
* On shutdown, requires {@link #stop()} to be called
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@ -95,26 +95,23 @@ public class RegionServerSnapshotManager {
private final RegionServerServices rss;
private final ProcedureMemberRpcs memberRpcs;
private final ProcedureMember member;
private final SnapshotSubprocedurePool taskManager;
/**
* Exposed for testing.
* @param conf
* @param conf HBase configuration.
* @param parent parent running the snapshot handler
* @param controller use a custom snapshot controller
* @param cohortMember use a custom cohort member
* @param memberRpc use specified memberRpc instance
* @param procMember use specified ProcedureMember
*/
RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
ProcedureMemberRpcs controller, ProcedureMember cohortMember) {
ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
this.rss = parent;
this.memberRpcs = controller;
this.member = cohortMember;
// read in the snapshot request configuration properties
taskManager = new SnapshotSubprocedurePool(parent, conf);
this.memberRpcs = memberRpc;
this.member = procMember;
}
/**
* Create a default snapshot handler - uses a zookeeper based cohort controller.
* Create a default snapshot handler - uses a zookeeper based member controller.
* @param rss region server running the handler
* @throws KeeperException if the zookeeper cluster cannot be reached
*/
@ -135,9 +132,6 @@ public class RegionServerSnapshotManager {
// create the actual snapshot procedure member
ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis, keepAlive, opThreads, nodeName);
this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
// setup the task manager to run all the snapshots tasks
taskManager = new SnapshotSubprocedurePool(rss, conf);
}
/**
@ -156,12 +150,6 @@ public class RegionServerSnapshotManager {
String mode = force ? "abruptly" : "gracefully";
LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
if (force) {
this.taskManager.stop();
} else {
this.taskManager.shutdown();
}
try {
this.member.close();
} finally {
@ -201,14 +189,19 @@ public class RegionServerSnapshotManager {
// expects participation in the procedure and without sending message the snapshot attempt
// will hang and fail.
LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " + snapshot.getTable());
LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
+ snapshot.getTable());
ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher();
Configuration conf = rss.getConfiguration();
long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY,
SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
switch (snapshot.getType()) {
case FLUSH:
SnapshotSubprocedurePool taskManager =
new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, snapshot, taskManager);
default:
@ -274,24 +267,28 @@ public class RegionServerSnapshotManager {
private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
private final String name;
SnapshotSubprocedurePool(Server parent, Configuration conf) {
SnapshotSubprocedurePool(String name, Configuration conf) {
// configure the executor service
long keepAlive = conf.getLong(
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
this.name = parent.getServerName().toString();
this.name = name;
executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
+ name + ")-snapshot-pool"));
taskPool = new ExecutorCompletionService<Void>(executor);
}
boolean hasTasks() {
return futures.size() != 0;
}
/**
* Submit a task to the pool.
*
* NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. This
* version does not support issuing tasks from multiple concurrnty table snapshots requests.
* version does not support issuing tasks from multiple concurrent table snapshots requests.
*/
void submitTask(final Callable<Void> task) {
Future<Void> f = this.taskPool.submit(task);
@ -302,18 +299,16 @@ public class RegionServerSnapshotManager {
* Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
* This *must* be called to after all tasks are submitted via submitTask.
*
* TODO: to support multiple concurrent snapshots this code needs to be rewritten. The common
* pool of futures not being thread safe is part of the problem.
*
* @return <tt>true</tt> on success, <tt>false</tt> otherwise
* @throws InterruptedException
* @throws SnapshotCreationException if the snapshot failed while we were waiting
*/
boolean waitForOutstandingTasks() throws ForeignException {
boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
LOG.debug("Waiting for local region snapshots to finish.");
int sz = futures.size();
try {
// Using the completion service to process the futures that finish first first.
int sz = futures.size();
for (int i = 0; i < sz; i++) {
Future<Void> f = taskPool.take();
f.get();
@ -325,48 +320,48 @@ public class RegionServerSnapshotManager {
LOG.debug("Completed " + sz + " local region snapshots.");
return true;
} catch (InterruptedException e) {
// TODO this isn't completely right and needs to be revisited.
LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
if (stopped) {
if (!stopped) {
Thread.currentThread().interrupt();
throw new ForeignException("SnapshotSubprocedurePool", e);
}
Thread.currentThread().interrupt();
// we are stopped so we can just exit.
} catch (ExecutionException e) {
if (e.getCause() instanceof ForeignException) {
LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
throw (ForeignException)e.getCause();
}
LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
throw new ForeignException(name, e.getCause());
} finally {
// close off remaining tasks
for (Future<Void> f: futures) {
if (!f.isDone()){
LOG.warn("cancelling region task");
f.cancel(true);
}
}
futures.clear();
cancelTasks();
}
return false;
}
/**
* This attempts to cancel out all pending and in progress tasks (interruptions issues)
* @throws InterruptedException
*/
void cancelTasks() {
for (Future<Void> f: futures) {
void cancelTasks() throws InterruptedException {
Collection<Future<Void>> tasks = futures;
LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
for (Future<Void> f: tasks) {
f.cancel(true);
}
// evict remaining tasks and futures from taskPool.
LOG.debug(taskPool);
while (!futures.isEmpty()) {
// block to remove cancelled futures;
LOG.warn("Removing cancelled elements from taskPool");
futures.remove(taskPool.take());
}
stop();
}
/**
* This politely shutsdown the thread pool. Call when gracefully exiting a region server.
*/
void shutdown() {
this.executor.shutdown();
}
/**
* This abruptly shutsdown the thread pool. Call when exiting a region server.
* Abruptly shutdown the thread pool. Call when exiting a region server.
*/
void stop() {
if (this.stopped) return;