diff --git a/bin/rolling-restart.sh b/bin/rolling-restart.sh
index 123ab1ec3d6..ae7a9be02a5 100755
--- a/bin/rolling-restart.sh
+++ b/bin/rolling-restart.sh
@@ -64,7 +64,7 @@ else
zmaster=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool zookeeper.znode.master`
if [ "$zmaster" == "null" ]; then zmaster="master"; fi
zmaster=$zparent/$zmaster
- echo -n "Waiting for Master ZNode to expire"
+ echo -n "Waiting for Master ZNode ${zmaster} to expire"
while bin/hbase zkcli stat $zmaster >/dev/null 2>&1; do
echo -n "."
sleep 1
@@ -76,6 +76,27 @@ else
"$bin"/hbase-daemons.sh --config "${HBASE_CONF_DIR}" \
--hosts "${HBASE_BACKUP_MASTERS}" start master-backup
+ echo "Wait a minute for master to come up join cluster"
+ sleep 60
+
+ # Master joing cluster will start in cleaning out regions in transition.
+ # Wait until the master has cleaned out regions in transition before
+ # giving it a bunch of work to do; master is vulnerable during startup
+ zunassigned=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool zookeeper.znode.unassigned`
+ if [ "$zunassigned" == "null" ]; then zunassigned="unassigned"; fi
+ zunassigned="$zparent/$zunassigned"
+ echo -n "Waiting for ${zunassigned} to empty"
+ while true ; do
+ unassigned=`$bin/hbase zkcli stat ${zunassigned} 2>&1 |grep -e 'numChildren = '|sed -e 's,numChildren = ,,'`
+ if test 0 -eq ${unassigned}
+ then
+ break
+ else
+ echo -n " ${unassigned}"
+ fi
+ sleep 1
+ done
+
# unlike the masters, roll all regionservers one-at-a-time
export HBASE_SLAVE_PARALLEL=false
"$bin"/hbase-daemons.sh --config "${HBASE_CONF_DIR}" \
diff --git a/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 76caf5f0d91..56325feab2a 100644
--- a/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -22,11 +22,10 @@ package org.apache.hadoop.hbase.executor;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -41,7 +40,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* threadpool, a queue to which {@link EventHandler.EventType}s can be submitted,
* and a Runnable
that handles the object that is added to the queue.
*
- *
In order to create a new service, create an instance of this class and
+ *
In order to create a new service, create an instance of this class and
* then do: instance.startExecutorService("myService");
. When done
* call {@link #shutdown()}.
*
@@ -170,9 +169,7 @@ public class ExecutorService {
throw new RuntimeException("An executor service with the name " + name +
" is already running (2)!");
}
- LOG.debug("Starting executor service name=" + name +
- ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
- ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
+ LOG.debug("Starting executor service name=" + name);
}
boolean isExecutorServiceRunning(String name) {
@@ -182,7 +179,7 @@ public class ExecutorService {
public void shutdown() {
for(Entry entry: this.executorMap.entrySet()) {
List wasRunning =
- entry.getValue().threadPoolExecutor.shutdownNow();
+ entry.getValue().poolExecutor.shutdownNow();
if (!wasRunning.isEmpty()) {
LOG.info(entry.getKey() + " had " + wasRunning + " on shutdown");
}
@@ -243,15 +240,14 @@ public class ExecutorService {
/**
* Executor instance.
*/
- private static class Executor {
- // default number of threads in the pool
- private int corePoolSize = 1;
+ static class Executor {
// how long to retain excess threads
- private long keepAliveTimeInMillis = 1000;
+ final long keepAliveTimeInMillis = 1000;
// the thread pool executor that services the requests
- private final ThreadPoolExecutor threadPoolExecutor;
+ final java.util.concurrent.ExecutorService poolExecutor;
// work queue to use - unbounded queue
- BlockingQueue workQueue = new PriorityBlockingQueue();
+ final PriorityBlockingQueue workQueue =
+ new PriorityBlockingQueue();
private final AtomicInteger threadid = new AtomicInteger(0);
private final String name;
private final Map eventHandlerListeners;
@@ -261,12 +257,10 @@ public class ExecutorService {
this.name = name;
this.eventHandlerListeners = eventHandlerListeners;
// create the thread pool executor
- this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads,
- keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue);
// name the threads for this threadpool
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat(this.name + "-" + this.threadid.incrementAndGet());
- this.threadPoolExecutor.setThreadFactory(tfb.build());
+ this.poolExecutor = Executors.newFixedThreadPool(maxThreads, tfb.build());
}
/**
@@ -281,7 +275,7 @@ public class ExecutorService {
if (listener != null) {
event.setListener(listener);
}
- this.threadPoolExecutor.execute(event);
+ this.poolExecutor.execute(event);
}
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a07aea87435..d6511cd2127 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -497,13 +497,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
try {
// Start the executor service pools
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
- conf.getInt("hbase.master.executor.openregion.threads", 10));
+ conf.getInt("hbase.master.executor.openregion.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
- conf.getInt("hbase.master.executor.closeregion.threads", 10));
+ conf.getInt("hbase.master.executor.closeregion.threads", 5));
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
- conf.getInt("hbase.master.executor.serverops.threads", 5));
+ conf.getInt("hbase.master.executor.serverops.threads", 3));
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS,
- conf.getInt("hbase.master.executor.tableops.threads", 5));
+ conf.getInt("hbase.master.executor.tableops.threads", 3));
// Put up info server.
int port = this.conf.getInt("hbase.master.info.port", 60010);
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 424f0769d9c..ff8d258624b 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1131,13 +1131,13 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Start executor services
this.service = new ExecutorService(getServerName());
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
- conf.getInt("hbase.regionserver.executor.openregion.threads", 5));
+ conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
- conf.getInt("hbase.regionserver.executor.closeregion.threads", 5));
+ conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
diff --git a/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
new file mode 100644
index 00000000000..d7c8323a762
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.executor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
+import org.junit.Test;
+
+public class TestExecutorService {
+ @Test
+ public void testExecutorService() throws Exception {
+ int maxThreads = 5;
+ int maxTries = 10;
+ int sleepInterval = 10;
+
+ // Start an executor service pool with max 5 threads
+ ExecutorService executorService = new ExecutorService("unit_test");
+ executorService.startExecutorService(
+ ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
+
+ Executor executor =
+ executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
+
+ AtomicBoolean lock = new AtomicBoolean(true);
+ AtomicInteger counter = new AtomicInteger(0);
+
+ for (int i = 0; i < maxThreads; i++) {
+ executorService.submit(
+ new TestEventHandler(EventType.M_SERVER_SHUTDOWN, lock, counter));
+ }
+
+ int tries = 0;
+ while (counter.get() < maxThreads && tries < maxTries) {
+ System.out.println("Waiting for all event handlers to start...");
+ Thread.sleep(sleepInterval);
+ tries++;
+ }
+
+ assertEquals(maxThreads, counter.get());
+
+ synchronized (lock) {
+ lock.set(false);
+ lock.notifyAll();
+ }
+
+ while (counter.get() < (maxThreads * 2) && tries < maxTries) {
+ System.out.println("Waiting for all event handlers to finish...");
+ Thread.sleep(sleepInterval);
+ tries++;
+ }
+
+ assertEquals(maxThreads*2, counter.get());
+
+ // Add too many. Make sure they are queued. Make sure we don't get
+ // RejectedExecutionException.
+ for (int i = 0; i < maxThreads; i++) {
+ executorService.submit(
+ new TestEventHandler(EventType.M_SERVER_SHUTDOWN, lock, counter));
+ }
+
+ Thread.sleep(executor.keepAliveTimeInMillis * 2);
+ }
+
+ public static class TestEventHandler extends EventHandler {
+ private AtomicBoolean lock;
+ private AtomicInteger counter;
+
+ public TestEventHandler(EventType eventType, AtomicBoolean lock,
+ AtomicInteger counter) {
+ super(null, eventType);
+ this.lock = lock;
+ this.counter = counter;
+ }
+
+ @Override
+ public void process() throws IOException {
+ int num = counter.incrementAndGet();
+ System.out.println("Running process #" + num + ", thread=" + Thread.currentThread());
+ synchronized (lock) {
+ while (lock.get()) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ }
+ }
+ counter.incrementAndGet();
+ }
+ }
+}
\ No newline at end of file