From eaa40e323e868bd51b35e5ead0f1f9cc250495d8 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sat, 10 Sep 2011 18:44:45 +0000 Subject: [PATCH] HBASE-4057 Implement HBase version of "show processlist" git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1167578 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hbase/tmpl/common/TaskMonitorTmpl.jamon | 86 ++++-- .../hbase/tmpl/master/MasterStatusTmpl.jamon | 8 +- .../tmpl/regionserver/RSStatusTmpl.jamon | 8 +- .../apache/hadoop/hbase/ipc/HBaseServer.java | 17 +- .../apache/hadoop/hbase/ipc/RpcServer.java | 3 +- .../hadoop/hbase/ipc/WritableRpcEngine.java | 13 +- .../hbase/master/MasterStatusServlet.java | 10 +- .../hbase/monitoring/MonitoredRPCHandler.java | 44 +++ .../monitoring/MonitoredRPCHandlerImpl.java | 256 ++++++++++++++++++ .../hbase/monitoring/MonitoredTask.java | 37 ++- .../hbase/monitoring/MonitoredTaskImpl.java | 109 ++++++-- .../hadoop/hbase/monitoring/TaskMonitor.java | 26 +- .../hbase/regionserver/RSStatusServlet.java | 7 +- .../resources/hbase-webapps/static/hbase.css | 6 + .../hbase/monitoring/TestTaskMonitor.java | 4 +- 16 files changed, 569 insertions(+), 66 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java create mode 100644 src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java diff --git a/CHANGES.txt b/CHANGES.txt index ad484176adf..bcd323ade5d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -540,6 +540,7 @@ Release 0.91.0 - Unreleased (Riley Patterson) HBASE-4292 Add a debugging dump servlet to the master and regionserver (todd) + HBASE-4057 Implement HBase version of "show processlist" (Riley Patterson) Release 0.90.5 - Unreleased diff --git a/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon b/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon index f96d4a2339a..813c7b8b049 100644 --- a/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon +++ b/src/main/jamon/org/apache/hbase/tmpl/common/TaskMonitorTmpl.jamon @@ -20,40 +20,74 @@ limitations under the License. <%import> java.util.*; org.apache.hadoop.hbase.monitoring.*; +org.apache.hadoop.util.StringUtils; <%args> TaskMonitor taskMonitor = TaskMonitor.get(); +String filter = "general"; +String format = "html"; <%java> +List tasks = taskMonitor.getTasks(); +Iterator iter = tasks.iterator(); +// apply requested filter +while (iter.hasNext()) { + MonitoredTask t = iter.next(); + if (filter.equals("general")) { + if (t instanceof MonitoredRPCHandler) + iter.remove(); + } else if (filter.equals("handler")) { + if (!(t instanceof MonitoredRPCHandler)) + iter.remove(); + } else if (filter.equals("rpc")) { + if (!(t instanceof MonitoredRPCHandler) || + !((MonitoredRPCHandler) t).isRPCRunning()) + iter.remove(); + } else if (filter.equals("operation")) { + if (!(t instanceof MonitoredRPCHandler) || + !((MonitoredRPCHandler) t).isOperationRunning()) + iter.remove(); + } +} long now = System.currentTimeMillis(); -List tasks = taskMonitor.getTasks(); Collections.reverse(tasks); - +boolean first = true; -

Currently running tasks

- -<%if tasks.isEmpty()%> -No tasks currently running on this node. +<%if format.equals("json")%> +[<%for MonitoredTask task : tasks%><%if first%><%java>first = false;<%else>,<% task.toJSON() %>] <%else> +
+ Show All Monitored Tasks | + Show non-RPC Tasks | + Show All RPC Handler Tasks | + Show Active RPC Calls | + Show Client Operations | + View as JSON +
+

Recent tasks

+ <%if tasks.isEmpty()%> + No tasks currently running on this node. + <%else> + + + + + + + + <%for MonitoredTask task : tasks %> + + + + + + + +
Start TimeDescriptionStateStatus
<% new Date(task.getStartTime()) %><% task.getDescription() %><% task.getState() %> + (since <% StringUtils.formatTimeDiff(now, task.getStateTime()) %> ago) + <% task.getStatus() %> + (since <% StringUtils.formatTimeDiff(now, task.getStatusTime()) %> + ago)
- - - - - - -<%for MonitoredTask task : tasks %> - - - - - - - -
DescriptionStatusAge
<% task.getDescription() %><% task.getStatus() %><% (int)((now - task.getStartTime())/1000) %>s - <%if task.getCompletionTimestamp() != -1%> - (Completed <% (now - task.getCompletionTimestamp())/1000 %>s ago) -
- - \ No newline at end of file + diff --git a/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon b/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon index 38731bc7244..4cfe0557244 100644 --- a/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon +++ b/src/main/jamon/org/apache/hbase/tmpl/master/MasterStatusTmpl.jamon @@ -25,6 +25,8 @@ ServerName rootLocation = null; ServerName metaLocation = null; List servers = null; boolean showAppendWarning = false; +String filter = "general"; +String format = "html"; <%import> java.util.*; @@ -40,6 +42,10 @@ org.apache.hadoop.hbase.client.HBaseAdmin; org.apache.hadoop.hbase.client.HConnectionManager; org.apache.hadoop.hbase.HTableDescriptor; +<%if format.equals("json") %> + <& ../common/TaskMonitorTmpl; filter = filter; format = "json" &> + <%java return; %> + @@ -94,7 +100,7 @@ org.apache.hadoop.hbase.HTableDescriptor; Zookeeper Quorum<% master.getZooKeeperWatcher().getQuorum() %>Addresses of all registered ZK servers. For more, see zk dump. -<& ../common/TaskMonitorTmpl &> +<& ../common/TaskMonitorTmpl; filter = filter &> <%if (rootLocation != null) %> <& catalogTables &> diff --git a/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon index d974e91a278..be6fceb0fef 100644 --- a/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/src/main/jamon/org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -19,6 +19,8 @@ limitations under the License. <%args> HRegionServer regionServer; +String filter = "general"; +String format = "html"; <%import> java.util.*; @@ -33,6 +35,10 @@ org.apache.hadoop.hbase.HServerInfo; org.apache.hadoop.hbase.HServerLoad; org.apache.hadoop.hbase.HRegionInfo; +<%if format.equals("json") %> + <& ../common/TaskMonitorTmpl; filter = filter; format = "json" &> + <%java return; %> + <%java> HServerInfo serverInfo = null; try { @@ -73,7 +79,7 @@ org.apache.hadoop.hbase.HRegionInfo; Zookeeper Quorum<% regionServer.getZooKeeper().getQuorum() %>Addresses of all registered ZK servers -<& ../common/TaskMonitorTmpl &> +<& ../common/TaskMonitorTmpl; filter = filter &>

Online Regions

<%if (onlineRegions != null && onlineRegions.size() > 0) %> diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 2d8c978d46e..36b0560658f 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -59,6 +59,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -1029,6 +1031,10 @@ public abstract class HBaseServer implements RpcServer { return hostAddress; } + public int getRemotePort() { + return remotePort; + } + public void setLastContact(long lastContact) { this.lastContact = lastContact; } @@ -1176,6 +1182,7 @@ public abstract class HBaseServer implements RpcServer { /** Handles queued calls . */ private class Handler extends Thread { private final BlockingQueue myCallQueue; + private MonitoredRPCHandler status; public Handler(final BlockingQueue cq, int instanceNumber) { this.myCallQueue = cq; @@ -1187,15 +1194,21 @@ public abstract class HBaseServer implements RpcServer { threadName = "PRI " + threadName; } this.setName(threadName); + this.status = TaskMonitor.get().createRPCStatus(threadName); } @Override public void run() { LOG.info(getName() + ": starting"); + status.setStatus("starting"); SERVER.set(HBaseServer.this); while (running) { try { + status.pause("Waiting for a call"); Call call = myCallQueue.take(); // pop the queue; maybe blocked here + status.setStatus("Setting up call"); + status.setConnection(call.connection.getHostAddress(), + call.connection.getRemotePort()); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": has #" + call.id + " from " + @@ -1209,7 +1222,9 @@ public abstract class HBaseServer implements RpcServer { try { if (!started) throw new ServerNotRunningYetException("Server is not running yet"); - value = call(call.connection.protocol, call.param, call.timestamp); // make the call + // make the call + value = call(call.connection.protocol, call.param, call.timestamp, + status); } catch (Throwable e) { LOG.debug(getName()+", call "+call+": error: " + e, e); errorClass = e.getClass().getName(); diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 07aec23c6f7..692f589049a 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.ipc; import com.google.common.base.Function; import org.apache.hadoop.io.Writable; import org.apache.hadoop.hbase.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import java.io.IOException; import java.net.InetSocketAddress; @@ -48,7 +49,7 @@ public interface RpcServer { * @throws java.io.IOException e */ Writable call(Class protocol, - Writable param, long receiveTime) + Writable param, long receiveTime, MonitoredRPCHandler status) throws IOException; int getNumOpenConnections(); diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java b/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java index b618429aaff..60a9248f641 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java @@ -38,6 +38,7 @@ import org.apache.commons.logging.*; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Objects; @@ -316,7 +317,7 @@ class WritableRpcEngine implements RpcEngine { @Override public Writable call(Class protocol, - Writable param, long receivedTime) + Writable param, long receivedTime, MonitoredRPCHandler status) throws IOException { try { Invocation call = (Invocation)param; @@ -325,6 +326,9 @@ class WritableRpcEngine implements RpcEngine { "cause is a version mismatch between client and server."); } if (verbose) log("Call: " + call); + status.setRPC(call.getMethodName(), call.getParameters(), receivedTime); + status.setRPCPacket(param); + status.resume("Servicing call"); Method method = protocol.getMethod(call.getMethodName(), @@ -369,7 +373,8 @@ class WritableRpcEngine implements RpcEngine { // when tagging, we let TooLarge trump TooSmall to keep output simple // note that large responses will often also be slow. logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"), - startTime, processingTime, qTime, responseSize); + status.getClient(), startTime, processingTime, qTime, + responseSize); // provides a count of log-reported slow responses if (tooSlow) { rpcMetrics.rpcSlowResponseTime.inc(processingTime); @@ -407,13 +412,14 @@ class WritableRpcEngine implements RpcEngine { * client Operations. * @param call The call to log. * @param tag The tag that will be used to indicate this event in the log. + * @param client The address of the client who made this call. * @param startTime The time that the call was initiated, in ms. * @param processingTime The duration that the call took to run, in ms. * @param qTime The duration that the call spent on the queue * prior to being initiated, in ms. * @param responseSize The size in bytes of the response buffer. */ - private void logResponse(Invocation call, String tag, + private void logResponse(Invocation call, String tag, String clientAddress, long startTime, int processingTime, int qTime, long responseSize) throws IOException { Object params[] = call.getParameters(); @@ -425,6 +431,7 @@ class WritableRpcEngine implements RpcEngine { responseInfo.put("processingtimems", processingTime); responseInfo.put("queuetimems", qTime); responseInfo.put("responsesize", responseSize); + responseInfo.put("client", clientAddress); responseInfo.put("class", instance.getClass().getSimpleName()); responseInfo.put("method", call.getMethodName()); if (params.length == 2 && instance instanceof HRegionServer && diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java b/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java index b63dc2abe29..39943ef3a31 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java @@ -61,13 +61,17 @@ public class MasterStatusServlet extends HttpServlet { List servers = master.getServerManager().getOnlineServersList(); response.setContentType("text/html"); - new MasterStatusTmpl() + MasterStatusTmpl tmpl = new MasterStatusTmpl() .setFrags(frags) .setShowAppendWarning(shouldShowAppendWarning(conf)) .setRootLocation(rootLocation) .setMetaLocation(metaLocation) - .setServers(servers) - .render(response.getWriter(), + .setServers(servers); + if (request.getParameter("filter") != null) + tmpl.setFilter(request.getParameter("filter")); + if (request.getParameter("format") != null) + tmpl.setFormat(request.getParameter("format")); + tmpl.render(response.getWriter(), master, admin); } diff --git a/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java b/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java new file mode 100644 index 00000000000..d4f97144d87 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java @@ -0,0 +1,44 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitoring; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +/** + * A MonitoredTask implementation optimized for use with RPC Handlers + * handling frequent, short duration tasks. String concatenations and object + * allocations are avoided in methods that will be hit by every RPC call. + */ +public interface MonitoredRPCHandler extends MonitoredTask { + public abstract String getRPC(); + public abstract String getRPC(boolean withParams); + public abstract long getRPCPacketLength(); + public abstract String getClient(); + public abstract long getRPCStartTime(); + public abstract long getRPCQueueTime(); + public abstract boolean isRPCRunning(); + public abstract boolean isOperationRunning(); + + public abstract void setRPC(String methodName, Object [] params, + long queueTime); + public abstract void setRPCPacket(Writable param); + public abstract void setConnection(String clientAddress, int remotePort); +} diff --git a/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java b/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java new file mode 100644 index 00000000000..493dcdb0a3a --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java @@ -0,0 +1,256 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitoring; + +import org.apache.hadoop.hbase.client.Operation; +import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * A MonitoredTask implementation designed for use with RPC Handlers + * handling frequent, short duration tasks. String concatenations and object + * allocations are avoided in methods that will be hit by every RPC call. + */ +public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl + implements MonitoredRPCHandler { + private String clientAddress; + private int remotePort; + private long rpcQueueTime; + private long rpcStartTime; + private String methodName = ""; + private Object [] params = {}; + private Writable packet; + + public MonitoredRPCHandlerImpl() { + super(); + // in this implementation, WAITING indicates that the handler is not + // actively servicing an RPC call. + setState(State.WAITING); + } + + @Override + public synchronized MonitoredRPCHandlerImpl clone() { + return (MonitoredRPCHandlerImpl) super.clone(); + } + + /** + * Gets the status of this handler; if it is currently servicing an RPC, + * this status will include the RPC information. + * @return a String describing the current status. + */ + @Override + public String getStatus() { + if (getState() != State.RUNNING) { + return super.getStatus(); + } + return super.getStatus() + " from " + getClient() + ": " + getRPC(); + } + + /** + * Accesses the queue time for the currently running RPC on the + * monitored Handler. + * @return the queue timestamp or -1 if there is no RPC currently running. + */ + public long getRPCQueueTime() { + if (getState() != State.RUNNING) { + return -1; + } + return rpcQueueTime; + } + + /** + * Accesses the start time for the currently running RPC on the + * monitored Handler. + * @return the start timestamp or -1 if there is no RPC currently running. + */ + public long getRPCStartTime() { + if (getState() != State.RUNNING) { + return -1; + } + return rpcStartTime; + } + + /** + * Produces a string representation of the method currently being serviced + * by this Handler. + * @return a string representing the method call without parameters + */ + public String getRPC() { + return getRPC(false); + } + + /** + * Produces a string representation of the method currently being serviced + * by this Handler. + * @param withParams toggle inclusion of parameters in the RPC String + * @return A human-readable string representation of the method call. + */ + public synchronized String getRPC(boolean withParams) { + if (getState() != State.RUNNING) { + // no RPC is currently running + return ""; + } + StringBuilder buffer = new StringBuilder(256); + buffer.append(methodName); + if (withParams) { + buffer.append("("); + for (int i = 0; i < params.length; i++) { + if (i != 0) + buffer.append(", "); + buffer.append(params[i]); + } + buffer.append(")"); + } + return buffer.toString(); + } + + /** + * Produces a string representation of the method currently being serviced + * by this Handler. + * @return A human-readable string representation of the method call. + */ + public long getRPCPacketLength() { + if (getState() != State.RUNNING || packet == null) { + // no RPC is currently running, or we don't have an RPC's packet info + return -1L; + } + if (!(packet instanceof WritableWithSize)) { + // the packet passed to us doesn't expose size information + return -1L; + } + return ((WritableWithSize) packet).getWritableSize(); + } + + /** + * If an RPC call is currently running, produces a String representation of + * the connection from which it was received. + * @return A human-readable string representation of the address and port + * of the client. + */ + public String getClient() { + return clientAddress + ":" + remotePort; + } + + /** + * Indicates to the client whether this task is monitoring a currently active + * RPC call. + * @return true if the monitored handler is currently servicing an RPC call. + */ + public boolean isRPCRunning() { + return getState() == State.RUNNING; + } + + /** + * Indicates to the client whether this task is monitoring a currently active + * RPC call to a database command. (as defined by + * o.a.h.h.client.Operation) + * @return true if the monitored handler is currently servicing an RPC call + * to a database command. + */ + public boolean isOperationRunning() { + if(!isRPCRunning()) { + return false; + } + for(Object param : params) { + if (param instanceof Operation) { + return true; + } + } + return false; + } + + /** + * Tells this instance that it is monitoring a new RPC call. + * @param methodName The name of the method that will be called by the RPC. + * @param params The parameters that will be passed to the indicated method. + */ + public synchronized void setRPC(String methodName, Object [] params, + long queueTime) { + this.methodName = methodName; + this.params = params; + this.rpcStartTime = System.currentTimeMillis(); + this.rpcQueueTime = queueTime; + this.state = State.RUNNING; + } + + /** + * Gives this instance a reference to the Writable received by the RPC, so + * that it can later compute its size if asked for it. + * @param param The Writable received by the RPC for this call + */ + public void setRPCPacket(Writable param) { + this.packet = param; + } + + /** + * Registers current handler client details. + * @param clientAddress the address of the current client + * @param remotePort the port from which the client connected + */ + public void setConnection(String clientAddress, int remotePort) { + this.clientAddress = clientAddress; + this.remotePort = remotePort; + } + + public synchronized Map toMap() { + // only include RPC info if the Handler is actively servicing an RPC call + Map map = super.toMap(); + if (getState() != State.RUNNING) { + return map; + } + Map rpcJSON = new HashMap(); + ArrayList paramList = new ArrayList(); + map.put("rpcCall", rpcJSON); + rpcJSON.put("queuetimems", getRPCQueueTime()); + rpcJSON.put("starttimems", getRPCStartTime()); + rpcJSON.put("clientaddress", clientAddress); + rpcJSON.put("remoteport", remotePort); + rpcJSON.put("packetlength", getRPCPacketLength()); + rpcJSON.put("method", methodName); + rpcJSON.put("params", paramList); + for(Object param : params) { + if(param instanceof byte []) { + paramList.add(Bytes.toStringBinary((byte []) param)); + } else if (param instanceof Operation) { + paramList.add(((Operation) param).toMap()); + } else { + paramList.add(param.toString()); + } + } + return map; + } + + @Override + public String toString() { + if (getState() != State.RUNNING) { + return super.toString(); + } + return super.toString() + ", rpcMethod=" + getRPC(); + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java b/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java index ecbe4c2bcf3..9f5ab7a8d05 100644 --- a/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java +++ b/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java @@ -19,28 +19,32 @@ */ package org.apache.hadoop.hbase.monitoring; -public interface MonitoredTask { +import java.io.IOException; +import java.util.Map; + +public interface MonitoredTask extends Cloneable { enum State { RUNNING, + WAITING, COMPLETE, ABORTED; } public abstract long getStartTime(); - public abstract String getDescription(); - public abstract String getStatus(); - + public abstract long getStatusTime(); public abstract State getState(); - + public abstract long getStateTime(); public abstract long getCompletionTimestamp(); public abstract void markComplete(String msg); + public abstract void pause(String msg); + public abstract void resume(String msg); public abstract void abort(String msg); + public abstract void expireNow(); public abstract void setStatus(String status); - public abstract void setDescription(String description); /** @@ -49,5 +53,24 @@ public interface MonitoredTask { */ public abstract void cleanup(); + /** + * Public exposure of Object.clone() in order to allow clients to easily + * capture current state. + * @returns a copy of the object whose references will not change + */ + public abstract MonitoredTask clone(); -} \ No newline at end of file + /** + * Creates a string map of internal details for extensible exposure of + * monitored tasks. + * @return A Map containing information for this task. + */ + public abstract Map toMap() throws IOException; + + /** + * Creates a JSON object for parseable exposure of monitored tasks. + * @return An encoded JSON object containing information for this task. + */ + public abstract String toJSON() throws IOException; + +} diff --git a/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java b/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java index b0edfdce258..394129c8150 100644 --- a/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java +++ b/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java @@ -19,19 +19,35 @@ */ package org.apache.hadoop.hbase.monitoring; -import com.google.common.annotations.VisibleForTesting; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; class MonitoredTaskImpl implements MonitoredTask { private long startTime; - private long completionTimestamp = -1; + private long statusTime; + private long stateTime; - private String status; - private String description; + private volatile String status; + private volatile String description; - private State state = State.RUNNING; + protected volatile State state = State.RUNNING; public MonitoredTaskImpl() { startTime = System.currentTimeMillis(); + statusTime = startTime; + stateTime = startTime; + } + + @Override + public synchronized MonitoredTaskImpl clone() { + try { + return (MonitoredTaskImpl) super.clone(); + } catch (CloneNotSupportedException e) { + throw new AssertionError(); // Won't happen + } } @Override @@ -48,6 +64,11 @@ class MonitoredTaskImpl implements MonitoredTask { public String getStatus() { return status; } + + @Override + public long getStatusTime() { + return statusTime; + } @Override public State getState() { @@ -55,27 +76,51 @@ class MonitoredTaskImpl implements MonitoredTask { } @Override - public long getCompletionTimestamp() { - return completionTimestamp; + public long getStateTime() { + return stateTime; } + @Override + public long getCompletionTimestamp() { + if (state == State.COMPLETE || state == State.ABORTED) { + return stateTime; + } + return -1; + } + @Override public void markComplete(String status) { - state = State.COMPLETE; + setState(State.COMPLETE); setStatus(status); - completionTimestamp = System.currentTimeMillis(); + } + + @Override + public void pause(String msg) { + setState(State.WAITING); + setStatus(msg); + } + + @Override + public void resume(String msg) { + setState(State.RUNNING); + setStatus(msg); } @Override public void abort(String msg) { setStatus(msg); - state = State.ABORTED; - completionTimestamp = System.currentTimeMillis(); + setState(State.ABORTED); } @Override public void setStatus(String status) { this.status = status; + statusTime = System.currentTimeMillis(); + } + + protected void setState(State state) { + this.state = state; + stateTime = System.currentTimeMillis(); } @Override @@ -86,8 +131,7 @@ class MonitoredTaskImpl implements MonitoredTask { @Override public void cleanup() { if (state == State.RUNNING) { - state = State.ABORTED; - completionTimestamp = System.currentTimeMillis(); + setState(State.ABORTED); } } @@ -95,8 +139,41 @@ class MonitoredTaskImpl implements MonitoredTask { * Force the completion timestamp backwards so that * it expires now. */ - @VisibleForTesting - void expireNow() { - completionTimestamp -= 180 * 1000; + public void expireNow() { + stateTime -= 180 * 1000; } + + @Override + public Map toMap() { + Map map = new HashMap(); + map.put("description", getDescription()); + map.put("status", getStatus()); + map.put("state", getState()); + map.put("starttimems", getStartTime()); + map.put("statustimems", getCompletionTimestamp()); + map.put("statetimems", getCompletionTimestamp()); + return map; + } + + @Override + public String toJSON() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(toMap()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(512); + sb.append(getDescription()); + sb.append(": status="); + sb.append(getStatus()); + sb.append(", state="); + sb.append(getState()); + sb.append(", startTime="); + sb.append(getStartTime()); + sb.append(", completionTime="); + sb.append(getCompletionTimestamp()); + return sb.toString(); + } + } diff --git a/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java index d994e54f2d4..53a40c0f1d4 100644 --- a/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java +++ b/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java @@ -71,12 +71,23 @@ public class TaskMonitor { stat.getClass().getClassLoader(), new Class[] { MonitoredTask.class }, new PassthroughInvocationHandler(stat)); - TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); tasks.add(pair); return proxy; } - + + public MonitoredRPCHandler createRPCStatus(String description) { + MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(); + stat.setDescription(description); + MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance( + stat.getClass().getClassLoader(), + new Class[] { MonitoredRPCHandler.class }, + new PassthroughInvocationHandler(stat)); + TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); + tasks.add(pair); + return proxy; + } + private synchronized void purgeExpiredTasks() { int size = 0; @@ -107,11 +118,17 @@ public class TaskMonitor { } } + /** + * Produces a list containing copies of the current state of all non-expired + * MonitoredTasks handled by this TaskMonitor. + * @return A complete list of MonitoredTasks. + */ public synchronized List getTasks() { purgeExpiredTasks(); ArrayList ret = Lists.newArrayListWithCapacity(tasks.size()); for (TaskAndWeakRefPair pair : tasks) { - ret.add(pair.get()); + MonitoredTask t = pair.get(); + ret.add(t.clone()); } return ret; } @@ -181,7 +198,8 @@ public class TaskMonitor { } /** - * An InvocationHandler that simply passes through calls to the original object. + * An InvocationHandler that simply passes through calls to the original + * object. */ private static class PassthroughInvocationHandler implements InvocationHandler { private T delegatee; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java index e8c2b95325f..0f1fd0481ae 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RSStatusServlet.java @@ -40,7 +40,12 @@ public class RSStatusServlet extends HttpServlet { assert hrs != null : "No RS in context!"; resp.setContentType("text/html"); - new RSStatusTmpl().render(resp.getWriter(), hrs); + RSStatusTmpl tmpl = new RSStatusTmpl(); + if (req.getParameter("format") != null) + tmpl.setFormat(req.getParameter("format")); + if (req.getParameter("filter") != null) + tmpl.setFilter(req.getParameter("filter")); + tmpl.render(resp.getWriter(), hrs); } } diff --git a/src/main/resources/hbase-webapps/static/hbase.css b/src/main/resources/hbase-webapps/static/hbase.css index dadc0dc5719..fbed995fe2d 100644 --- a/src/main/resources/hbase-webapps/static/hbase.css +++ b/src/main/resources/hbase-webapps/static/hbase.css @@ -25,3 +25,9 @@ tr.task-monitor-COMPLETE td { tr.task-monitor-ABORTED td { background-color: #ffa; } + +tr.task-monitor-WAITING td { + background-color: #ccc; + font-style: italic; +} ++ diff --git a/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java index d0d767d08c8..3766898344f 100644 --- a/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java +++ b/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java @@ -44,13 +44,13 @@ public class TestTaskMonitor { // Mark it as finished task.markComplete("Finished!"); - assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState()); + assertEquals(MonitoredTask.State.COMPLETE, task.getState()); // It should still show up in the TaskMonitor list assertEquals(1, tm.getTasks().size()); // If we mark its completion time back a few minutes, it should get gced - ((MonitoredTaskImpl)taskFromTm).expireNow(); + task.expireNow(); assertEquals(0, tm.getTasks().size()); }