diff --git a/CHANGES.txt b/CHANGES.txt index 50018135059..53326ca7f72 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -421,6 +421,7 @@ Release 0.91.0 - Unreleased (Brad Anderson) HBASE-4291 Improve display of regions in transition in UI to be more readable (todd) + HBASE-4281 Add facility to dump current state of all executors (todd) TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java index 958de8d5384..ad60ba903bc 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java @@ -229,4 +229,21 @@ public abstract class EventHandler implements Runnable, Comparable { public synchronized void setListener(EventHandlerListener listener) { this.listener = listener; } -} \ No newline at end of file + + @Override + public String toString() { + return "Event #" + getSeqid() + + " of type " + eventType + + " (" + getInformativeName() + ")"; + } + + /** + * Event implementations should override this method to provide an + * informative name about what event they are handling. For example, + * event-specific information such as which region or server is + * being processed should be included if possible. 
+ */ + public String getInformativeName() { + return this.getClass().toString(); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 06b0849f1fa..b3cb746137e 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -19,12 +19,19 @@ */ package org.apache.hadoop.hbase.executor; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.Writer; +import java.lang.management.ThreadInfo; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -33,7 +40,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener; import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -245,6 +255,14 @@ public class ExecutorService { return this.eventHandlerListeners.remove(type); } + public Map getAllExecutorStatuses() { + Map ret = Maps.newHashMap(); + for (Map.Entry e : executorMap.entrySet()) { + ret.put(e.getKey(), e.getValue().getStatus()); + } + return ret; + } + /** * Executor instance. 
*/ @@ -252,7 +270,7 @@ public class ExecutorService { // how long to retain excess threads final long keepAliveTimeInMillis = 1000; // the thread pool executor that services the requests - final ThreadPoolExecutor threadPoolExecutor; + final TrackingThreadPoolExecutor threadPoolExecutor; // work queue to use - unbounded queue final BlockingQueue q = new LinkedBlockingQueue(); private final String name; @@ -266,7 +284,8 @@ public class ExecutorService { this.name = name; this.eventHandlerListeners = eventHandlerListeners; // create the thread pool executor - this.threadPoolExecutor = new ThreadPoolExecutor(maxThreads, maxThreads, + this.threadPoolExecutor = new TrackingThreadPoolExecutor( + maxThreads, maxThreads, keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q); // name the threads for this threadpool ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); @@ -292,5 +311,133 @@ public class ExecutorService { public String toString() { return getClass().getSimpleName() + "-" + id + "-" + name; } + + public ExecutorStatus getStatus() { + List queuedEvents = Lists.newArrayList(); + for (Runnable r : q) { + if (!(r instanceof EventHandler)) { + LOG.warn("Non-EventHandler " + r + " queued in " + name); + continue; + } + queuedEvents.add((EventHandler)r); + } + + List running = Lists.newArrayList(); + for (Map.Entry e : + threadPoolExecutor.getRunningTasks().entrySet()) { + Runnable r = e.getValue(); + if (!(r instanceof EventHandler)) { + LOG.warn("Non-EventHandler " + r + " running in " + name); + continue; + } + running.add(new RunningEventStatus(e.getKey(), (EventHandler)r)); + } + + return new ExecutorStatus(this, queuedEvents, running); + } + } + + /** + * A subclass of ThreadPoolExecutor that keeps track of the Runnables that + * are executing at any given point in time. 
+ */ + static class TrackingThreadPoolExecutor extends ThreadPoolExecutor { + private ConcurrentMap running = Maps.newConcurrentMap(); + + public TrackingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + running.remove(Thread.currentThread()); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + Runnable oldPut = running.put(t, r); + assert oldPut == null : "inconsistency for thread " + t; + super.beforeExecute(t, r); + } + + /** + * @return a map of the threads currently running tasks + * inside this executor. Each key is an active thread, + * and the value is the task that is currently running. + * Note that this is not a stable snapshot of the map. + */ + public ConcurrentMap getRunningTasks() { + return running; + } + } + + /** + * A snapshot of the status of a particular executor. This includes + * the contents of the executor's pending queue, as well as the + * threads and events currently being processed. + * + * This is a consistent snapshot that is immutable once constructed. + */ + public static class ExecutorStatus { + final Executor executor; + final List queuedEvents; + final List running; + + ExecutorStatus(Executor executor, + List queuedEvents, + List running) { + this.executor = executor; + this.queuedEvents = queuedEvents; + this.running = running; + } + + /** + * Dump a textual representation of the executor's status + * to the given writer. 
+ * + * @param out the stream to write to + * @param indent a string prefix for each line, used for indentation + */ + public void dumpTo(Writer out, String indent) throws IOException { + out.write(indent + "Status for executor: " + executor + "\n"); + out.write(indent + "=======================================\n"); + out.write(indent + queuedEvents.size() + " events queued, " + + running.size() + " running\n"); + if (!queuedEvents.isEmpty()) { + out.write(indent + "Queued:\n"); + for (EventHandler e : queuedEvents) { + out.write(indent + " " + e + "\n"); + } + out.write("\n"); + } + if (!running.isEmpty()) { + out.write(indent + "Running:\n"); + for (RunningEventStatus stat : running) { + out.write(indent + " Running on thread '" + + stat.threadInfo.getThreadName() + + "': " + stat.event + "\n"); + out.write(ThreadMonitoring.formatThreadInfo( + stat.threadInfo, indent + " ")); + out.write("\n"); + } + } + out.flush(); + } + } + + /** + * The status of a particular event that is in the middle of being + * handled by an executor. 
+ */ + public static class RunningEventStatus { + final ThreadInfo threadInfo; + final EventHandler event; + + public RunningEventStatus(Thread t, EventHandler event) { + this.threadInfo = ThreadMonitoring.getThreadInfo(t); + this.event = event; + } } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index dd2d6f637c0..4754ff08150 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -71,6 +71,15 @@ public class ServerShutdownHandler extends EventHandler { LOG.warn(this.serverName + " is NOT in deadservers; it should be!"); } } + + @Override + public String getInformativeName() { + if (serverName != null) { + return this.getClass().getSimpleName() + " for " + serverName; + } else { + return super.getInformativeName(); + } + } /** * Before assign the ROOT region, ensure it haven't diff --git a/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadMonitoring.java b/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadMonitoring.java new file mode 100644 index 00000000000..b4af3338ba4 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadMonitoring.java @@ -0,0 +1,95 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitoring; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + +public abstract class ThreadMonitoring { + + private static final ThreadMXBean threadBean = + ManagementFactory.getThreadMXBean(); + private static final int STACK_DEPTH = 20; + + public static ThreadInfo getThreadInfo(Thread t) { + long tid = t.getId(); + return threadBean.getThreadInfo(tid, STACK_DEPTH); + } + + + /** + * Format the given ThreadInfo object as a String. + * @param indent a prefix for each line, used for nested indentation + */ + public static String formatThreadInfo(ThreadInfo threadInfo, String indent) { + StringBuilder sb = new StringBuilder(); + appendThreadInfo(sb, threadInfo, indent); + return sb.toString(); + } + + /** + * Print all of the thread's information and stack traces. 
+ * + * @param sb the builder to append the formatted thread information to + * + */ + public static void appendThreadInfo(StringBuilder sb, + ThreadInfo info, + String indent) { + boolean contention = threadBean.isThreadContentionMonitoringEnabled(); + + if (info == null) { + sb.append(indent).append("Inactive (perhaps exited while monitoring was done)\n"); + return; + } + String taskName = getTaskName(info.getThreadId(), info.getThreadName()); + sb.append(indent).append("Thread ").append(taskName).append(":\n"); + + Thread.State state = info.getThreadState(); + sb.append(indent).append(" State: ").append(state).append("\n"); + sb.append(indent).append(" Blocked count: ").append(info.getBlockedCount()).append("\n"); + sb.append(indent).append(" Waited count: ").append(info.getWaitedCount()).append("\n"); + if (contention) { + sb.append(indent).append(" Blocked time: " + info.getBlockedTime()).append("\n"); + sb.append(indent).append(" Waited time: " + info.getWaitedTime()).append("\n"); + } + if (state == Thread.State.WAITING) { + sb.append(indent).append(" Waiting on ").append(info.getLockName()).append("\n"); + } else if (state == Thread.State.BLOCKED) { + sb.append(indent).append(" Blocked on ").append(info.getLockName()).append("\n"); + sb.append(indent).append(" Blocked by ").append( + getTaskName(info.getLockOwnerId(), info.getLockOwnerName())).append("\n"); + } + sb.append(indent).append(" Stack:").append("\n"); + for (StackTraceElement frame: info.getStackTrace()) { + sb.append(indent).append(" ").append(frame.toString()).append("\n"); + } + } + + private static String getTaskName(long id, String name) { + if (name == null) { + return Long.toString(id); + } + return id + " (" + name + ")"; + } + + +} diff --git a/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java index 88e5aaa0bd1..68d76f2802c 100644 --- a/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java +++ 
b/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java @@ -19,9 +19,12 @@ */ package org.apache.hadoop.hbase.executor; +import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -32,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService.Executor; +import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.junit.Test; import static org.mockito.Mockito.*; @@ -82,6 +86,12 @@ public class TestExecutorService { assertEquals(maxThreads, counter.get()); assertEquals(maxThreads, pool.getPoolSize()); + ExecutorStatus status = executor.getStatus(); + assertTrue(status.queuedEvents.isEmpty()); + assertEquals(5, status.running.size()); + checkStatusDump(status); + + // Now interrupt the running Executor synchronized (lock) { lock.set(false); @@ -116,6 +126,15 @@ public class TestExecutorService { assertEquals(maxThreads, pool.getPoolSize()); } + private void checkStatusDump(ExecutorStatus status) throws IOException { + StringWriter sw = new StringWriter(); + status.dumpTo(sw, ""); + String dump = sw.toString(); + LOG.info("Got status dump:\n" + dump); + + assertTrue(dump.contains("Waiting on java.util.concurrent.atomic.AtomicBoolean")); + } + public static class TestEventHandler extends EventHandler { private AtomicBoolean lock; private AtomicInteger counter;