HBASE-26727 Fix CallDroppedException reporting (#4088)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
8fb3d4666a
commit
a49c758f58
|
@ -44,6 +44,7 @@ public interface ExceptionTrackingSource extends BaseSource {
|
||||||
String EXCEPTIONS_QUOTA_EXCEEDED = "exceptions.quotaExceeded";
|
String EXCEPTIONS_QUOTA_EXCEEDED = "exceptions.quotaExceeded";
|
||||||
String EXCEPTIONS_RPC_THROTTLING = "exceptions.rpcThrottling";
|
String EXCEPTIONS_RPC_THROTTLING = "exceptions.rpcThrottling";
|
||||||
String EXCEPTIONS_CALL_DROPPED = "exceptions.callDropped";
|
String EXCEPTIONS_CALL_DROPPED = "exceptions.callDropped";
|
||||||
|
String EXCEPTIONS_CALL_TIMED_OUT = "exceptions.callTimedOut";
|
||||||
|
|
||||||
void exception();
|
void exception();
|
||||||
|
|
||||||
|
@ -62,4 +63,5 @@ public interface ExceptionTrackingSource extends BaseSource {
|
||||||
void quotaExceededException();
|
void quotaExceededException();
|
||||||
void rpcThrottlingException();
|
void rpcThrottlingException();
|
||||||
void callDroppedException();
|
void callDroppedException();
|
||||||
|
void callTimedOut();
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ public class ExceptionTrackingSourceImpl extends BaseSourceImpl
|
||||||
protected MutableFastCounter exceptionsQuotaExceeded;
|
protected MutableFastCounter exceptionsQuotaExceeded;
|
||||||
protected MutableFastCounter exceptionsRpcThrottling;
|
protected MutableFastCounter exceptionsRpcThrottling;
|
||||||
protected MutableFastCounter exceptionsCallDropped;
|
protected MutableFastCounter exceptionsCallDropped;
|
||||||
|
protected MutableFastCounter exceptionsCallTimedOut;
|
||||||
|
|
||||||
public ExceptionTrackingSourceImpl(String metricsName, String metricsDescription,
|
public ExceptionTrackingSourceImpl(String metricsName, String metricsDescription,
|
||||||
String metricsContext, String metricsJmxContext) {
|
String metricsContext, String metricsJmxContext) {
|
||||||
|
@ -75,6 +76,8 @@ public class ExceptionTrackingSourceImpl extends BaseSourceImpl
|
||||||
.newCounter(EXCEPTIONS_RPC_THROTTLING, EXCEPTIONS_TYPE_DESC, 0L);
|
.newCounter(EXCEPTIONS_RPC_THROTTLING, EXCEPTIONS_TYPE_DESC, 0L);
|
||||||
this.exceptionsCallDropped = this.getMetricsRegistry()
|
this.exceptionsCallDropped = this.getMetricsRegistry()
|
||||||
.newCounter(EXCEPTIONS_CALL_DROPPED, EXCEPTIONS_TYPE_DESC, 0L);
|
.newCounter(EXCEPTIONS_CALL_DROPPED, EXCEPTIONS_TYPE_DESC, 0L);
|
||||||
|
this.exceptionsCallTimedOut = this.getMetricsRegistry()
|
||||||
|
.newCounter(EXCEPTIONS_CALL_TIMED_OUT, EXCEPTIONS_TYPE_DESC, 0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -141,4 +144,9 @@ public class ExceptionTrackingSourceImpl extends BaseSourceImpl
|
||||||
public void callDroppedException() {
|
public void callDroppedException() {
|
||||||
exceptionsCallDropped.incr();
|
exceptionsCallDropped.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void callTimedOut() {
|
||||||
|
exceptionsCallTimedOut.incr();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,7 @@ public class CallRunner {
|
||||||
call.setStartTime(EnvironmentEdgeManager.currentTime());
|
call.setStartTime(EnvironmentEdgeManager.currentTime());
|
||||||
if (call.getStartTime() > call.getDeadline()) {
|
if (call.getStartTime() > call.getDeadline()) {
|
||||||
RpcServer.LOG.warn("Dropping timed out call: " + call);
|
RpcServer.LOG.warn("Dropping timed out call: " + call);
|
||||||
|
this.rpcServer.getMetrics().callTimedOut();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.status.setStatus("Setting up call");
|
this.status.setStatus("Setting up call");
|
||||||
|
@ -207,6 +208,7 @@ public class CallRunner {
|
||||||
call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
|
call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
|
||||||
+ (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
|
+ (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
|
||||||
call.sendResponseIfReady();
|
call.sendResponseIfReady();
|
||||||
|
this.rpcServer.getMetrics().exception(CALL_DROPPED_EXCEPTION);
|
||||||
} catch (ClosedChannelException cce) {
|
} catch (ClosedChannelException cce) {
|
||||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||||
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
|
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
|
||||||
|
|
|
@ -135,6 +135,10 @@ public class MetricsHBaseServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void callTimedOut() {
|
||||||
|
source.callTimedOut();
|
||||||
|
}
|
||||||
|
|
||||||
public MetricsHBaseServerSource getMetricsSource() {
|
public MetricsHBaseServerSource getMetricsSource() {
|
||||||
return source;
|
return source;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import org.apache.hadoop.hbase.CallDroppedException;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
|
||||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||||
|
@ -60,7 +62,7 @@ public class TestCallRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCallRunnerDrop() {
|
public void testCallRunnerDropDisconnected() {
|
||||||
RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
|
RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
|
||||||
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
|
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
|
||||||
ServerCall mockCall = Mockito.mock(ServerCall.class);
|
ServerCall mockCall = Mockito.mock(ServerCall.class);
|
||||||
|
@ -71,4 +73,21 @@ public class TestCallRunner {
|
||||||
cr.drop();
|
cr.drop();
|
||||||
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCallRunnerDropConnected() {
|
||||||
|
RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
|
||||||
|
MetricsHBaseServer mockMetrics = Mockito.mock(MetricsHBaseServer.class);
|
||||||
|
Mockito.when(mockRpcServer.getMetrics()).thenReturn(mockMetrics);
|
||||||
|
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
|
||||||
|
Mockito.when(mockRpcServer.getListenerAddress()).thenReturn(InetSocketAddress.createUnresolved("foo", 60020));
|
||||||
|
ServerCall mockCall = Mockito.mock(ServerCall.class);
|
||||||
|
Mockito.when(mockCall.disconnectSince()).thenReturn(-1L);
|
||||||
|
|
||||||
|
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||||
|
cr.setStatus(new MonitoredRPCHandlerImpl());
|
||||||
|
cr.drop();
|
||||||
|
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
|
||||||
|
Mockito.verify(mockMetrics).exception(Mockito.any(CallDroppedException.class));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue