HBASE-27390 getClusterMetrics NullPointerException when ServerTask status null (#4853)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
parent
2874c07ca5
commit
47b12d75f8
|
@ -44,8 +44,8 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
|
|||
private boolean snapshot = false;
|
||||
private Map<String, Object> callInfoMap = new HashMap<>();
|
||||
|
||||
public MonitoredRPCHandlerImpl() {
|
||||
super(false);
|
||||
public MonitoredRPCHandlerImpl(String description) {
|
||||
super(false, description);
|
||||
// in this implementation, WAITING indicates that the handler is not
|
||||
// actively servicing an RPC call.
|
||||
setState(State.WAITING);
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.hbase.util.GsonUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
||||
import org.apache.hbase.thirdparty.com.google.gson.Gson;
|
||||
|
||||
|
@ -46,11 +47,13 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
|
||||
private static final Gson GSON = GsonUtil.createGson().create();
|
||||
|
||||
public MonitoredTaskImpl(boolean enableJournal) {
|
||||
public MonitoredTaskImpl(boolean enableJournal, String description) {
|
||||
startTime = EnvironmentEdgeManager.currentTime();
|
||||
statusTime = startTime;
|
||||
stateTime = startTime;
|
||||
warnTime = startTime;
|
||||
this.description = description;
|
||||
this.status = "status unset";
|
||||
if (enableJournal) {
|
||||
journal = new ConcurrentLinkedQueue<>();
|
||||
} else {
|
||||
|
@ -161,6 +164,7 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
|
||||
@Override
|
||||
public void setStatus(String status) {
|
||||
Preconditions.checkNotNull(status, "Status is null");
|
||||
this.status = status;
|
||||
statusTime = EnvironmentEdgeManager.currentTime();
|
||||
if (journal != null) {
|
||||
|
@ -175,6 +179,7 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
|
||||
@Override
|
||||
public void setDescription(String description) {
|
||||
Preconditions.checkNotNull(description, "Description is null");
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
|
|
|
@ -89,8 +89,7 @@ public class TaskMonitor {
|
|||
}
|
||||
|
||||
public synchronized MonitoredTask createStatus(String description, boolean enableJournal) {
|
||||
MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
|
||||
stat.setDescription(description);
|
||||
MonitoredTask stat = new MonitoredTaskImpl(enableJournal, description);
|
||||
MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
|
||||
new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
|
||||
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
|
||||
|
@ -102,8 +101,7 @@ public class TaskMonitor {
|
|||
}
|
||||
|
||||
public synchronized MonitoredRPCHandler createRPCStatus(String description) {
|
||||
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
|
||||
stat.setDescription(description);
|
||||
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(description);
|
||||
MonitoredRPCHandler proxy =
|
||||
(MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
|
||||
new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
|
||||
|
|
|
@ -81,7 +81,7 @@ public class TestCallRunner {
|
|||
|
||||
TraceUtil.trace(() -> {
|
||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||
cr.setStatus(new MonitoredRPCHandlerImpl());
|
||||
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
cr.run();
|
||||
}, testName.getMethodName());
|
||||
|
||||
|
@ -101,7 +101,7 @@ public class TestCallRunner {
|
|||
|
||||
TraceUtil.trace(() -> {
|
||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||
cr.setStatus(new MonitoredRPCHandlerImpl());
|
||||
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
cr.run();
|
||||
}, testName.getMethodName());
|
||||
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
||||
|
@ -116,7 +116,7 @@ public class TestCallRunner {
|
|||
|
||||
TraceUtil.trace(() -> {
|
||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||
cr.setStatus(new MonitoredRPCHandlerImpl());
|
||||
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
cr.drop();
|
||||
}, testName.getMethodName());
|
||||
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
||||
|
@ -142,7 +142,7 @@ public class TestCallRunner {
|
|||
|
||||
TraceUtil.trace(() -> {
|
||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||
cr.setStatus(new MonitoredRPCHandlerImpl());
|
||||
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
cr.drop();
|
||||
}, testName.getMethodName());
|
||||
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
||||
|
|
|
@ -113,7 +113,7 @@ public class TestFifoRpcScheduler {
|
|||
|
||||
for (int i = totalCallMethods; i > 0; i--) {
|
||||
CallRunner task = createMockTask();
|
||||
task.setStatus(new MonitoredRPCHandlerImpl());
|
||||
task.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
|
||||
if (!scheduler.dispatch(task)) {
|
||||
unableToDispatch++;
|
||||
|
|
|
@ -109,7 +109,7 @@ public class TestSimpleRpcScheduler {
|
|||
scheduler.init(CONTEXT);
|
||||
scheduler.start();
|
||||
CallRunner task = createMockTask();
|
||||
task.setStatus(new MonitoredRPCHandlerImpl());
|
||||
task.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
scheduler.dispatch(task);
|
||||
verify(task, timeout(10000)).run();
|
||||
scheduler.stop();
|
||||
|
@ -164,7 +164,7 @@ public class TestSimpleRpcScheduler {
|
|||
int totalCallMethods = 10;
|
||||
for (int i = totalCallMethods; i > 0; i--) {
|
||||
CallRunner task = createMockTask();
|
||||
task.setStatus(new MonitoredRPCHandlerImpl());
|
||||
task.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
scheduler.dispatch(task);
|
||||
}
|
||||
|
||||
|
@ -205,7 +205,7 @@ public class TestSimpleRpcScheduler {
|
|||
}
|
||||
};
|
||||
for (CallRunner task : tasks) {
|
||||
task.setStatus(new MonitoredRPCHandlerImpl());
|
||||
task.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
doAnswer(answerToRun).when(task).run();
|
||||
}
|
||||
|
||||
|
@ -513,7 +513,7 @@ public class TestSimpleRpcScheduler {
|
|||
|
||||
private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
|
||||
final int value, final int sleepInterval) {
|
||||
callTask.setStatus(new MonitoredRPCHandlerImpl());
|
||||
callTask.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||
doAnswer(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) {
|
||||
|
|
|
@ -63,6 +63,8 @@ public class TestTaskMonitor {
|
|||
assertEquals(task.getDescription(), taskFromTm.getDescription());
|
||||
assertEquals(-1, taskFromTm.getCompletionTimestamp());
|
||||
assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
|
||||
assertEquals(task.getStatus(), taskFromTm.getStatus());
|
||||
assertEquals("status unset", taskFromTm.getStatus());
|
||||
|
||||
// Mark it as finished
|
||||
task.markComplete("Finished!");
|
||||
|
@ -229,7 +231,7 @@ public class TestTaskMonitor {
|
|||
|
||||
@Test
|
||||
public void testClone() throws Exception {
|
||||
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl();
|
||||
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl("test");
|
||||
monitor.abort("abort RPC");
|
||||
TestParam testParam = new TestParam("param1");
|
||||
monitor.setRPC("method1", new Object[] { testParam }, 0);
|
||||
|
|
Loading…
Reference in New Issue