HBASE-8706 Some improvement in snapshot (binlijin)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1492398 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
mbertozzi 2013-06-12 19:50:00 +00:00
parent 3b18accd8e
commit ba8801c2ff
9 changed files with 85 additions and 31 deletions

View File

@ -114,7 +114,7 @@ public class SnapshotManager implements Stoppable {
private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
/** By default, check to see if the snapshot is complete (ms) */
private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;
private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;
/**
* Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
@ -132,7 +132,6 @@ public class SnapshotManager implements Stoppable {
private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
private boolean stopped;
private final long wakeFrequency;
private final MasterServices master; // Needed by TableEventHandlers
private final MetricsMaster metricsMaster;
private final ProcedureCoordinator coordinator;
@ -169,16 +168,17 @@ public class SnapshotManager implements Stoppable {
// get the configuration for the coordinator
Configuration conf = master.getConfiguration();
this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
// setup the default procedure coordinator
String name = master.getServerName().toString();
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency);
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
this.coordinator = new ProcedureCoordinator(comms, tpool);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
this.executorService = master.getExecutorService();
resetTempDir();
}
@ -198,8 +198,6 @@ public class SnapshotManager implements Stoppable {
this.rootDir = master.getMasterFileSystem().getRootDir();
checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY,
SNAPSHOT_WAKE_MILLIS_DEFAULT);
this.coordinator = coordinator;
this.executorService = pool;
resetTempDir();
@ -871,6 +869,11 @@ public class SnapshotManager implements Stoppable {
for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
restoreHandler.cancel(why);
}
try {
coordinator.close();
} catch (IOException e) {
LOG.error("stop ProcedureCoordinator error", e);
}
}
@Override

View File

@ -102,7 +102,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
this.rootDir = this.master.getMasterFileSystem().getRootDir();
this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
this.monitor = new ForeignExceptionDispatcher();
this.monitor = new ForeignExceptionDispatcher(snapshot.getName());
this.tableLockManager = master.getTableLockManager();
this.tableLock = this.tableLockManager.writeLock(Bytes.toBytes(snapshot.getTable())
@ -168,6 +168,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
// run the snapshot
snapshotRegions(regionsAndLocations);
monitor.rethrowException();
// extract each pair to separate lists
Set<String> serverNames = new HashSet<String>();

View File

@ -51,11 +51,14 @@ import com.google.common.collect.MapMaker;
public class ProcedureCoordinator {
private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
final static long TIMEOUT_MILLIS_DEFAULT = 60000;
final static long WAKE_MILLIS_DEFAULT = 500;
private final ProcedureCoordinatorRpcs rpcs;
private final ExecutorService pool;
private final long wakeTimeMillis;
private final long timeoutMillis;
// Running procedure table. Maps procedure name to running procedure reference
private final ConcurrentMap<String, Procedure> procedures =
@ -71,6 +74,23 @@ public class ProcedureCoordinator {
* @param pool Used for executing procedures.
*/
public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
}
/**
* Create and start a ProcedureCoordinator.
*
* The rpc object registers the ProcedureCoordinator and starts any threads in
* this constructor.
*
* @param rpcs
* @param pool Used for executing procedures.
* @param timeoutMillis
*/
public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
long timeoutMillis, long wakeTimeMillis) {
this.timeoutMillis = timeoutMillis;
this.wakeTimeMillis = wakeTimeMillis;
this.rpcs = rpcs;
this.pool = pool;
this.rpcs.start(this);
@ -78,10 +98,24 @@ public class ProcedureCoordinator {
/**
* Default thread pool for the procedure
*
* @param coordName
* @param opThreads the maximum number of threads to allow in the pool
*/
public static ThreadPoolExecutor defaultPool(String coordName, long keepAliveTime, int opThreads,
long wakeFrequency) {
return new ThreadPoolExecutor(1, opThreads, keepAliveTime, TimeUnit.SECONDS,
public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
}
/**
* Default thread pool for the procedure
*
* @param coordName
* @param opThreads the maximum number of threads to allow in the pool
* @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
*/
public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
long keepAliveMillis) {
return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
}
@ -194,7 +228,7 @@ public class ProcedureCoordinator {
Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
List<String> expectedMembers) {
// build the procedure
return new Procedure(this, fed, WAKE_MILLIS_DEFAULT, TIMEOUT_MILLIS_DEFAULT,
return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
procName, procArgs, expectedMembers);
}

View File

@ -51,6 +51,8 @@ import com.google.common.collect.MapMaker;
public class ProcedureMember implements Closeable {
private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
private final SubprocedureFactory builder;
private final ProcedureMemberRpcs rpcs;
@ -72,9 +74,26 @@ public class ProcedureMember implements Closeable {
this.builder = factory;
}
public static ThreadPoolExecutor defaultPool(long wakeFrequency, long keepAlive,
int procThreads, String memberName) {
return new ThreadPoolExecutor(1, procThreads, keepAlive, TimeUnit.SECONDS,
/**
* Default thread pool for the procedure
*
* @param memberName
* @param procThreads the maximum number of threads to allow in the pool
*/
public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
}
/**
* Default thread pool for the procedure
*
* @param memberName
* @param procThreads the maximum number of threads to allow in the pool
* @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
*/
public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
long keepAliveMillis) {
return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
}
@ -85,7 +104,7 @@ public class ProcedureMember implements Closeable {
* @return reference to the Procedure member's rpcs object
*/
ProcedureMemberRpcs getRpcs() {
return rpcs;
return rpcs;
}

View File

@ -24,9 +24,9 @@ import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
/**
@ -198,7 +198,6 @@ abstract public class Subprocedure implements Callable<Void> {
} else {
msg = "Subprocedure '" + barrierName + "' failed!";
}
LOG.error(msg , e);
cancel(msg, e);
LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.exceptions.SnapshotCreationException;
import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
@ -48,7 +49,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.exceptions.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -125,12 +125,11 @@ public class RegionServerSnapshotManager {
// read in the snapshot request configuration properties
Configuration conf = rss.getConfiguration();
long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
// create the actual snapshot procedure member
ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis, keepAlive, opThreads, nodeName);
ThreadPoolExecutor pool = ProcedureMember.defaultPool(nodeName, opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
}
@ -191,7 +190,7 @@ public class RegionServerSnapshotManager {
LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
+ snapshot.getTable());
ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher();
ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
Configuration conf = rss.getConfiguration();
long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
@ -355,7 +354,6 @@ public class RegionServerSnapshotManager {
}
// evict remaining tasks and futures from taskPool.
LOG.debug(taskPool);
while (!futures.isEmpty()) {
// block to remove cancelled futures;
LOG.warn("Removing cancelled elements from taskPool");

View File

@ -86,7 +86,7 @@ public class TestProcedureCoordinator {
}
private ProcedureCoordinator buildNewCoordinator() {
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
return spy(new ProcedureCoordinator(controller, pool));
}

View File

@ -88,7 +88,7 @@ public class TestProcedureMember {
*/
private ProcedureMember buildCohortMember() {
String name = "node";
ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
return new ProcedureMember(mockMemberComms, pool, mockBuilder);
}
@ -98,7 +98,7 @@ public class TestProcedureMember {
private void buildCohortMemberPair() throws IOException {
dispatcher = new ForeignExceptionDispatcher();
String name = "node";
ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
Subprocedure subproc = new EmptySubprocedure(member, dispatcher);

View File

@ -128,7 +128,7 @@ public class TestZKProcedure {
// start running the controller
ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
coordZkw, opDescription, COORDINATOR_NODE_NAME);
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
@Override
public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
@ -146,7 +146,7 @@ public class TestZKProcedure {
for (String member : members) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
comms.start(procMember);
@ -210,7 +210,7 @@ public class TestZKProcedure {
ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
// start a member for each node
@ -220,7 +220,7 @@ public class TestZKProcedure {
for (String member : expected) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
controller.start(mem);