HBASE-6977 Multithread processing ZK assignment events

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1402226 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2012-10-25 17:32:08 +00:00
parent c9073c4631
commit d70e3d8180
3 changed files with 133 additions and 112 deletions

View File

@ -34,7 +34,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValue.KVComparator;

View File

@ -32,7 +32,8 @@ import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -76,7 +77,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -98,7 +98,7 @@ public class AssignmentManager extends ZooKeeperListener {
public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME, public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME,
-1, -1L); -1, -1L);
protected Server server; protected final Server server;
private ServerManager serverManager; private ServerManager serverManager;
@ -145,6 +145,9 @@ public class AssignmentManager extends ZooKeeperListener {
//Thread pool executor service for timeout monitor //Thread pool executor service for timeout monitor
private java.util.concurrent.ExecutorService threadPoolExecutorService; private java.util.concurrent.ExecutorService threadPoolExecutorService;
// A bunch of ZK events workers. Each is a single thread executor service
private java.util.concurrent.ExecutorService[] zkEventWorkers;
private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{ private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED }); EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });
@ -161,18 +164,6 @@ public class AssignmentManager extends ZooKeeperListener {
*/ */
final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false); final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
// A temp ZK watcher for bulk assigner to avoid deadlock,
// will be removed in HBASE-6977
//
// A separate ZK watcher used for async ZK node offline.
// We can't use that exiting one because it could lead to
// deadlocks if its event thread asks for a locker held by a bulk
// assigner thread. This watcher is just for async ZK node offline.
// In HBASE-6977, we are going to process assignment ZK events
// outside of ZK event thread, so there won't be deadlock
// threat anymore. That's when this watcher to be removed.
private final ZooKeeperWatcher asyncOfflineZKWatcher;
/** /**
* Constructs a new assignment manager. * Constructs a new assignment manager.
* *
@ -206,13 +197,20 @@ public class AssignmentManager extends ZooKeeperListener {
this.maximumAttempts = this.maximumAttempts =
this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
this.balancer = balancer; this.balancer = balancer;
this.threadPoolExecutorService = Executors.newCachedThreadPool(); int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("hbase-am"));
this.masterMetrics = metrics;// can be null only with tests. this.masterMetrics = metrics;// can be null only with tests.
this.regionStates = new RegionStates(server, serverManager); this.regionStates = new RegionStates(server, serverManager);
// A temp ZK watcher for bulk assigner to avoid deadlock,
// will be removed in HBASE-6977 int workers = conf.getInt("hbase.assignment.zkevent.workers", 5);
asyncOfflineZKWatcher = new ZooKeeperWatcher(conf, zkEventWorkers = new java.util.concurrent.ExecutorService[workers];
"async offline ZK watcher", server); ThreadFactory threadFactory =
Threads.newDaemonThreadFactory("am-zkevent-worker");
for (int i = 0; i < workers; i++) {
zkEventWorkers[i] = Threads.getBoundedCachedThreadPool(
1, 60L, TimeUnit.SECONDS, threadFactory);
}
} }
void startTimeOutMonitor() { void startTimeOutMonitor() {
@ -923,56 +921,47 @@ public class AssignmentManager extends ZooKeeperListener {
handleAssignmentEvent(path); handleAssignmentEvent(path);
} }
private void handleAssignmentEvent(final String path) {
if (!path.startsWith(watcher.assignmentZNode)) return;
try {
Stat stat = new Stat();
byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
if (data == null) return;
RegionTransition rt = RegionTransition.parseFrom(data);
handleRegion(rt, stat.getVersion());
} catch (KeeperException e) {
server.abort("Unexpected ZK exception reading unassigned node data", e);
} catch (DeserializationException e) {
server.abort("Unexpected exception deserializing node data", e);
}
}
@Override @Override
public void nodeDeleted(final String path) { public void nodeDeleted(final String path) {
if (path.startsWith(this.watcher.assignmentZNode)) { if (path.startsWith(watcher.assignmentZNode)) {
String regionName = ZKAssign.getRegionName(this.watcher, path); int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
Lock lock = locker.acquireLock(regionName); zkEventWorkers[wi].submit(new Runnable() {
try { @Override
RegionState rs = regionStates.getRegionTransitionState(regionName); public void run() {
if (rs != null) { String regionName = ZKAssign.getRegionName(watcher, path);
HRegionInfo regionInfo = rs.getRegion(); Lock lock = locker.acquireLock(regionName);
if (rs.isSplit()) { try {
LOG.debug("Ephemeral node deleted, regionserver crashed?, " + RegionState rs = regionStates.getRegionTransitionState(regionName);
"clearing from RIT; rs=" + rs); if (rs == null) return;
regionOffline(rs.getRegion());
} else { HRegionInfo regionInfo = rs.getRegion();
LOG.debug("The znode of region " + regionInfo.getRegionNameAsString() if (rs.isSplit()) {
+ " has been deleted."); LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
if (rs.isOpened()) { "clearing from RIT; rs=" + rs);
ServerName serverName = rs.getServerName(); regionOffline(rs.getRegion());
regionOnline(regionInfo, serverName); } else {
LOG.info("The master has opened the region " LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
+ regionInfo.getRegionNameAsString() + " that was online on " + " has been deleted.");
+ serverName); if (rs.isOpened()) {
if (this.getZKTable().isDisablingOrDisabledTable( ServerName serverName = rs.getServerName();
regionInfo.getTableNameAsString())) { regionOnline(regionInfo, serverName);
LOG.debug("Opened region " LOG.info("The master has opened the region "
+ regionInfo.getRegionNameAsString() + " but " + regionInfo.getRegionNameAsString() + " that was online on "
+ "this table is disabled, triggering close of region"); + serverName);
unassign(regionInfo); if (getZKTable().isDisablingOrDisabledTable(
regionInfo.getTableNameAsString())) {
LOG.debug("Opened region "
+ regionInfo.getRegionNameAsString() + " but "
+ "this table is disabled, triggering close of region");
unassign(regionInfo);
}
} }
} }
} finally {
lock.unlock();
} }
} }
} finally { });
lock.unlock();
}
} }
} }
@ -990,14 +979,32 @@ public class AssignmentManager extends ZooKeeperListener {
*/ */
@Override @Override
public void nodeChildrenChanged(String path) { public void nodeChildrenChanged(String path) {
if(path.equals(watcher.assignmentZNode)) { if (path.equals(watcher.assignmentZNode)) {
try { int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
// Just make sure we see the changes for the new znodes zkEventWorkers[wi].submit(new Runnable() {
ZKUtil.listChildrenAndWatchThem(watcher, @Override
watcher.assignmentZNode); public void run() {
} catch(KeeperException e) { try {
server.abort("Unexpected ZK exception reading unassigned children", e); // Just make sure we see the changes for the new znodes
} List<String> children =
ZKUtil.listChildrenAndWatchForNewChildren(
watcher, watcher.assignmentZNode);
if (children != null) {
for (String child : children) {
// if region is in transition, we already have a watch
// on it, so no need to watch it again. So, as I know for now,
// this is needed to watch splitting nodes only.
if (!regionStates.isRegionInTransition(child)) {
ZKUtil.watchAndCheckExists(watcher,
ZKUtil.joinZNode(watcher.assignmentZNode, child));
}
}
}
} catch(KeeperException e) {
server.abort("Unexpected ZK exception reading unassigned children", e);
}
}
});
} }
} }
@ -1023,6 +1030,37 @@ public class AssignmentManager extends ZooKeeperListener {
addToServersInUpdatingTimer(sn); addToServersInUpdatingTimer(sn);
} }
/**
* Pass the assignment event to a worker for processing.
* Each worker is a single thread executor service. The reason
* for just one thread is to make sure all events for a given
* region are processed in order.
*
* @param path
*/
private void handleAssignmentEvent(final String path) {
if (path.startsWith(watcher.assignmentZNode)) {
int wi = Math.abs(path.hashCode() % zkEventWorkers.length);
zkEventWorkers[wi].submit(new Runnable() {
@Override
public void run() {
try {
Stat stat = new Stat();
byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
if (data == null) return;
RegionTransition rt = RegionTransition.parseFrom(data);
handleRegion(rt, stat.getVersion());
} catch (KeeperException e) {
server.abort("Unexpected ZK exception reading unassigned node data", e);
} catch (DeserializationException e) {
server.abort("Unexpected exception deserializing node data", e);
}
}
});
}
}
/** /**
* Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater} * Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
* will update timers for this server in background * will update timers for this server in background
@ -1175,14 +1213,13 @@ public class AssignmentManager extends ZooKeeperListener {
AtomicInteger counter = new AtomicInteger(0); AtomicInteger counter = new AtomicInteger(0);
Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>(); Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
OfflineCallback cb = new OfflineCallback( OfflineCallback cb = new OfflineCallback(
regionStates, asyncOfflineZKWatcher, destination, counter, offlineNodesVersions); watcher, destination, counter, offlineNodesVersions);
Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size()); Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
List<RegionState> states = new ArrayList<RegionState>(regions.size()); List<RegionState> states = new ArrayList<RegionState>(regions.size());
for (HRegionInfo region : regions) { for (HRegionInfo region : regions) {
String encodedRegionName = region.getEncodedName(); String encodedRegionName = region.getEncodedName();
RegionState state = forceRegionStateToOffline(region, true); RegionState state = forceRegionStateToOffline(region, true);
if (state != null && asyncSetOfflineInZooKeeper( if (state != null && asyncSetOfflineInZooKeeper(state, cb, destination)) {
state, asyncOfflineZKWatcher, cb, destination)) {
RegionPlan plan = new RegionPlan(region, state.getServerName(), destination); RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
plans.put(encodedRegionName, plan); plans.put(encodedRegionName, plan);
states.add(state); states.add(state);
@ -1228,12 +1265,6 @@ public class AssignmentManager extends ZooKeeperListener {
Lock lock = locks.remove(encodedRegionName); Lock lock = locks.remove(encodedRegionName);
lock.unlock(); lock.unlock();
} else { } else {
try { // Set the ZK watcher explicitly
ZKAssign.getData(this.watcher, encodedRegionName);
} catch (KeeperException e) {
server.abort("Unexpected exception watching ZKAssign node", e);
return false;
}
regionStates.updateRegionState(region, regionStates.updateRegionState(region,
RegionState.State.PENDING_OPEN, destination); RegionState.State.PENDING_OPEN, destination);
regionOpenInfos.add(new Pair<HRegionInfo, Integer>( regionOpenInfos.add(new Pair<HRegionInfo, Integer>(
@ -2640,8 +2671,9 @@ public class AssignmentManager extends ZooKeeperListener {
* Shutdown the threadpool executor service * Shutdown the threadpool executor service
*/ */
public void shutdown() { public void shutdown() {
if (null != threadPoolExecutorService) { threadPoolExecutorService.shutdownNow();
this.threadPoolExecutorService.shutdown(); for (int i = 0, n = zkEventWorkers.length; i < n; i++) {
zkEventWorkers[i].shutdownNow();
} }
} }
@ -2664,8 +2696,7 @@ public class AssignmentManager extends ZooKeeperListener {
* updating zk). * updating zk).
*/ */
private boolean asyncSetOfflineInZooKeeper(final RegionState state, private boolean asyncSetOfflineInZooKeeper(final RegionState state,
final ZooKeeperWatcher zkw, final AsyncCallback.StringCallback cb, final AsyncCallback.StringCallback cb, final ServerName destination) {
final ServerName destination) {
if (!state.isClosed() && !state.isOffline()) { if (!state.isClosed() && !state.isOffline()) {
this.server.abort("Unexpected state trying to OFFLINE; " + state, this.server.abort("Unexpected state trying to OFFLINE; " + state,
new IllegalStateException()); new IllegalStateException());
@ -2674,7 +2705,7 @@ public class AssignmentManager extends ZooKeeperListener {
regionStates.updateRegionState( regionStates.updateRegionState(
state.getRegion(), RegionState.State.OFFLINE); state.getRegion(), RegionState.State.OFFLINE);
try { try {
ZKAssign.asyncCreateNodeOffline(zkw, state.getRegion(), ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
destination, cb, state); destination, cb, state);
} catch (KeeperException e) { } catch (KeeperException e) {
if (e instanceof NodeExistsException) { if (e instanceof NodeExistsException) {

View File

@ -41,13 +41,15 @@ public class OfflineCallback implements StringCallback {
private final ExistCallback callBack; private final ExistCallback callBack;
private final ZooKeeperWatcher zkw; private final ZooKeeperWatcher zkw;
private final ServerName destination; private final ServerName destination;
private final AtomicInteger counter;
OfflineCallback(final RegionStates regionStates, OfflineCallback(final ZooKeeperWatcher zkw,
final ZooKeeperWatcher zkw, final ServerName destination, final ServerName destination, final AtomicInteger counter,
final AtomicInteger counter, final Map<String, Integer> offlineNodesVersions) { final Map<String, Integer> offlineNodesVersions) {
this.callBack = new ExistCallback( this.callBack = new ExistCallback(
regionStates, counter, destination, offlineNodesVersions); destination, counter, offlineNodesVersions);
this.destination = destination; this.destination = destination;
this.counter = counter;
this.zkw = zkw; this.zkw = zkw;
} }
@ -59,13 +61,12 @@ public class OfflineCallback implements StringCallback {
// This is result code. If non-zero, need to resubmit. // This is result code. If non-zero, need to resubmit.
LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " + LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2"); "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
this.zkw.abort("Connectionloss writing unassigned at " + path + this.counter.addAndGet(1);
", rc=" + rc, null);
return; return;
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("rs=" + (RegionState)ctx LOG.debug("rs=" + ctx + ", server=" + destination);
+ ", server=" + this.destination.toString());
} }
// Async exists to set a watcher so we'll get triggered when // Async exists to set a watcher so we'll get triggered when
// unassigned node changes. // unassigned node changes.
@ -80,17 +81,15 @@ public class OfflineCallback implements StringCallback {
static class ExistCallback implements StatCallback { static class ExistCallback implements StatCallback {
private final Log LOG = LogFactory.getLog(ExistCallback.class); private final Log LOG = LogFactory.getLog(ExistCallback.class);
private final Map<String, Integer> offlineNodesVersions; private final Map<String, Integer> offlineNodesVersions;
private final RegionStates regionStates;
private final AtomicInteger counter; private final AtomicInteger counter;
private ServerName destination; private ServerName destination;
ExistCallback(final RegionStates regionStates, ExistCallback(final ServerName destination,
final AtomicInteger counter, ServerName destination, final AtomicInteger counter,
final Map<String, Integer> offlineNodesVersions) { final Map<String, Integer> offlineNodesVersions) {
this.offlineNodesVersions = offlineNodesVersions; this.offlineNodesVersions = offlineNodesVersions;
this.regionStates = regionStates;
this.counter = counter;
this.destination = destination; this.destination = destination;
this.counter = counter;
} }
@Override @Override
@ -99,24 +98,16 @@ public class OfflineCallback implements StringCallback {
// This is result code. If non-zero, need to resubmit. // This is result code. If non-zero, need to resubmit.
LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " + LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2"); "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
this.counter.addAndGet(1);
return; return;
} }
RegionState state = (RegionState)ctx;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("rs=" + state LOG.debug("rs=" + ctx + ", server=" + destination);
+ ", server=" + this.destination.toString());
} }
// Transition RegionState to PENDING_OPEN here in master; means we've HRegionInfo region = ((RegionState)ctx).getRegion();
// sent the open. We're a little ahead of ourselves here since we've not
// yet sent out the actual open but putting this state change after the
// call to open risks our writing PENDING_OPEN after state has been moved
// to OPENING by the regionserver.
HRegionInfo region = state.getRegion();
offlineNodesVersions.put( offlineNodesVersions.put(
region.getEncodedName(), Integer.valueOf(stat.getVersion())); region.getEncodedName(), Integer.valueOf(stat.getVersion()));
regionStates.updateRegionState(region,
RegionState.State.PENDING_OPEN, destination);
this.counter.addAndGet(1); this.counter.addAndGet(1);
} }
} }