HBASE-4057 Implement HBase version of "show processlist"

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1167578 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-09-10 18:44:45 +00:00
parent 699e15b7a1
commit eaa40e323e
16 changed files with 569 additions and 66 deletions

View File

@ -540,6 +540,7 @@ Release 0.91.0 - Unreleased
(Riley Patterson) (Riley Patterson)
HBASE-4292 Add a debugging dump servlet to the master and regionserver HBASE-4292 Add a debugging dump servlet to the master and regionserver
(todd) (todd)
HBASE-4057 Implement HBase version of "show processlist" (Riley Patterson)
Release 0.90.5 - Unreleased Release 0.90.5 - Unreleased

View File

@ -20,40 +20,74 @@ limitations under the License.
<%import> <%import>
java.util.*; java.util.*;
org.apache.hadoop.hbase.monitoring.*; org.apache.hadoop.hbase.monitoring.*;
org.apache.hadoop.util.StringUtils;
</%import> </%import>
<%args> <%args>
TaskMonitor taskMonitor = TaskMonitor.get(); TaskMonitor taskMonitor = TaskMonitor.get();
String filter = "general";
String format = "html";
</%args> </%args>
<%java> <%java>
List<? extends MonitoredTask> tasks = taskMonitor.getTasks();
Iterator<? extends MonitoredTask> iter = tasks.iterator();
// apply requested filter
while (iter.hasNext()) {
MonitoredTask t = iter.next();
if (filter.equals("general")) {
if (t instanceof MonitoredRPCHandler)
iter.remove();
} else if (filter.equals("handler")) {
if (!(t instanceof MonitoredRPCHandler))
iter.remove();
} else if (filter.equals("rpc")) {
if (!(t instanceof MonitoredRPCHandler) ||
!((MonitoredRPCHandler) t).isRPCRunning())
iter.remove();
} else if (filter.equals("operation")) {
if (!(t instanceof MonitoredRPCHandler) ||
!((MonitoredRPCHandler) t).isOperationRunning())
iter.remove();
}
}
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
List<MonitoredTask> tasks = taskMonitor.getTasks();
Collections.reverse(tasks); Collections.reverse(tasks);
boolean first = true;
</%java> </%java>
<h2>Currently running tasks</h2> <%if format.equals("json")%>
[<%for MonitoredTask task : tasks%><%if first%><%java>first = false;</%java><%else>,</%if><% task.toJSON() %></%for>]
<%if tasks.isEmpty()%>
No tasks currently running on this node.
<%else> <%else>
<div style="float:right;">
<a href="?filter=all">Show All Monitored Tasks</a> |
<a href="?filter=general">Show non-RPC Tasks</a> |
<a href="?filter=handler">Show All RPC Handler Tasks</a> |
<a href="?filter=rpc">Show Active RPC Calls</a> |
<a href="?filter=operation">Show Client Operations</a> |
<a href="?format=json&filter=<% filter %>">View as JSON</a>
</div>
<h2>Recent tasks</h2>
<%if tasks.isEmpty()%>
No tasks currently running on this node.
<%else>
<table>
<tr>
<th>Start Time</th>
<th>Description</th>
<th>State</th>
<th>Status</th>
</tr>
<%for MonitoredTask task : tasks %>
<tr class="task-monitor-<% task.getState() %>">
<td><% new Date(task.getStartTime()) %></td>
<td><% task.getDescription() %></td>
<td><% task.getState() %>
(since <% StringUtils.formatTimeDiff(now, task.getStateTime()) %> ago)
</td>
<td><% task.getStatus() %>
(since <% StringUtils.formatTimeDiff(now, task.getStatusTime()) %>
ago)</td>
</tr>
</%for>
</table>
<table>
<tr>
<th>Description</th>
<th>Status</th>
<th>Age</th>
</tr>
<%for MonitoredTask task : tasks %>
<tr class="task-monitor-<% task.getState() %>">
<td><% task.getDescription() %></td>
</td>
<td><% task.getStatus() %></td>
<td><% (int)((now - task.getStartTime())/1000) %>s
<%if task.getCompletionTimestamp() != -1%>
(Completed <% (now - task.getCompletionTimestamp())/1000 %>s ago)
</%if> </%if>
</td>
</tr>
</%for>
</table>
</%if> </%if>

View File

@ -25,6 +25,8 @@ ServerName rootLocation = null;
ServerName metaLocation = null; ServerName metaLocation = null;
List<ServerName> servers = null; List<ServerName> servers = null;
boolean showAppendWarning = false; boolean showAppendWarning = false;
String filter = "general";
String format = "html";
</%args> </%args>
<%import> <%import>
java.util.*; java.util.*;
@ -40,6 +42,10 @@ org.apache.hadoop.hbase.client.HBaseAdmin;
org.apache.hadoop.hbase.client.HConnectionManager; org.apache.hadoop.hbase.client.HConnectionManager;
org.apache.hadoop.hbase.HTableDescriptor; org.apache.hadoop.hbase.HTableDescriptor;
</%import> </%import>
<%if format.equals("json") %>
<& ../common/TaskMonitorTmpl; filter = filter; format = "json" &>
<%java return; %>
</%if>
<?xml version="1.0" encoding="UTF-8" ?> <?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
@ -94,7 +100,7 @@ org.apache.hadoop.hbase.HTableDescriptor;
<tr><td>Zookeeper Quorum</td><td><% master.getZooKeeperWatcher().getQuorum() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr> <tr><td>Zookeeper Quorum</td><td><% master.getZooKeeperWatcher().getQuorum() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
</table> </table>
<& ../common/TaskMonitorTmpl &> <& ../common/TaskMonitorTmpl; filter = filter &>
<%if (rootLocation != null) %> <%if (rootLocation != null) %>
<& catalogTables &> <& catalogTables &>

View File

@ -19,6 +19,8 @@ limitations under the License.
</%doc> </%doc>
<%args> <%args>
HRegionServer regionServer; HRegionServer regionServer;
String filter = "general";
String format = "html";
</%args> </%args>
<%import> <%import>
java.util.*; java.util.*;
@ -33,6 +35,10 @@ org.apache.hadoop.hbase.HServerInfo;
org.apache.hadoop.hbase.HServerLoad; org.apache.hadoop.hbase.HServerLoad;
org.apache.hadoop.hbase.HRegionInfo; org.apache.hadoop.hbase.HRegionInfo;
</%import> </%import>
<%if format.equals("json") %>
<& ../common/TaskMonitorTmpl; filter = filter; format = "json" &>
<%java return; %>
</%if>
<%java> <%java>
HServerInfo serverInfo = null; HServerInfo serverInfo = null;
try { try {
@ -73,7 +79,7 @@ org.apache.hadoop.hbase.HRegionInfo;
<tr><td>Zookeeper Quorum</td><td><% regionServer.getZooKeeper().getQuorum() %></td><td>Addresses of all registered ZK servers</td></tr> <tr><td>Zookeeper Quorum</td><td><% regionServer.getZooKeeper().getQuorum() %></td><td>Addresses of all registered ZK servers</td></tr>
</table> </table>
<& ../common/TaskMonitorTmpl &> <& ../common/TaskMonitorTmpl; filter = filter &>
<h2>Online Regions</h2> <h2>Online Regions</h2>
<%if (onlineRegions != null && onlineRegions.size() > 0) %> <%if (onlineRegions != null && onlineRegions.size() > 0) %>

View File

@ -59,6 +59,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
@ -1029,6 +1031,10 @@ public abstract class HBaseServer implements RpcServer {
return hostAddress; return hostAddress;
} }
public int getRemotePort() {
return remotePort;
}
public void setLastContact(long lastContact) { public void setLastContact(long lastContact) {
this.lastContact = lastContact; this.lastContact = lastContact;
} }
@ -1176,6 +1182,7 @@ public abstract class HBaseServer implements RpcServer {
/** Handles queued calls . */ /** Handles queued calls . */
private class Handler extends Thread { private class Handler extends Thread {
private final BlockingQueue<Call> myCallQueue; private final BlockingQueue<Call> myCallQueue;
private MonitoredRPCHandler status;
public Handler(final BlockingQueue<Call> cq, int instanceNumber) { public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
this.myCallQueue = cq; this.myCallQueue = cq;
@ -1187,15 +1194,21 @@ public abstract class HBaseServer implements RpcServer {
threadName = "PRI " + threadName; threadName = "PRI " + threadName;
} }
this.setName(threadName); this.setName(threadName);
this.status = TaskMonitor.get().createRPCStatus(threadName);
} }
@Override @Override
public void run() { public void run() {
LOG.info(getName() + ": starting"); LOG.info(getName() + ": starting");
status.setStatus("starting");
SERVER.set(HBaseServer.this); SERVER.set(HBaseServer.this);
while (running) { while (running) {
try { try {
status.pause("Waiting for a call");
Call call = myCallQueue.take(); // pop the queue; maybe blocked here Call call = myCallQueue.take(); // pop the queue; maybe blocked here
status.setStatus("Setting up call");
status.setConnection(call.connection.getHostAddress(),
call.connection.getRemotePort());
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " + LOG.debug(getName() + ": has #" + call.id + " from " +
@ -1209,7 +1222,9 @@ public abstract class HBaseServer implements RpcServer {
try { try {
if (!started) if (!started)
throw new ServerNotRunningYetException("Server is not running yet"); throw new ServerNotRunningYetException("Server is not running yet");
value = call(call.connection.protocol, call.param, call.timestamp); // make the call // make the call
value = call(call.connection.protocol, call.param, call.timestamp,
status);
} catch (Throwable e) { } catch (Throwable e) {
LOG.debug(getName()+", call "+call+": error: " + e, e); LOG.debug(getName()+", call "+call+": error: " + e, e);
errorClass = e.getClass().getName(); errorClass = e.getClass().getName();

View File

@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Function; import com.google.common.base.Function;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -48,7 +49,7 @@ public interface RpcServer {
* @throws java.io.IOException e * @throws java.io.IOException e
*/ */
Writable call(Class<? extends VersionedProtocol> protocol, Writable call(Class<? extends VersionedProtocol> protocol,
Writable param, long receiveTime) Writable param, long receiveTime, MonitoredRPCHandler status)
throws IOException; throws IOException;
int getNumOpenConnections(); int getNumOpenConnections();

View File

@ -38,6 +38,7 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Objects; import org.apache.hadoop.hbase.util.Objects;
@ -316,7 +317,7 @@ class WritableRpcEngine implements RpcEngine {
@Override @Override
public Writable call(Class<? extends VersionedProtocol> protocol, public Writable call(Class<? extends VersionedProtocol> protocol,
Writable param, long receivedTime) Writable param, long receivedTime, MonitoredRPCHandler status)
throws IOException { throws IOException {
try { try {
Invocation call = (Invocation)param; Invocation call = (Invocation)param;
@ -325,6 +326,9 @@ class WritableRpcEngine implements RpcEngine {
"cause is a version mismatch between client and server."); "cause is a version mismatch between client and server.");
} }
if (verbose) log("Call: " + call); if (verbose) log("Call: " + call);
status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
status.setRPCPacket(param);
status.resume("Servicing call");
Method method = Method method =
protocol.getMethod(call.getMethodName(), protocol.getMethod(call.getMethodName(),
@ -369,7 +373,8 @@ class WritableRpcEngine implements RpcEngine {
// when tagging, we let TooLarge trump TooSmall to keep output simple // when tagging, we let TooLarge trump TooSmall to keep output simple
// note that large responses will often also be slow. // note that large responses will often also be slow.
logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"), logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"),
startTime, processingTime, qTime, responseSize); status.getClient(), startTime, processingTime, qTime,
responseSize);
// provides a count of log-reported slow responses // provides a count of log-reported slow responses
if (tooSlow) { if (tooSlow) {
rpcMetrics.rpcSlowResponseTime.inc(processingTime); rpcMetrics.rpcSlowResponseTime.inc(processingTime);
@ -407,13 +412,14 @@ class WritableRpcEngine implements RpcEngine {
* client Operations. * client Operations.
* @param call The call to log. * @param call The call to log.
* @param tag The tag that will be used to indicate this event in the log. * @param tag The tag that will be used to indicate this event in the log.
* @param client The address of the client who made this call.
* @param startTime The time that the call was initiated, in ms. * @param startTime The time that the call was initiated, in ms.
* @param processingTime The duration that the call took to run, in ms. * @param processingTime The duration that the call took to run, in ms.
* @param qTime The duration that the call spent on the queue * @param qTime The duration that the call spent on the queue
* prior to being initiated, in ms. * prior to being initiated, in ms.
* @param responseSize The size in bytes of the response buffer. * @param responseSize The size in bytes of the response buffer.
*/ */
private void logResponse(Invocation call, String tag, private void logResponse(Invocation call, String tag, String clientAddress,
long startTime, int processingTime, int qTime, long responseSize) long startTime, int processingTime, int qTime, long responseSize)
throws IOException { throws IOException {
Object params[] = call.getParameters(); Object params[] = call.getParameters();
@ -425,6 +431,7 @@ class WritableRpcEngine implements RpcEngine {
responseInfo.put("processingtimems", processingTime); responseInfo.put("processingtimems", processingTime);
responseInfo.put("queuetimems", qTime); responseInfo.put("queuetimems", qTime);
responseInfo.put("responsesize", responseSize); responseInfo.put("responsesize", responseSize);
responseInfo.put("client", clientAddress);
responseInfo.put("class", instance.getClass().getSimpleName()); responseInfo.put("class", instance.getClass().getSimpleName());
responseInfo.put("method", call.getMethodName()); responseInfo.put("method", call.getMethodName());
if (params.length == 2 && instance instanceof HRegionServer && if (params.length == 2 && instance instanceof HRegionServer &&

View File

@ -61,13 +61,17 @@ public class MasterStatusServlet extends HttpServlet {
List<ServerName> servers = master.getServerManager().getOnlineServersList(); List<ServerName> servers = master.getServerManager().getOnlineServersList();
response.setContentType("text/html"); response.setContentType("text/html");
new MasterStatusTmpl() MasterStatusTmpl tmpl = new MasterStatusTmpl()
.setFrags(frags) .setFrags(frags)
.setShowAppendWarning(shouldShowAppendWarning(conf)) .setShowAppendWarning(shouldShowAppendWarning(conf))
.setRootLocation(rootLocation) .setRootLocation(rootLocation)
.setMetaLocation(metaLocation) .setMetaLocation(metaLocation)
.setServers(servers) .setServers(servers);
.render(response.getWriter(), if (request.getParameter("filter") != null)
tmpl.setFilter(request.getParameter("filter"));
if (request.getParameter("format") != null)
tmpl.setFormat(request.getParameter("format"));
tmpl.render(response.getWriter(),
master, admin); master, admin);
} }

View File

@ -0,0 +1,44 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.monitoring;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
/**
* A MonitoredTask implementation optimized for use with RPC Handlers
* handling frequent, short duration tasks. String concatenations and object
* allocations are avoided in methods that will be hit by every RPC call.
*/
public interface MonitoredRPCHandler extends MonitoredTask {
public abstract String getRPC();
public abstract String getRPC(boolean withParams);
public abstract long getRPCPacketLength();
public abstract String getClient();
public abstract long getRPCStartTime();
public abstract long getRPCQueueTime();
public abstract boolean isRPCRunning();
public abstract boolean isOperationRunning();
public abstract void setRPC(String methodName, Object [] params,
long queueTime);
public abstract void setRPCPacket(Writable param);
public abstract void setConnection(String clientAddress, int remotePort);
}

View File

@ -0,0 +1,256 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.monitoring;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* A MonitoredTask implementation designed for use with RPC Handlers
* handling frequent, short duration tasks. String concatenations and object
* allocations are avoided in methods that will be hit by every RPC call.
*/
public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
implements MonitoredRPCHandler {
private String clientAddress;
private int remotePort;
private long rpcQueueTime;
private long rpcStartTime;
private String methodName = "";
private Object [] params = {};
private Writable packet;
public MonitoredRPCHandlerImpl() {
super();
// in this implementation, WAITING indicates that the handler is not
// actively servicing an RPC call.
setState(State.WAITING);
}
@Override
public synchronized MonitoredRPCHandlerImpl clone() {
return (MonitoredRPCHandlerImpl) super.clone();
}
/**
* Gets the status of this handler; if it is currently servicing an RPC,
* this status will include the RPC information.
* @return a String describing the current status.
*/
@Override
public String getStatus() {
if (getState() != State.RUNNING) {
return super.getStatus();
}
return super.getStatus() + " from " + getClient() + ": " + getRPC();
}
/**
* Accesses the queue time for the currently running RPC on the
* monitored Handler.
* @return the queue timestamp or -1 if there is no RPC currently running.
*/
public long getRPCQueueTime() {
if (getState() != State.RUNNING) {
return -1;
}
return rpcQueueTime;
}
/**
* Accesses the start time for the currently running RPC on the
* monitored Handler.
* @return the start timestamp or -1 if there is no RPC currently running.
*/
public long getRPCStartTime() {
if (getState() != State.RUNNING) {
return -1;
}
return rpcStartTime;
}
/**
* Produces a string representation of the method currently being serviced
* by this Handler.
* @return a string representing the method call without parameters
*/
public String getRPC() {
return getRPC(false);
}
/**
* Produces a string representation of the method currently being serviced
* by this Handler.
* @param withParams toggle inclusion of parameters in the RPC String
* @return A human-readable string representation of the method call.
*/
public synchronized String getRPC(boolean withParams) {
if (getState() != State.RUNNING) {
// no RPC is currently running
return "";
}
StringBuilder buffer = new StringBuilder(256);
buffer.append(methodName);
if (withParams) {
buffer.append("(");
for (int i = 0; i < params.length; i++) {
if (i != 0)
buffer.append(", ");
buffer.append(params[i]);
}
buffer.append(")");
}
return buffer.toString();
}
/**
* Produces a string representation of the method currently being serviced
* by this Handler.
* @return A human-readable string representation of the method call.
*/
public long getRPCPacketLength() {
if (getState() != State.RUNNING || packet == null) {
// no RPC is currently running, or we don't have an RPC's packet info
return -1L;
}
if (!(packet instanceof WritableWithSize)) {
// the packet passed to us doesn't expose size information
return -1L;
}
return ((WritableWithSize) packet).getWritableSize();
}
/**
* If an RPC call is currently running, produces a String representation of
* the connection from which it was received.
* @return A human-readable string representation of the address and port
* of the client.
*/
public String getClient() {
return clientAddress + ":" + remotePort;
}
/**
* Indicates to the client whether this task is monitoring a currently active
* RPC call.
* @return true if the monitored handler is currently servicing an RPC call.
*/
public boolean isRPCRunning() {
return getState() == State.RUNNING;
}
/**
* Indicates to the client whether this task is monitoring a currently active
* RPC call to a database command. (as defined by
* o.a.h.h.client.Operation)
* @return true if the monitored handler is currently servicing an RPC call
* to a database command.
*/
public boolean isOperationRunning() {
if(!isRPCRunning()) {
return false;
}
for(Object param : params) {
if (param instanceof Operation) {
return true;
}
}
return false;
}
/**
* Tells this instance that it is monitoring a new RPC call.
* @param methodName The name of the method that will be called by the RPC.
* @param params The parameters that will be passed to the indicated method.
*/
public synchronized void setRPC(String methodName, Object [] params,
long queueTime) {
this.methodName = methodName;
this.params = params;
this.rpcStartTime = System.currentTimeMillis();
this.rpcQueueTime = queueTime;
this.state = State.RUNNING;
}
/**
* Gives this instance a reference to the Writable received by the RPC, so
* that it can later compute its size if asked for it.
* @param param The Writable received by the RPC for this call
*/
public void setRPCPacket(Writable param) {
this.packet = param;
}
/**
* Registers current handler client details.
* @param clientAddress the address of the current client
* @param remotePort the port from which the client connected
*/
public void setConnection(String clientAddress, int remotePort) {
this.clientAddress = clientAddress;
this.remotePort = remotePort;
}
public synchronized Map<String, Object> toMap() {
// only include RPC info if the Handler is actively servicing an RPC call
Map<String, Object> map = super.toMap();
if (getState() != State.RUNNING) {
return map;
}
Map<String, Object> rpcJSON = new HashMap<String, Object>();
ArrayList paramList = new ArrayList();
map.put("rpcCall", rpcJSON);
rpcJSON.put("queuetimems", getRPCQueueTime());
rpcJSON.put("starttimems", getRPCStartTime());
rpcJSON.put("clientaddress", clientAddress);
rpcJSON.put("remoteport", remotePort);
rpcJSON.put("packetlength", getRPCPacketLength());
rpcJSON.put("method", methodName);
rpcJSON.put("params", paramList);
for(Object param : params) {
if(param instanceof byte []) {
paramList.add(Bytes.toStringBinary((byte []) param));
} else if (param instanceof Operation) {
paramList.add(((Operation) param).toMap());
} else {
paramList.add(param.toString());
}
}
return map;
}
@Override
public String toString() {
if (getState() != State.RUNNING) {
return super.toString();
}
return super.toString() + ", rpcMethod=" + getRPC();
}
}

View File

@ -19,28 +19,32 @@
*/ */
package org.apache.hadoop.hbase.monitoring; package org.apache.hadoop.hbase.monitoring;
public interface MonitoredTask { import java.io.IOException;
import java.util.Map;
public interface MonitoredTask extends Cloneable {
enum State { enum State {
RUNNING, RUNNING,
WAITING,
COMPLETE, COMPLETE,
ABORTED; ABORTED;
} }
public abstract long getStartTime(); public abstract long getStartTime();
public abstract String getDescription(); public abstract String getDescription();
public abstract String getStatus(); public abstract String getStatus();
public abstract long getStatusTime();
public abstract State getState(); public abstract State getState();
public abstract long getStateTime();
public abstract long getCompletionTimestamp(); public abstract long getCompletionTimestamp();
public abstract void markComplete(String msg); public abstract void markComplete(String msg);
public abstract void pause(String msg);
public abstract void resume(String msg);
public abstract void abort(String msg); public abstract void abort(String msg);
public abstract void expireNow();
public abstract void setStatus(String status); public abstract void setStatus(String status);
public abstract void setDescription(String description); public abstract void setDescription(String description);
/** /**
@ -49,5 +53,24 @@ public interface MonitoredTask {
*/ */
public abstract void cleanup(); public abstract void cleanup();
/**
* Public exposure of Object.clone() in order to allow clients to easily
* capture current state.
* @returns a copy of the object whose references will not change
*/
public abstract MonitoredTask clone();
/**
* Creates a string map of internal details for extensible exposure of
* monitored tasks.
* @return A Map containing information for this task.
*/
public abstract Map<String, Object> toMap() throws IOException;
/**
* Creates a JSON object for parseable exposure of monitored tasks.
* @return An encoded JSON object containing information for this task.
*/
public abstract String toJSON() throws IOException;
} }

View File

@ -19,19 +19,35 @@
*/ */
package org.apache.hadoop.hbase.monitoring; package org.apache.hadoop.hbase.monitoring;
import com.google.common.annotations.VisibleForTesting; import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
class MonitoredTaskImpl implements MonitoredTask { class MonitoredTaskImpl implements MonitoredTask {
private long startTime; private long startTime;
private long completionTimestamp = -1; private long statusTime;
private long stateTime;
private String status; private volatile String status;
private String description; private volatile String description;
private State state = State.RUNNING; protected volatile State state = State.RUNNING;
public MonitoredTaskImpl() { public MonitoredTaskImpl() {
startTime = System.currentTimeMillis(); startTime = System.currentTimeMillis();
statusTime = startTime;
stateTime = startTime;
}
@Override
public synchronized MonitoredTaskImpl clone() {
try {
return (MonitoredTaskImpl) super.clone();
} catch (CloneNotSupportedException e) {
throw new AssertionError(); // Won't happen
}
} }
@Override @Override
@ -49,33 +65,62 @@ class MonitoredTaskImpl implements MonitoredTask {
return status; return status;
} }
@Override
public long getStatusTime() {
return statusTime;
}
@Override @Override
public State getState() { public State getState() {
return state; return state;
} }
@Override
public long getStateTime() {
return stateTime;
}
@Override @Override
public long getCompletionTimestamp() { public long getCompletionTimestamp() {
return completionTimestamp; if (state == State.COMPLETE || state == State.ABORTED) {
return stateTime;
}
return -1;
} }
@Override @Override
public void markComplete(String status) { public void markComplete(String status) {
state = State.COMPLETE; setState(State.COMPLETE);
setStatus(status); setStatus(status);
completionTimestamp = System.currentTimeMillis(); }
@Override
public void pause(String msg) {
setState(State.WAITING);
setStatus(msg);
}
@Override
public void resume(String msg) {
setState(State.RUNNING);
setStatus(msg);
} }
@Override @Override
public void abort(String msg) { public void abort(String msg) {
setStatus(msg); setStatus(msg);
state = State.ABORTED; setState(State.ABORTED);
completionTimestamp = System.currentTimeMillis();
} }
@Override @Override
public void setStatus(String status) { public void setStatus(String status) {
this.status = status; this.status = status;
statusTime = System.currentTimeMillis();
}
protected void setState(State state) {
this.state = state;
stateTime = System.currentTimeMillis();
} }
@Override @Override
@ -86,8 +131,7 @@ class MonitoredTaskImpl implements MonitoredTask {
@Override @Override
public void cleanup() { public void cleanup() {
if (state == State.RUNNING) { if (state == State.RUNNING) {
state = State.ABORTED; setState(State.ABORTED);
completionTimestamp = System.currentTimeMillis();
} }
} }
@ -95,8 +139,41 @@ class MonitoredTaskImpl implements MonitoredTask {
* Force the completion timestamp backwards so that * Force the completion timestamp backwards so that
* it expires now. * it expires now.
*/ */
@VisibleForTesting public void expireNow() {
void expireNow() { stateTime -= 180 * 1000;
completionTimestamp -= 180 * 1000;
} }
@Override
public Map<String, Object> toMap() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("description", getDescription());
map.put("status", getStatus());
map.put("state", getState());
map.put("starttimems", getStartTime());
map.put("statustimems", getCompletionTimestamp());
map.put("statetimems", getCompletionTimestamp());
return map;
}
@Override
public String toJSON() throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(toMap());
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(512);
sb.append(getDescription());
sb.append(": status=");
sb.append(getStatus());
sb.append(", state=");
sb.append(getState());
sb.append(", startTime=");
sb.append(getStartTime());
sb.append(", completionTime=");
sb.append(getCompletionTimestamp());
return sb.toString();
}
} }

View File

@ -71,7 +71,18 @@ public class TaskMonitor {
stat.getClass().getClassLoader(), stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredTask.class }, new Class<?>[] { MonitoredTask.class },
new PassthroughInvocationHandler<MonitoredTask>(stat)); new PassthroughInvocationHandler<MonitoredTask>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
tasks.add(pair);
return proxy;
}
public MonitoredRPCHandler createRPCStatus(String description) {
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
stat.setDescription(description);
MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance(
stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredRPCHandler.class },
new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy); TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
tasks.add(pair); tasks.add(pair);
return proxy; return proxy;
@ -107,11 +118,17 @@ public class TaskMonitor {
} }
} }
/**
* Produces a list containing copies of the current state of all non-expired
* MonitoredTasks handled by this TaskMonitor.
* @return A complete list of MonitoredTasks.
*/
public synchronized List<MonitoredTask> getTasks() { public synchronized List<MonitoredTask> getTasks() {
purgeExpiredTasks(); purgeExpiredTasks();
ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size()); ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
for (TaskAndWeakRefPair pair : tasks) { for (TaskAndWeakRefPair pair : tasks) {
ret.add(pair.get()); MonitoredTask t = pair.get();
ret.add(t.clone());
} }
return ret; return ret;
} }
@ -181,7 +198,8 @@ public class TaskMonitor {
} }
/** /**
* An InvocationHandler that simply passes through calls to the original object. * An InvocationHandler that simply passes through calls to the original
* object.
*/ */
private static class PassthroughInvocationHandler<T> implements InvocationHandler { private static class PassthroughInvocationHandler<T> implements InvocationHandler {
private T delegatee; private T delegatee;

View File

@ -40,7 +40,12 @@ public class RSStatusServlet extends HttpServlet {
assert hrs != null : "No RS in context!"; assert hrs != null : "No RS in context!";
resp.setContentType("text/html"); resp.setContentType("text/html");
new RSStatusTmpl().render(resp.getWriter(), hrs); RSStatusTmpl tmpl = new RSStatusTmpl();
if (req.getParameter("format") != null)
tmpl.setFormat(req.getParameter("format"));
if (req.getParameter("filter") != null)
tmpl.setFilter(req.getParameter("filter"));
tmpl.render(resp.getWriter(), hrs);
} }
} }

View File

@ -25,3 +25,9 @@ tr.task-monitor-COMPLETE td {
tr.task-monitor-ABORTED td { tr.task-monitor-ABORTED td {
background-color: #ffa; background-color: #ffa;
} }
tr.task-monitor-WAITING td {
background-color: #ccc;
font-style: italic;
}
+

View File

@ -44,13 +44,13 @@ public class TestTaskMonitor {
// Mark it as finished // Mark it as finished
task.markComplete("Finished!"); task.markComplete("Finished!");
assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState()); assertEquals(MonitoredTask.State.COMPLETE, task.getState());
// It should still show up in the TaskMonitor list // It should still show up in the TaskMonitor list
assertEquals(1, tm.getTasks().size()); assertEquals(1, tm.getTasks().size());
// If we mark its completion time back a few minutes, it should get gced // If we mark its completion time back a few minutes, it should get gced
((MonitoredTaskImpl)taskFromTm).expireNow(); task.expireNow();
assertEquals(0, tm.getTasks().size()); assertEquals(0, tm.getTasks().size());
} }