HBASE-7271 Have a single executor for all zkWorkers in the assignment manager

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1419351 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2012-12-10 11:34:46 +00:00
parent f0f66f4ba9
commit d6bc735d19
1 changed files with 98 additions and 26 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import com.google.common.collect.LinkedHashMultimap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -145,10 +147,10 @@ public class AssignmentManager extends ZooKeeperListener {
private java.util.concurrent.ExecutorService threadPoolExecutorService; private java.util.concurrent.ExecutorService threadPoolExecutorService;
// A bunch of ZK events workers. Each is a single thread executor service // A bunch of ZK events workers. Each is a single thread executor service
private java.util.concurrent.ExecutorService[] zkEventWorkers; private final java.util.concurrent.ExecutorService zkEventWorkers;
private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{ private List<EventType> ignoreStatesRSOffline = Arrays.asList(
EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED }); EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
// metrics instance to send metrics for RITs // metrics instance to send metrics for RITs
MetricsMaster metricsMaster; MetricsMaster metricsMaster;
@ -202,14 +204,10 @@ public class AssignmentManager extends ZooKeeperListener {
this.metricsMaster = metricsMaster;// can be null only with tests. this.metricsMaster = metricsMaster;// can be null only with tests.
this.regionStates = new RegionStates(server, serverManager); this.regionStates = new RegionStates(server, serverManager);
int workers = conf.getInt("hbase.assignment.zkevent.workers", 5); int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
zkEventWorkers = new java.util.concurrent.ExecutorService[workers]; ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker");
ThreadFactory threadFactory = zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
Threads.newDaemonThreadFactory("am-zkevent-worker"); TimeUnit.SECONDS, threadFactory);
for (int i = 0; i < workers; i++) {
zkEventWorkers[i] = Threads.getBoundedCachedThreadPool(
1, 60L, TimeUnit.SECONDS, threadFactory);
}
} }
void startTimeOutMonitor() { void startTimeOutMonitor() {
@ -919,14 +917,83 @@ public class AssignmentManager extends ZooKeeperListener {
handleAssignmentEvent(path); handleAssignmentEvent(path);
} }
// We don't want to have two events on the same region managed simultaneously.
// For this reason, we need to wait if an event on the same region is currently in progress.
// So we track the region names of the events in progress, and we keep a waiting list.
private final Set<String> regionsInProgress = new HashSet<String>();
// In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
// this as we want the events to be managed in the same order as we received them.
private final LinkedHashMultimap <String, RegionRunnable>
zkEventWorkerWaitingList = LinkedHashMultimap.create();
/**
* A specific runnable that works only on a region.
*/
private static interface RegionRunnable extends Runnable{
/**
* @return - the name of the region it works on.
*/
public String getRegionName();
}
/**
* Submit a task, ensuring that there is only one task at a time that working on a given region.
* Order is respected.
*/
protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
synchronized (regionsInProgress) {
// If we're there is already a task with this region, we add it to the
// waiting list and return.
if (regionsInProgress.contains(regRunnable.getRegionName())) {
synchronized (zkEventWorkerWaitingList){
zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
}
return;
}
// No event in progress on this region => we can submit a new task immediately.
regionsInProgress.add(regRunnable.getRegionName());
zkEventWorkers.submit(new Runnable() {
@Override
public void run() {
try {
regRunnable.run();
} finally {
// now that we have finished, let's see if there is an event for the same region in the
// waiting list. If it's the case, we can now submit it to the pool.
synchronized (regionsInProgress) {
regionsInProgress.remove(regRunnable.getRegionName());
synchronized (zkEventWorkerWaitingList) {
java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
regRunnable.getRegionName());
if (!waiting.isEmpty()) {
// We want the first object only. The only way to get it is through an iterator.
RegionRunnable toSubmit = waiting.iterator().next();
zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
zkEventWorkersSubmit(toSubmit);
}
}
}
}
}
});
}
}
@Override @Override
public void nodeDeleted(final String path) { public void nodeDeleted(final String path) {
if (path.startsWith(watcher.assignmentZNode)) { if (path.startsWith(watcher.assignmentZNode)) {
int wi = Math.abs(path.hashCode() % zkEventWorkers.length); final String regionName = ZKAssign.getRegionName(watcher, path);
zkEventWorkers[wi].submit(new Runnable() { zkEventWorkersSubmit(new RegionRunnable() {
@Override
public String getRegionName() {
return regionName;
}
@Override @Override
public void run() { public void run() {
String regionName = ZKAssign.getRegionName(watcher, path);
Lock lock = locker.acquireLock(regionName); Lock lock = locker.acquireLock(regionName);
try { try {
RegionState rs = regionStates.getRegionTransitionState(regionName); RegionState rs = regionStates.getRegionTransitionState(regionName);
@ -983,8 +1050,7 @@ public class AssignmentManager extends ZooKeeperListener {
@Override @Override
public void nodeChildrenChanged(String path) { public void nodeChildrenChanged(String path) {
if (path.equals(watcher.assignmentZNode)) { if (path.equals(watcher.assignmentZNode)) {
int wi = Math.abs(path.hashCode() % zkEventWorkers.length); zkEventWorkers.submit(new Runnable() {
zkEventWorkers[wi].submit(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
@ -1003,7 +1069,7 @@ public class AssignmentManager extends ZooKeeperListener {
} }
} }
} }
} catch(KeeperException e) { } catch (KeeperException e) {
server.abort("Unexpected ZK exception reading unassigned children", e); server.abort("Unexpected ZK exception reading unassigned children", e);
} }
} }
@ -1043,8 +1109,14 @@ public class AssignmentManager extends ZooKeeperListener {
*/ */
private void handleAssignmentEvent(final String path) { private void handleAssignmentEvent(final String path) {
if (path.startsWith(watcher.assignmentZNode)) { if (path.startsWith(watcher.assignmentZNode)) {
int wi = Math.abs(path.hashCode() % zkEventWorkers.length); final String regionName = ZKAssign.getRegionName(watcher, path);
zkEventWorkers[wi].submit(new Runnable() {
zkEventWorkersSubmit(new RegionRunnable() {
@Override
public String getRegionName() {
return regionName;
}
@Override @Override
public void run() { public void run() {
try { try {
@ -1852,8 +1924,7 @@ public class AssignmentManager extends ZooKeeperListener {
+ "can't be created."); + "can't be created.");
return; return;
} }
} catch (KeeperException ee) { } catch (KeeperException e) {
Exception e = ee;
if (e instanceof NodeExistsException) { if (e instanceof NodeExistsException) {
// Handle race between master initiated close and regionserver // Handle race between master initiated close and regionserver
// orchestrated splitting. See if existing node is in a // orchestrated splitting. See if existing node is in a
@ -2595,8 +2666,7 @@ public class AssignmentManager extends ZooKeeperListener {
ServerName addressFromZK = rt != null? rt.getServerName(): null; ServerName addressFromZK = rt != null? rt.getServerName(): null;
if (addressFromZK != null) { if (addressFromZK != null) {
// if we get something from ZK, we will use the data // if we get something from ZK, we will use the data
boolean matchZK = (addressFromZK != null && boolean matchZK = addressFromZK.equals(serverName);
addressFromZK.equals(serverName));
LOG.debug("based on ZK, current region=" + hri.getRegionNameAsString() + LOG.debug("based on ZK, current region=" + hri.getRegionNameAsString() +
" is on server=" + addressFromZK + " is on server=" + addressFromZK +
" server being checked=: " + serverName); " server being checked=: " + serverName);
@ -2678,10 +2748,12 @@ public class AssignmentManager extends ZooKeeperListener {
* Shutdown the threadpool executor service * Shutdown the threadpool executor service
*/ */
public void shutdown() { public void shutdown() {
threadPoolExecutorService.shutdownNow(); // It's an immediate shutdown, so we're clearing the remaining tasks.
for (int i = 0, n = zkEventWorkers.length; i < n; i++) { synchronized (zkEventWorkerWaitingList){
zkEventWorkers[i].shutdownNow(); zkEventWorkerWaitingList.clear();
} }
threadPoolExecutorService.shutdownNow();
zkEventWorkers.shutdownNow();
} }
protected void setEnabledTable(String tableName) { protected void setEnabledTable(String tableName) {