port stats handler responsesThrown changes to 12.0.x

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2022-05-12 10:44:06 +10:00
parent 385f00f72b
commit 172480184c
2 changed files with 148 additions and 84 deletions

View File

@ -41,6 +41,7 @@ public class StatisticsHandler extends Handler.Wrapper
private final SampleStatistic _handleTimeStats = new SampleStatistic();
private final SampleStatistic _processTimeStats = new SampleStatistic();
private final LongAdder _responsesThrown = new LongAdder();
private final LongAdder _responses1xx = new LongAdder();
private final LongAdder _responses2xx = new LongAdder();
private final LongAdder _responses3xx = new LongAdder();
@ -51,87 +52,22 @@ public class StatisticsHandler extends Handler.Wrapper
public Request.Processor handle(Request request) throws Exception
{
long beginTimeStamp = System.nanoTime();
_connectionStats.computeIfAbsent(request.getConnectionMetaData().getId(), id ->
{
// TODO test this with localconnector endpoint that has multiple requests per connection.
request.getConnectionMetaData().getConnection().addEventListener(new Connection.Listener()
{
@Override
public void onClosed(Connection connection)
{
// complete connections stats
_connectionStats.remove(id);
}
});
return "SomeConnectionStatsObject";
});
StatisticsRequest statisticsRequest = new StatisticsRequest(request);
_handleStats.increment();
_requestStats.increment();
StatisticsRequest statisticsRequest = new StatisticsRequest(request);
// TODO don't need to do this until we see a Processor
request.addHttpStreamWrapper(s -> new HttpStream.Wrapper(s)
try
{
@Override
public void send(MetaData.Request request, MetaData.Response response, boolean last, Callback callback, ByteBuffer... content)
{
if (response != null)
{
switch (response.getStatus() / 100)
{
case 1 -> _responses1xx.increment();
case 2 -> _responses2xx.increment();
case 3 -> _responses3xx.increment();
case 4 -> _responses4xx.increment();
case 5 -> _responses5xx.increment();
default ->
{
}
}
}
for (ByteBuffer b : content)
{
statisticsRequest._bytesWritten.add(b.remaining());
}
super.send(request, response, last, callback, content);
}
@Override
public Content readContent()
{
Content content = super.readContent();
if (content != null)
statisticsRequest._bytesRead.add(content.remaining());
return content;
}
@Override
public void succeeded()
{
super.succeeded();
_processStats.decrement();
_requestStats.decrement();
_processTimeStats.record(System.nanoTime() - statisticsRequest._processStartTimeStamp);
_requestTimeStats.record(System.nanoTime() - getNanoTimeStamp());
}
@Override
public void failed(Throwable x)
{
super.failed(x);
_processStats.decrement();
_requestStats.decrement();
_processTimeStats.record(System.nanoTime() - statisticsRequest._processStartTimeStamp);
_requestTimeStats.record(System.nanoTime() - getNanoTimeStamp());
}
});
Request.Processor processor = super.handle(statisticsRequest);
_handleTimeStats.record(System.nanoTime() - beginTimeStamp);
return processor == null ? null : statisticsRequest.wrapProcessor(processor);
return statisticsRequest.wrapProcessor(super.handle(statisticsRequest));
}
catch (Throwable t)
{
_responsesThrown.increment();
throw t;
}
finally
{
_handleTimeStats.record(System.nanoTime() - beginTimeStamp);
}
}
private class StatisticsRequest extends Request.WrapperProcessor
@ -175,9 +111,93 @@ public class StatisticsHandler extends Handler.Wrapper
@Override
public void process(Request ignored, Response response, Callback callback) throws Exception
{
_processStats.increment();
_processStartTimeStamp = System.nanoTime();
super.process(this, response, callback);
_processStats.increment();
_requestStats.increment();
_connectionStats.computeIfAbsent(getConnectionMetaData().getId(), id ->
{
// TODO test this with localconnector endpoint that has multiple requests per connection.
getConnectionMetaData().getConnection().addEventListener(new Connection.Listener()
{
@Override
public void onClosed(Connection connection)
{
// complete connections stats
_connectionStats.remove(id);
}
});
return "SomeConnectionStatsObject";
});
addHttpStreamWrapper(s -> new HttpStream.Wrapper(s)
{
@Override
public void send(MetaData.Request request, MetaData.Response response, boolean last, Callback callback, ByteBuffer... content)
{
if (response != null)
{
switch (response.getStatus() / 100)
{
case 1 -> _responses1xx.increment();
case 2 -> _responses2xx.increment();
case 3 -> _responses3xx.increment();
case 4 -> _responses4xx.increment();
case 5 -> _responses5xx.increment();
default ->
{
}
}
}
for (ByteBuffer b : content)
{
_bytesWritten.add(b.remaining());
}
super.send(request, response, last, callback, content);
}
@Override
public Content readContent()
{
Content content = super.readContent();
if (content != null)
_bytesRead.add(content.remaining());
return content;
}
@Override
public void succeeded()
{
super.succeeded();
_requestStats.decrement();
_requestTimeStats.record(System.nanoTime() - getNanoTimeStamp());
}
@Override
public void failed(Throwable x)
{
super.failed(x);
_requestStats.decrement();
_requestTimeStats.record(System.nanoTime() - getNanoTimeStamp());
}
});
try
{
super.process(this, response, callback);
}
catch (Throwable t)
{
_responsesThrown.increment();
throw t;
}
finally
{
_processStats.decrement();
_processTimeStats.record(System.nanoTime() - _processStartTimeStamp);
}
}
}
@ -229,6 +249,12 @@ public class StatisticsHandler extends Handler.Wrapper
return _responses5xx.intValue();
}
@ManagedAttribute("number of requests that threw an exception during handling or processing")
public int getResponsesThrown()
{
return _responsesThrown.intValue();
}
@ManagedAttribute("")
public int getHandlings()
{

View File

@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
@ -629,9 +630,9 @@ public class StatisticsHandlerTest
barrier[0].await();
assertEquals(1, _statistics.getConnections());
assertEquals(1, _statsHandler.getRequests());
assertEquals(1, _statsHandler.getRequestsActive());
assertEquals(1, _statsHandler.getHandlings());
assertEquals(0, _statsHandler.getRequests());
assertEquals(0, _statsHandler.getRequestsActive());
assertEquals(0, _statsHandler.getProcessings());
assertEquals(0, _statsHandler.getProcessingsActive());
assertEquals(0, _statsHandler.getProcessingsMax());
@ -658,6 +659,43 @@ public class StatisticsHandlerTest
assertEquals(0, _statsHandler.getProcessingsActive());
assertEquals(1, _statsHandler.getProcessingsMax());
}
@Test
public void testThrownResponse() throws Exception
{
_statsHandler.setHandler(new Handler.Abstract(Invocable.InvocationType.BLOCKING)
{
@Override
public Request.Processor handle(Request request)
{
throw new IllegalStateException("expected");
}
});
_server.start();
try (StacklessLogging ignored = new StacklessLogging(Response.class))
{
String request = "GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n";
String response = _connector.getResponse(request);
assertThat(response, containsString("HTTP/1.1 500 Server Error"));
}
assertEquals(1, _statsHandler.getHandlings());
assertEquals(0, _statsHandler.getRequests());
assertEquals(0, _statsHandler.getRequestsActive());
assertEquals(0, _statsHandler.getRequestsActiveMax());
// We get no recorded status, but we get a recorded thrown response.
assertEquals(0, _statsHandler.getResponses1xx());
assertEquals(0, _statsHandler.getResponses2xx());
assertEquals(0, _statsHandler.getResponses3xx());
assertEquals(0, _statsHandler.getResponses4xx());
assertEquals(0, _statsHandler.getResponses5xx());
assertEquals(1, _statsHandler.getResponsesThrown());
}
//
// @Test
// public void waitForSuspendedRequestTest() throws Exception
@ -928,12 +966,12 @@ public class StatisticsHandlerTest
barrier[0].await();
assertEquals(1, _statistics.getConnections());
assertEquals(1, _statsHandler.getRequests());
assertEquals(1, _statsHandler.getRequestsActive());
// assertEquals(1, _statsHandler.getDispatched());
// assertEquals(1, _statsHandler.getDispatchedActive());
barrier[1].await();
assertEquals(1, _statsHandler.getRequests());
assertEquals(1, _statsHandler.getRequestsActive());
barrier[2].await();
assertTrue(_latchHandler.await());
await().atMost(5, TimeUnit.SECONDS).until(_statsHandler::getRequestsActive, equalTo(0));