HBASE-14674 Rpc handler / task monitoring seems to be broken after 0.98 (Heng Chen)

This commit is contained in:
Enis Soztutar 2015-10-27 14:24:21 -07:00
parent dfa05284cf
commit d5d81d675a
6 changed files with 29 additions and 17 deletions

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -59,20 +58,22 @@ public class CallRunner {
this.rpcServer = rpcServer; this.rpcServer = rpcServer;
// Add size of the call to queue size. // Add size of the call to queue size.
this.rpcServer.addCallSize(call.getSize()); this.rpcServer.addCallSize(call.getSize());
this.status = getStatus();
} }
public Call getCall() { public Call getCall() {
return call; return call;
} }
public void setStatus(MonitoredRPCHandler status) {
this.status = status;
}
/** /**
* Cleanup after ourselves... let go of references. * Cleanup after ourselves... let go of references.
*/ */
private void cleanup() { private void cleanup() {
this.call = null; this.call = null;
this.rpcServer = null; this.rpcServer = null;
this.status = null;
} }
public void run() { public void run() {
@ -160,16 +161,4 @@ public class CallRunner {
cleanup(); cleanup();
} }
} }
MonitoredRPCHandler getStatus() {
// It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
if (status != null) {
return status;
}
status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
status.pause("Waiting for a call");
RpcServer.MONITORED_RPC.set(status);
return status;
}
} }

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ipc.CallRunner;
/** /**
* A very simple {@code }RpcScheduler} that serves incoming requests in order. * A very simple {@code }RpcScheduler} that serves incoming requests in order.
@ -70,6 +69,7 @@ public class FifoRpcScheduler extends RpcScheduler {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
task.setStatus(RpcServer.getStatus());
task.run(); task.run();
} }
}); });

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -124,7 +125,9 @@ public abstract class RpcExecutor {
try { try {
while (running) { while (running) {
try { try {
MonitoredRPCHandler status = RpcServer.getStatus();
CallRunner task = myQueue.take(); CallRunner task = myQueue.take();
task.setStatus(status);
try { try {
activeHandlerCount.incrementAndGet(); activeHandlerCount.incrementAndGet();
task.run(); task.run();

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@ -2314,7 +2315,8 @@ public class RpcServer implements RpcServerInterface {
* @param user client user * @param user client user
* @param connection incoming connection * @param connection incoming connection
* @param addr InetAddress of incoming connection * @param addr InetAddress of incoming connection
* @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol * @throws org.apache.hadoop.security.authorize.AuthorizationException
* when the client isn't authorized to talk the protocol
*/ */
public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr) public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
throws AuthorizationException { throws AuthorizationException {
@ -2498,6 +2500,18 @@ public class RpcServer implements RpcServerInterface {
return bsasi == null? null: bsasi.getBlockingService(); return bsasi == null? null: bsasi.getBlockingService();
} }
static MonitoredRPCHandler getStatus() {
// It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
if (status != null) {
return status;
}
status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
status.pause("Waiting for a call");
RpcServer.MONITORED_RPC.set(status);
return status;
}
/** Returns the remote side ip address when invoked inside an RPC /** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error. * Returns null incase of an error.
* @return InetAddress * @return InetAddress

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test; import org.junit.Test;
@ -35,6 +36,7 @@ public class TestCallRunner {
RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class); RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
mockCall.connection = Mockito.mock(RpcServer.Connection.class); mockCall.connection = Mockito.mock(RpcServer.Connection.class);
CallRunner cr = new CallRunner(mockRpcServer, mockCall); CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.run(); cr.run();
} }
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -87,6 +88,7 @@ public class TestSimpleRpcScheduler {
scheduler.init(CONTEXT); scheduler.init(CONTEXT);
scheduler.start(); scheduler.start();
CallRunner task = createMockTask(); CallRunner task = createMockTask();
task.setStatus(new MonitoredRPCHandlerImpl());
scheduler.dispatch(task); scheduler.dispatch(task);
verify(task, timeout(1000)).run(); verify(task, timeout(1000)).run();
scheduler.stop(); scheduler.stop();
@ -121,6 +123,7 @@ public class TestSimpleRpcScheduler {
} }
}; };
for (CallRunner task : tasks) { for (CallRunner task : tasks) {
task.setStatus(new MonitoredRPCHandlerImpl());
doAnswer(answerToRun).when(task).run(); doAnswer(answerToRun).when(task).run();
} }
@ -303,6 +306,7 @@ public class TestSimpleRpcScheduler {
private void doAnswerTaskExecution(final CallRunner callTask, private void doAnswerTaskExecution(final CallRunner callTask,
final ArrayList<Integer> results, final int value, final int sleepInterval) { final ArrayList<Integer> results, final int value, final int sleepInterval) {
callTask.setStatus(new MonitoredRPCHandlerImpl());
doAnswer(new Answer<Object>() { doAnswer(new Answer<Object>() {
@Override @Override
public Object answer(InvocationOnMock invocation) { public Object answer(InvocationOnMock invocation) {