HBASE-27390 getClusterMetrics NullPointerException when ServerTask status null (#4853)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
parent
96e88144de
commit
105a2749f4
|
@ -44,8 +44,8 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
|
||||||
private boolean snapshot = false;
|
private boolean snapshot = false;
|
||||||
private Map<String, Object> callInfoMap = new HashMap<>();
|
private Map<String, Object> callInfoMap = new HashMap<>();
|
||||||
|
|
||||||
public MonitoredRPCHandlerImpl() {
|
public MonitoredRPCHandlerImpl(String description) {
|
||||||
super(false);
|
super(false, description);
|
||||||
// in this implementation, WAITING indicates that the handler is not
|
// in this implementation, WAITING indicates that the handler is not
|
||||||
// actively servicing an RPC call.
|
// actively servicing an RPC call.
|
||||||
setState(State.WAITING);
|
setState(State.WAITING);
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.GsonUtil;
|
import org.apache.hadoop.hbase.util.GsonUtil;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
||||||
import org.apache.hbase.thirdparty.com.google.gson.Gson;
|
import org.apache.hbase.thirdparty.com.google.gson.Gson;
|
||||||
|
|
||||||
|
@ -46,11 +47,13 @@ class MonitoredTaskImpl implements MonitoredTask {
|
||||||
|
|
||||||
private static final Gson GSON = GsonUtil.createGson().create();
|
private static final Gson GSON = GsonUtil.createGson().create();
|
||||||
|
|
||||||
public MonitoredTaskImpl(boolean enableJournal) {
|
public MonitoredTaskImpl(boolean enableJournal, String description) {
|
||||||
startTime = EnvironmentEdgeManager.currentTime();
|
startTime = EnvironmentEdgeManager.currentTime();
|
||||||
statusTime = startTime;
|
statusTime = startTime;
|
||||||
stateTime = startTime;
|
stateTime = startTime;
|
||||||
warnTime = startTime;
|
warnTime = startTime;
|
||||||
|
this.description = description;
|
||||||
|
this.status = "status unset";
|
||||||
if (enableJournal) {
|
if (enableJournal) {
|
||||||
journal = new ConcurrentLinkedQueue<>();
|
journal = new ConcurrentLinkedQueue<>();
|
||||||
} else {
|
} else {
|
||||||
|
@ -161,6 +164,7 @@ class MonitoredTaskImpl implements MonitoredTask {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setStatus(String status) {
|
public void setStatus(String status) {
|
||||||
|
Preconditions.checkNotNull(status, "Status is null");
|
||||||
this.status = status;
|
this.status = status;
|
||||||
statusTime = EnvironmentEdgeManager.currentTime();
|
statusTime = EnvironmentEdgeManager.currentTime();
|
||||||
if (journal != null) {
|
if (journal != null) {
|
||||||
|
@ -175,6 +179,7 @@ class MonitoredTaskImpl implements MonitoredTask {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setDescription(String description) {
|
public void setDescription(String description) {
|
||||||
|
Preconditions.checkNotNull(description, "Description is null");
|
||||||
this.description = description;
|
this.description = description;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -94,8 +94,7 @@ public class TaskMonitor {
|
||||||
|
|
||||||
public synchronized MonitoredTask createStatus(String description, boolean ignore,
|
public synchronized MonitoredTask createStatus(String description, boolean ignore,
|
||||||
boolean enableJournal) {
|
boolean enableJournal) {
|
||||||
MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
|
MonitoredTask stat = new MonitoredTaskImpl(enableJournal, description);
|
||||||
stat.setDescription(description);
|
|
||||||
MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
|
MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
|
||||||
new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
|
new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
|
||||||
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
|
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
|
||||||
|
@ -109,8 +108,7 @@ public class TaskMonitor {
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized MonitoredRPCHandler createRPCStatus(String description) {
|
public synchronized MonitoredRPCHandler createRPCStatus(String description) {
|
||||||
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
|
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(description);
|
||||||
stat.setDescription(description);
|
|
||||||
MonitoredRPCHandler proxy =
|
MonitoredRPCHandler proxy =
|
||||||
(MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
|
(MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
|
||||||
new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
|
new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
|
||||||
|
|
|
@ -81,7 +81,7 @@ public class TestCallRunner {
|
||||||
|
|
||||||
TraceUtil.trace(() -> {
|
TraceUtil.trace(() -> {
|
||||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||||
cr.setStatus(new MonitoredRPCHandlerImpl());
|
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
cr.run();
|
cr.run();
|
||||||
}, testName.getMethodName());
|
}, testName.getMethodName());
|
||||||
|
|
||||||
|
@ -101,7 +101,7 @@ public class TestCallRunner {
|
||||||
|
|
||||||
TraceUtil.trace(() -> {
|
TraceUtil.trace(() -> {
|
||||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||||
cr.setStatus(new MonitoredRPCHandlerImpl());
|
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
cr.run();
|
cr.run();
|
||||||
}, testName.getMethodName());
|
}, testName.getMethodName());
|
||||||
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
||||||
|
@ -116,7 +116,7 @@ public class TestCallRunner {
|
||||||
|
|
||||||
TraceUtil.trace(() -> {
|
TraceUtil.trace(() -> {
|
||||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||||
cr.setStatus(new MonitoredRPCHandlerImpl());
|
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
cr.drop();
|
cr.drop();
|
||||||
}, testName.getMethodName());
|
}, testName.getMethodName());
|
||||||
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
||||||
|
@ -142,7 +142,7 @@ public class TestCallRunner {
|
||||||
|
|
||||||
TraceUtil.trace(() -> {
|
TraceUtil.trace(() -> {
|
||||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||||
cr.setStatus(new MonitoredRPCHandlerImpl());
|
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
cr.drop();
|
cr.drop();
|
||||||
}, testName.getMethodName());
|
}, testName.getMethodName());
|
||||||
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
||||||
|
|
|
@ -113,7 +113,7 @@ public class TestFifoRpcScheduler {
|
||||||
|
|
||||||
for (int i = totalCallMethods; i > 0; i--) {
|
for (int i = totalCallMethods; i > 0; i--) {
|
||||||
CallRunner task = createMockTask();
|
CallRunner task = createMockTask();
|
||||||
task.setStatus(new MonitoredRPCHandlerImpl());
|
task.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
|
|
||||||
if (!scheduler.dispatch(task)) {
|
if (!scheduler.dispatch(task)) {
|
||||||
unableToDispatch++;
|
unableToDispatch++;
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class TestSimpleRpcScheduler {
|
||||||
scheduler.init(CONTEXT);
|
scheduler.init(CONTEXT);
|
||||||
scheduler.start();
|
scheduler.start();
|
||||||
CallRunner task = createMockTask();
|
CallRunner task = createMockTask();
|
||||||
task.setStatus(new MonitoredRPCHandlerImpl());
|
task.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
scheduler.dispatch(task);
|
scheduler.dispatch(task);
|
||||||
verify(task, timeout(10000)).run();
|
verify(task, timeout(10000)).run();
|
||||||
scheduler.stop();
|
scheduler.stop();
|
||||||
|
@ -164,7 +164,7 @@ public class TestSimpleRpcScheduler {
|
||||||
int totalCallMethods = 10;
|
int totalCallMethods = 10;
|
||||||
for (int i = totalCallMethods; i > 0; i--) {
|
for (int i = totalCallMethods; i > 0; i--) {
|
||||||
CallRunner task = createMockTask();
|
CallRunner task = createMockTask();
|
||||||
task.setStatus(new MonitoredRPCHandlerImpl());
|
task.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
scheduler.dispatch(task);
|
scheduler.dispatch(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,7 +205,7 @@ public class TestSimpleRpcScheduler {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
for (CallRunner task : tasks) {
|
for (CallRunner task : tasks) {
|
||||||
task.setStatus(new MonitoredRPCHandlerImpl());
|
task.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
doAnswer(answerToRun).when(task).run();
|
doAnswer(answerToRun).when(task).run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -524,7 +524,7 @@ public class TestSimpleRpcScheduler {
|
||||||
|
|
||||||
private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
|
private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
|
||||||
final int value, final int sleepInterval) {
|
final int value, final int sleepInterval) {
|
||||||
callTask.setStatus(new MonitoredRPCHandlerImpl());
|
callTask.setStatus(new MonitoredRPCHandlerImpl("test"));
|
||||||
doAnswer(new Answer<Object>() {
|
doAnswer(new Answer<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object answer(InvocationOnMock invocation) {
|
public Object answer(InvocationOnMock invocation) {
|
||||||
|
|
|
@ -63,6 +63,8 @@ public class TestTaskMonitor {
|
||||||
assertEquals(task.getDescription(), taskFromTm.getDescription());
|
assertEquals(task.getDescription(), taskFromTm.getDescription());
|
||||||
assertEquals(-1, taskFromTm.getCompletionTimestamp());
|
assertEquals(-1, taskFromTm.getCompletionTimestamp());
|
||||||
assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
|
assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
|
||||||
|
assertEquals(task.getStatus(), taskFromTm.getStatus());
|
||||||
|
assertEquals("status unset", taskFromTm.getStatus());
|
||||||
|
|
||||||
// Mark it as finished
|
// Mark it as finished
|
||||||
task.markComplete("Finished!");
|
task.markComplete("Finished!");
|
||||||
|
@ -229,7 +231,7 @@ public class TestTaskMonitor {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClone() throws Exception {
|
public void testClone() throws Exception {
|
||||||
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl();
|
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl("test");
|
||||||
monitor.abort("abort RPC");
|
monitor.abort("abort RPC");
|
||||||
TestParam testParam = new TestParam("param1");
|
TestParam testParam = new TestParam("param1");
|
||||||
monitor.setRPC("method1", new Object[] { testParam }, 0);
|
monitor.setRPC("method1", new Object[] { testParam }, 0);
|
||||||
|
|
Loading…
Reference in New Issue