implements connection.getBytes[In|Out] for http transport #922 (#1965)

* implements connection.getBytes[In|Out] for http transport, #922

Signed-off-by: olivier lamy <olamy@webtide.com>

* use LongAdder rather than AtomicLong

Signed-off-by: olivier lamy <olamy@webtide.com>

* changes by Greg review

Signed-off-by: olivier lamy <olamy@webtide.com>

* changes by Greg review

Signed-off-by: olivier lamy <olamy@webtide.com>
This commit is contained in:
Olivier Lamy 2017-11-14 18:41:47 +11:00 committed by Greg Wilkins
parent 9fab69ea02
commit 786f128808
6 changed files with 91 additions and 4 deletions

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.client.http; package org.eclipse.jetty.client.http;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
@ -39,6 +40,8 @@ public class HttpChannelOverHTTP extends HttpChannel
private final HttpConnectionOverHTTP connection; private final HttpConnectionOverHTTP connection;
private final HttpSenderOverHTTP sender; private final HttpSenderOverHTTP sender;
private final HttpReceiverOverHTTP receiver; private final HttpReceiverOverHTTP receiver;
private final LongAdder inMessages = new LongAdder();
private final LongAdder outMessages = new LongAdder();
public HttpChannelOverHTTP(HttpConnectionOverHTTP connection) public HttpChannelOverHTTP(HttpConnectionOverHTTP connection)
{ {
@ -80,7 +83,10 @@ public class HttpChannelOverHTTP extends HttpChannel
{ {
HttpExchange exchange = getHttpExchange(); HttpExchange exchange = getHttpExchange();
if (exchange != null) if (exchange != null)
sender.send(exchange); {
sender.send( exchange );
outMessages.increment();
}
} }
@Override @Override
@ -127,6 +133,7 @@ public class HttpChannelOverHTTP extends HttpChannel
public void receive() public void receive()
{ {
inMessages.increment();
receiver.receive(); receiver.receive();
} }
@ -180,6 +187,16 @@ public class HttpChannelOverHTTP extends HttpChannel
} }
} }
protected long getMessagesIn()
{
return inMessages.longValue();
}
protected long getMessagesOut()
{
return outMessages.longValue();
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -23,6 +23,7 @@ import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpDestination;
@ -49,6 +50,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
private final HttpChannelOverHTTP channel; private final HttpChannelOverHTTP channel;
private long idleTimeout; private long idleTimeout;
private final LongAdder bytesIn = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise) public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{ {
super(endPoint, destination.getHttpClient().getExecutor()); super(endPoint, destination.getHttpClient().getExecutor());
@ -72,6 +76,41 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return (HttpDestinationOverHTTP)delegate.getHttpDestination(); return (HttpDestinationOverHTTP)delegate.getHttpDestination();
} }
@Override
public long getBytesIn()
{
return bytesIn.longValue();
}
protected void addBytesIn(long bytesIn)
{
this.bytesIn.add( bytesIn );
}
@Override
public long getBytesOut()
{
return bytesOut.longValue();
}
protected void addBytesOut(long bytesOut)
{
this.bytesOut.add( bytesOut );
}
@Override
public long getMessagesIn()
{
return getHttpChannel().getMessagesIn();
}
@Override
public long getMessagesOut()
{
return getHttpChannel().getMessagesOut();
}
@Override @Override
public void send(Request request, Response.CompleteListener listener) public void send(Request request, Response.CompleteListener listener)
{ {

View File

@ -123,6 +123,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
return; return;
int read = endPoint.fill(buffer); int read = endPoint.fill(buffer);
connection.addBytesIn( read );
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes {} from {}", read, BufferUtil.toDetailString(buffer), endPoint); LOG.debug("Read {} bytes {} from {}", read, BufferUtil.toDetailString(buffer), endPoint);

View File

@ -59,7 +59,7 @@ public class HttpSenderOverHTTP extends HttpSender
{ {
try try
{ {
new HeadersCallback(exchange, content, callback).iterate(); new HeadersCallback(exchange, content, callback, getHttpChannel().getHttpConnection()).iterate();
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -191,17 +191,19 @@ public class HttpSenderOverHTTP extends HttpSender
private final HttpExchange exchange; private final HttpExchange exchange;
private final Callback callback; private final Callback callback;
private final MetaData.Request metaData; private final MetaData.Request metaData;
private final HttpConnectionOverHTTP httpConnectionOverHTTP;
private ByteBuffer headerBuffer; private ByteBuffer headerBuffer;
private ByteBuffer chunkBuffer; private ByteBuffer chunkBuffer;
private ByteBuffer contentBuffer; private ByteBuffer contentBuffer;
private boolean lastContent; private boolean lastContent;
private boolean generated; private boolean generated;
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback) public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback, HttpConnectionOverHTTP httpConnectionOverHTTP)
{ {
super(false); super(false);
this.exchange = exchange; this.exchange = exchange;
this.callback = callback; this.callback = callback;
this.httpConnectionOverHTTP = httpConnectionOverHTTP;
HttpRequest request = exchange.getRequest(); HttpRequest request = exchange.getRequest();
ContentProvider requestContent = request.getContent(); ContentProvider requestContent = request.getContent();
@ -258,6 +260,11 @@ public class HttpSenderOverHTTP extends HttpSender
chunkBuffer = BufferUtil.EMPTY_BUFFER; chunkBuffer = BufferUtil.EMPTY_BUFFER;
if (contentBuffer == null) if (contentBuffer == null)
contentBuffer = BufferUtil.EMPTY_BUFFER; contentBuffer = BufferUtil.EMPTY_BUFFER;
httpConnectionOverHTTP.addBytesOut( BufferUtil.length(headerBuffer) //
+ BufferUtil.length(contentBuffer) //
+ BufferUtil.length(chunkBuffer));
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer); endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
generated = true; generated = true;
return Action.SCHEDULED; return Action.SCHEDULED;

View File

@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
@ -72,6 +74,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback(); private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback(); private final SendCallback _sendCallback = new SendCallback();
private final boolean _recordHttpComplianceViolations; private final boolean _recordHttpComplianceViolations;
private final LongAdder bytesIn = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
/** /**
* Get the current connection that this thread is dispatched to. * Get the current connection that this thread is dispatched to.
@ -229,6 +233,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{ {
// Fill the request buffer (if needed). // Fill the request buffer (if needed).
int filled = fillRequestBuffer(); int filled = fillRequestBuffer();
bytesIn.add( filled );
// Parse the request buffer. // Parse the request buffer.
boolean handle = parseRequestBuffer(); boolean handle = parseRequestBuffer();
@ -519,7 +524,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
if(_sendCallback.reset(info,head,content,lastContent,callback)) if(_sendCallback.reset(info,head,content,lastContent,callback))
{
_sendCallback.iterate(); _sendCallback.iterate();
}
} }
@ -565,6 +572,18 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_blockingReadCallback.failed(e); _blockingReadCallback.failed(e);
} }
@Override
public long getBytesIn()
{
return bytesIn.longValue();
}
@Override
public long getBytesOut()
{
return bytesOut.longValue();
}
@Override @Override
public String toConnectionString() public String toConnectionString()
{ {
@ -724,6 +743,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
case FLUSH: case FLUSH:
{ {
HttpConnection.this.bytesOut.add(BufferUtil.length(_header) //
+ BufferUtil.length(_content)
+ BufferUtil.length(chunk));
// Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content // Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
if (_head || _generator.isNoContent()) if (_head || _generator.isNoContent())
{ {
@ -759,6 +781,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{ {
succeeded(); // nothing to write succeeded(); // nothing to write
} }
return Action.SCHEDULED; return Action.SCHEDULED;
} }
case SHUTDOWN_OUT: case SHUTDOWN_OUT:

View File

@ -47,7 +47,7 @@ public class ConnectionStatisticsTest extends AbstractTest
@Test @Test
public void testConnectionStatistics() throws Exception public void testConnectionStatistics() throws Exception
{ {
Assume.assumeThat(transport, Matchers.isOneOf(Transport.H2C, Transport.H2)); Assume.assumeThat(transport, Matchers.isOneOf( Transport.HTTP, Transport.H2C, Transport.H2));
start(new AbstractHandler() start(new AbstractHandler()
{ {