Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2016-09-14 16:44:39 +02:00
commit 5ec60a30ed
35 changed files with 602 additions and 124 deletions

View File

@ -30,7 +30,6 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.jmx.MBeanContainer; import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.rewrite.handler.RewriteHandler; import org.eclipse.jetty.rewrite.handler.RewriteHandler;
import org.eclipse.jetty.security.HashLoginService; import org.eclipse.jetty.security.HashLoginService;
import org.eclipse.jetty.server.ConnectorStatistics;
import org.eclipse.jetty.server.DebugListener; import org.eclipse.jetty.server.DebugListener;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
@ -39,6 +38,7 @@ import org.eclipse.jetty.server.LowResourceMonitor;
import org.eclipse.jetty.server.NCSARequestLog; import org.eclipse.jetty.server.NCSARequestLog;
import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnectionStatistics;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.ContextHandlerCollection;
@ -193,7 +193,7 @@ public class LikeJettyXml
StatisticsHandler stats = new StatisticsHandler(); StatisticsHandler stats = new StatisticsHandler();
stats.setHandler(server.getHandler()); stats.setHandler(server.getHandler());
server.setHandler(stats); server.setHandler(stats);
ConnectorStatistics.addToAllConnectors(server); ServerConnectionStatistics.addToAllConnectors(server);
// === Rewrite Handler // === Rewrite Handler
RewriteHandler rewrite = new RewriteHandler(); RewriteHandler rewrite = new RewriteHandler();

View File

@ -21,8 +21,8 @@ package org.eclipse.jetty.embedded;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import org.eclipse.jetty.jmx.MBeanContainer; import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.ConnectorStatistics;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnectionStatistics;
import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
@ -44,7 +44,7 @@ public class OneServletContextJmxStats
context.addServlet(DefaultServlet.class, "/"); context.addServlet(DefaultServlet.class, "/");
// Add Connector Statistics tracking to all connectors // Add Connector Statistics tracking to all connectors
ConnectorStatistics.addToAllConnectors(server); ServerConnectionStatistics.addToAllConnectors(server);
server.start(); server.start();
server.join(); server.join();

View File

@ -564,18 +564,12 @@ public class HttpClient extends ContainerLifeCycle
{ {
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<Connection>(promise) context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise.Wrapper<Connection>(promise)
{ {
@Override
public void succeeded(Connection result)
{
getPromise().succeeded(result);
}
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
int nextIndex = index + 1; int nextIndex = index + 1;
if (nextIndex == socketAddresses.size()) if (nextIndex == socketAddresses.size())
getPromise().failed(x); super.failed(x);
else else
connect(socketAddresses, nextIndex, context); connect(socketAddresses, nextIndex, context);
} }

View File

@ -39,7 +39,6 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@ -88,6 +87,20 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
this.listener = listener; this.listener = listener;
} }
@Override
public long getMessagesIn()
{
HTTP2ClientSession session = (HTTP2ClientSession)getSession();
return session.getStreamsOpened();
}
@Override
public long getMessagesOut()
{
HTTP2ClientSession session = (HTTP2ClientSession)getSession();
return session.getStreamsClosed();
}
@Override @Override
public void onOpen() public void onOpen()
{ {

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.http2.client; package org.eclipse.jetty.http2.client;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.IStream;
@ -36,11 +38,38 @@ public class HTTP2ClientSession extends HTTP2Session
{ {
private static final Logger LOG = Log.getLogger(HTTP2ClientSession.class); private static final Logger LOG = Log.getLogger(HTTP2ClientSession.class);
private final AtomicLong streamsOpened = new AtomicLong();
private final AtomicLong streamsClosed = new AtomicLong();
public HTTP2ClientSession(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl) public HTTP2ClientSession(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl)
{ {
super(scheduler, endPoint, generator, listener, flowControl, 1); super(scheduler, endPoint, generator, listener, flowControl, 1);
} }
@Override
protected void onStreamOpened(IStream stream)
{
super.onStreamOpened(stream);
streamsOpened.incrementAndGet();
}
@Override
protected void onStreamClosed(IStream stream)
{
super.onStreamClosed(stream);
streamsClosed.incrementAndGet();
}
public long getStreamsOpened()
{
return streamsOpened.get();
}
public long getStreamsClosed()
{
return streamsClosed.get();
}
@Override @Override
public void onHeaders(HeadersFrame frame) public void onHeaders(HeadersFrame frame)
{ {

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http2.parser.Parser; import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
@ -43,11 +44,12 @@ public class HTTP2Connection extends AbstractConnection
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class); protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>(); private final Queue<Runnable> tasks = new ConcurrentArrayQueue<>();
private final HTTP2Producer producer = new HTTP2Producer();
private final AtomicLong bytesIn = new AtomicLong();
private final ByteBufferPool byteBufferPool; private final ByteBufferPool byteBufferPool;
private final Parser parser; private final Parser parser;
private final ISession session; private final ISession session;
private final int bufferSize; private final int bufferSize;
private final HTTP2Producer producer = new HTTP2Producer();
private final ExecutionStrategy blockingStrategy; private final ExecutionStrategy blockingStrategy;
private final ExecutionStrategy nonBlockingStrategy; private final ExecutionStrategy nonBlockingStrategy;
@ -67,7 +69,6 @@ public class HTTP2Connection extends AbstractConnection
return session; return session;
} }
protected Parser getParser() protected Parser getParser()
{ {
return parser; return parser;
@ -95,7 +96,6 @@ public class HTTP2Connection extends AbstractConnection
super.onClose(); super.onClose();
} }
@Override @Override
public void onFillable() public void onFillable()
{ {
@ -220,6 +220,10 @@ public class HTTP2Connection extends AbstractConnection
session.onShutdown(); session.onShutdown();
return null; return null;
} }
else
{
bytesIn.addAndGet(filled);
}
looping = true; looping = true;
} }
@ -237,12 +241,6 @@ public class HTTP2Connection extends AbstractConnection
private class FillableCallback implements Callback private class FillableCallback implements Callback
{ {
@Override
public InvocationType getInvocationType()
{
return InvocationType.EITHER;
}
@Override @Override
public void succeeded() public void succeeded()
{ {
@ -257,5 +255,11 @@ public class HTTP2Connection extends AbstractConnection
{ {
onFillInterestedFailed(x); onFillInterestedFailed(x);
} }
@Override
public InvocationType getInvocationType()
{
return InvocationType.EITHER;
}
} }
} }

View File

@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
@ -75,6 +76,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private final AtomicInteger sendWindow = new AtomicInteger(); private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger(); private final AtomicInteger recvWindow = new AtomicInteger();
private final AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED); private final AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicLong bytesWritten = new AtomicLong();
private final Scheduler scheduler; private final Scheduler scheduler;
private final EndPoint endPoint; private final EndPoint endPoint;
private final Generator generator; private final Generator generator;
@ -197,6 +199,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
return generator; return generator;
} }
@Override
public long getBytesWritten()
{
return bytesWritten.get();
}
@Override @Override
public void onData(final DataFrame frame) public void onData(final DataFrame frame)
{ {
@ -752,6 +760,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
else else
remoteStreamCount.decrementAndGet(); remoteStreamCount.decrementAndGet();
onStreamClosed(stream);
flowControl.onStreamDestroyed(stream); flowControl.onStreamDestroyed(stream);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -940,6 +950,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade"); onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade");
} }
protected void onStreamOpened(IStream stream)
{
}
protected void onStreamClosed(IStream stream)
{
}
public void disconnect() public void disconnect()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -1096,6 +1114,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class ControlEntry extends HTTP2Flusher.Entry private class ControlEntry extends HTTP2Flusher.Entry
{ {
private long bytes;
private ControlEntry(Frame frame, IStream stream, Callback callback) private ControlEntry(Frame frame, IStream stream, Callback callback)
{ {
super(frame, stream, callback); super(frame, stream, callback);
@ -1103,7 +1123,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
protected boolean generate(ByteBufferPool.Lease lease) protected boolean generate(ByteBufferPool.Lease lease)
{ {
generator.control(lease, frame); bytes = generator.control(lease, frame);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame); LOG.debug("Generated {}", frame);
prepare(); prepare();
@ -1145,10 +1165,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override @Override
public void succeeded() public void succeeded()
{ {
bytesWritten.addAndGet(bytes);
switch (frame.getType()) switch (frame.getType())
{ {
case HEADERS: case HEADERS:
{ {
onStreamOpened(stream);
HeadersFrame headersFrame = (HeadersFrame)frame; HeadersFrame headersFrame = (HeadersFrame)frame;
if (stream.updateClose(headersFrame.isEndStream(), true)) if (stream.updateClose(headersFrame.isEndStream(), true))
removeStream(stream); removeStream(stream);
@ -1198,8 +1220,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class DataEntry extends HTTP2Flusher.Entry private class DataEntry extends HTTP2Flusher.Entry
{ {
private int remaining; private long bytes;
private int generated; private int dataRemaining;
private int dataWritten;
private DataEntry(DataFrame frame, IStream stream, Callback callback) private DataEntry(DataFrame frame, IStream stream, Callback callback)
{ {
@ -1209,35 +1232,37 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// of data frames that cannot be completely written due to // of data frames that cannot be completely written due to
// the flow control window exhausting, since in that case // the flow control window exhausting, since in that case
// we would have to count the padding only once. // we would have to count the padding only once.
remaining = frame.remaining(); dataRemaining = frame.remaining();
} }
@Override @Override
public int dataRemaining() public int dataRemaining()
{ {
return remaining; return dataRemaining;
} }
protected boolean generate(ByteBufferPool.Lease lease) protected boolean generate(ByteBufferPool.Lease lease)
{ {
int toWrite = dataRemaining(); int dataRemaining = dataRemaining();
int sessionSendWindow = getSendWindow(); int sessionSendWindow = getSendWindow();
int streamSendWindow = stream.updateSendWindow(0); int streamSendWindow = stream.updateSendWindow(0);
int window = Math.min(streamSendWindow, sessionSendWindow); int window = Math.min(streamSendWindow, sessionSendWindow);
if (window <= 0 && toWrite > 0) if (window <= 0 && dataRemaining > 0)
return false; return false;
int length = Math.min(toWrite, window); int length = Math.min(dataRemaining, window);
int generated = generator.data(lease, (DataFrame)frame, length); // Only one DATA frame is generated.
bytes = generator.data(lease, (DataFrame)frame, length);
int written = (int)bytes - Frame.HEADER_LENGTH;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, generated, window, toWrite); LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataRemaining);
this.generated += generated; this.dataWritten = written;
this.remaining -= generated; this.dataRemaining -= written;
flowControl.onDataSending(stream, generated); flowControl.onDataSending(stream, written);
return true; return true;
} }
@ -1245,8 +1270,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override @Override
public void succeeded() public void succeeded()
{ {
flowControl.onDataSent(stream, generated); bytesWritten.addAndGet(bytes);
generated = 0; flowControl.onDataSent(stream, dataWritten);
// Do we have more to send ? // Do we have more to send ?
DataFrame dataFrame = (DataFrame)frame; DataFrame dataFrame = (DataFrame)frame;
if (dataRemaining() == 0) if (dataRemaining() == 0)

View File

@ -127,4 +127,9 @@ public interface ISession extends Session
* @param frame the synthetic frame to process * @param frame the synthetic frame to process
*/ */
public void onFrame(Frame frame); public void onFrame(Frame frame);
/**
* @return the number of bytes written by this session
*/
public long getBytesWritten();
} }

View File

@ -24,6 +24,8 @@ import org.eclipse.jetty.http2.ErrorCode;
public class ResetFrame extends Frame public class ResetFrame extends Frame
{ {
public static final int RESET_LENGTH = 4;
private final int streamId; private final int streamId;
private final int error; private final int error;

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.http2.frames;
public class WindowUpdateFrame extends Frame public class WindowUpdateFrame extends Frame
{ {
public static final int WINDOW_UPDATE_LENGTH = 4;
private final int streamId; private final int streamId;
private final int windowDelta; private final int windowDelta;

View File

@ -36,12 +36,12 @@ public class DataGenerator
this.headerGenerator = headerGenerator; this.headerGenerator = headerGenerator;
} }
public int generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength) public long generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
{ {
return generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength); return generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength);
} }
public int generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength) public long generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength)
{ {
if (streamId < 0) if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId); throw new IllegalArgumentException("Invalid stream id: " + streamId);
@ -63,7 +63,7 @@ public class DataGenerator
data.limit(limit); data.limit(limit);
generateFrame(lease, streamId, slice, false); generateFrame(lease, streamId, slice, false);
} }
return length; return Frame.HEADER_LENGTH + length;
} }
private void generateFrame(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last) private void generateFrame(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last)
@ -75,10 +75,9 @@ public class DataGenerator
flags |= Flags.END_STREAM; flags |= Flags.END_STREAM;
ByteBuffer header = headerGenerator.generate(lease, FrameType.DATA, Frame.HEADER_LENGTH + length, length, flags, streamId); ByteBuffer header = headerGenerator.generate(lease, FrameType.DATA, Frame.HEADER_LENGTH + length, length, flags, streamId);
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
// Skip empty data buffers.
if (data.remaining() > 0) if (data.remaining() > 0)
lease.append(data, false); lease.append(data, false);
} }

View File

@ -29,7 +29,8 @@ public class DisconnectGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
return 0;
} }
} }

View File

@ -33,7 +33,7 @@ public abstract class FrameGenerator
this.headerGenerator = headerGenerator; this.headerGenerator = headerGenerator;
} }
public abstract void generate(ByteBufferPool.Lease lease, Frame frame); public abstract long generate(ByteBufferPool.Lease lease, Frame frame);
protected ByteBuffer generateHeader(ByteBufferPool.Lease lease, FrameType frameType, int length, int flags, int streamId) protected ByteBuffer generateHeader(ByteBufferPool.Lease lease, FrameType frameType, int length, int flags, int streamId)
{ {

View File

@ -75,12 +75,12 @@ public class Generator
headerGenerator.setMaxFrameSize(maxFrameSize); headerGenerator.setMaxFrameSize(maxFrameSize);
} }
public void control(ByteBufferPool.Lease lease, Frame frame) public long control(ByteBufferPool.Lease lease, Frame frame)
{ {
generators[frame.getType().getType()].generate(lease, frame); return generators[frame.getType().getType()].generate(lease, frame);
} }
public int data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength) public long data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
{ {
return dataGenerator.generate(lease, frame, maxLength); return dataGenerator.generate(lease, frame, maxLength);
} }

View File

@ -36,13 +36,13 @@ public class GoAwayGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
GoAwayFrame goAwayFrame = (GoAwayFrame)frame; GoAwayFrame goAwayFrame = (GoAwayFrame)frame;
generateGoAway(lease, goAwayFrame.getLastStreamId(), goAwayFrame.getError(), goAwayFrame.getPayload()); return generateGoAway(lease, goAwayFrame.getLastStreamId(), goAwayFrame.getError(), goAwayFrame.getPayload());
} }
public void generateGoAway(ByteBufferPool.Lease lease, int lastStreamId, int error, byte[] payload) public long generateGoAway(ByteBufferPool.Lease lease, int lastStreamId, int error, byte[] payload)
{ {
if (lastStreamId < 0) if (lastStreamId < 0)
throw new IllegalArgumentException("Invalid last stream id: " + lastStreamId); throw new IllegalArgumentException("Invalid last stream id: " + lastStreamId);
@ -62,11 +62,11 @@ public class GoAwayGenerator extends FrameGenerator
header.putInt(error); header.putInt(error);
if (payload != null) if (payload != null)
{
header.put(payload); header.put(payload);
}
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
return Frame.HEADER_LENGTH + length;
} }
} }

View File

@ -50,13 +50,13 @@ public class HeadersGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
HeadersFrame headersFrame = (HeadersFrame)frame; HeadersFrame headersFrame = (HeadersFrame)frame;
generateHeaders(lease, headersFrame.getStreamId(), headersFrame.getMetaData(), headersFrame.getPriority(), headersFrame.isEndStream()); return generateHeaders(lease, headersFrame.getStreamId(), headersFrame.getMetaData(), headersFrame.getPriority(), headersFrame.isEndStream());
} }
public void generateHeaders(ByteBufferPool.Lease lease, int streamId, MetaData metaData, PriorityFrame priority, boolean endStream) public long generateHeaders(ByteBufferPool.Lease lease, int streamId, MetaData metaData, PriorityFrame priority, boolean endStream)
{ {
if (streamId < 0) if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId); throw new IllegalArgumentException("Invalid stream id: " + streamId);
@ -87,10 +87,11 @@ public class HeadersGenerator extends FrameGenerator
generatePriority(header, priority); generatePriority(header, priority);
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
hpacked.limit(maxHeaderBlockFragment); hpacked.limit(maxHeaderBlockFragment);
lease.append(hpacked.slice(), false); lease.append(hpacked.slice(), false);
int totalLength = Frame.HEADER_LENGTH + length;
int position = maxHeaderBlockFragment; int position = maxHeaderBlockFragment;
int limit = position + maxHeaderBlockFragment; int limit = position + maxHeaderBlockFragment;
while (limit < hpackedLength) while (limit < hpackedLength)
@ -102,6 +103,7 @@ public class HeadersGenerator extends FrameGenerator
lease.append(hpacked.slice(), false); lease.append(hpacked.slice(), false);
position += maxHeaderBlockFragment; position += maxHeaderBlockFragment;
limit += maxHeaderBlockFragment; limit += maxHeaderBlockFragment;
totalLength += Frame.HEADER_LENGTH + maxHeaderBlockFragment;
} }
hpacked.position(position).limit(hpackedLength); hpacked.position(position).limit(hpackedLength);
@ -109,6 +111,9 @@ public class HeadersGenerator extends FrameGenerator
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
lease.append(hpacked, true); lease.append(hpacked, true);
totalLength += Frame.HEADER_LENGTH + hpacked.remaining();
return totalLength;
} }
else else
{ {
@ -125,6 +130,8 @@ public class HeadersGenerator extends FrameGenerator
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
lease.append(hpacked, true); lease.append(hpacked, true);
return Frame.HEADER_LENGTH + length;
} }
} }

View File

@ -35,13 +35,13 @@ public class PingGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
PingFrame pingFrame = (PingFrame)frame; PingFrame pingFrame = (PingFrame)frame;
generatePing(lease, pingFrame.getPayload(), pingFrame.isReply()); return generatePing(lease, pingFrame.getPayload(), pingFrame.isReply());
} }
public void generatePing(ByteBufferPool.Lease lease, byte[] payload, boolean reply) public long generatePing(ByteBufferPool.Lease lease, byte[] payload, boolean reply)
{ {
if (payload.length != PingFrame.PING_LENGTH) if (payload.length != PingFrame.PING_LENGTH)
throw new IllegalArgumentException("Invalid payload length: " + payload.length); throw new IllegalArgumentException("Invalid payload length: " + payload.length);
@ -52,5 +52,7 @@ public class PingGenerator extends FrameGenerator
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
return Frame.HEADER_LENGTH + PingFrame.PING_LENGTH;
} }
} }

View File

@ -32,8 +32,9 @@ public class PrefaceGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
lease.append(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false); lease.append(ByteBuffer.wrap(PrefaceFrame.PREFACE_BYTES), false);
return PrefaceFrame.PREFACE_BYTES.length;
} }
} }

View File

@ -35,18 +35,19 @@ public class PriorityGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
PriorityFrame priorityFrame = (PriorityFrame)frame; PriorityFrame priorityFrame = (PriorityFrame)frame;
generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getParentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive()); return generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getParentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive());
} }
public void generatePriority(ByteBufferPool.Lease lease, int streamId, int parentStreamId, int weight, boolean exclusive) public long generatePriority(ByteBufferPool.Lease lease, int streamId, int parentStreamId, int weight, boolean exclusive)
{ {
ByteBuffer header = generateHeader(lease, FrameType.PRIORITY, PriorityFrame.PRIORITY_LENGTH, Flags.NONE, streamId); ByteBuffer header = generateHeader(lease, FrameType.PRIORITY, PriorityFrame.PRIORITY_LENGTH, Flags.NONE, streamId);
generatePriorityBody(header, streamId, parentStreamId, weight, exclusive); generatePriorityBody(header, streamId, parentStreamId, weight, exclusive);
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
return Frame.HEADER_LENGTH + PriorityFrame.PRIORITY_LENGTH;
} }
public void generatePriorityBody(ByteBuffer header, int streamId, int parentStreamId, int weight, boolean exclusive) public void generatePriorityBody(ByteBuffer header, int streamId, int parentStreamId, int weight, boolean exclusive)

View File

@ -40,13 +40,13 @@ public class PushPromiseGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
PushPromiseFrame pushPromiseFrame = (PushPromiseFrame)frame; PushPromiseFrame pushPromiseFrame = (PushPromiseFrame)frame;
generatePushPromise(lease, pushPromiseFrame.getStreamId(), pushPromiseFrame.getPromisedStreamId(), pushPromiseFrame.getMetaData()); return generatePushPromise(lease, pushPromiseFrame.getStreamId(), pushPromiseFrame.getPromisedStreamId(), pushPromiseFrame.getMetaData());
} }
public void generatePushPromise(ByteBufferPool.Lease lease, int streamId, int promisedStreamId, MetaData metaData) public long generatePushPromise(ByteBufferPool.Lease lease, int streamId, int promisedStreamId, MetaData metaData)
{ {
if (streamId < 0) if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId); throw new IllegalArgumentException("Invalid stream id: " + streamId);
@ -73,5 +73,7 @@ public class PushPromiseGenerator extends FrameGenerator
lease.append(header, true); lease.append(header, true);
lease.append(hpacked, true); lease.append(hpacked, true);
return Frame.HEADER_LENGTH + length;
} }
} }

View File

@ -35,22 +35,22 @@ public class ResetGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
ResetFrame resetFrame = (ResetFrame)frame; ResetFrame resetFrame = (ResetFrame)frame;
generateReset(lease, resetFrame.getStreamId(), resetFrame.getError()); return generateReset(lease, resetFrame.getStreamId(), resetFrame.getError());
} }
public void generateReset(ByteBufferPool.Lease lease, int streamId, int error) public long generateReset(ByteBufferPool.Lease lease, int streamId, int error)
{ {
if (streamId < 0) if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId); throw new IllegalArgumentException("Invalid stream id: " + streamId);
ByteBuffer header = generateHeader(lease, FrameType.RST_STREAM, 4, Flags.NONE, streamId); ByteBuffer header = generateHeader(lease, FrameType.RST_STREAM, ResetFrame.RESET_LENGTH, Flags.NONE, streamId);
header.putInt(error); header.putInt(error);
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
return Frame.HEADER_LENGTH + ResetFrame.RESET_LENGTH;
} }
} }

View File

@ -36,13 +36,13 @@ public class SettingsGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
SettingsFrame settingsFrame = (SettingsFrame)frame; SettingsFrame settingsFrame = (SettingsFrame)frame;
generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply()); return generateSettings(lease, settingsFrame.getSettings(), settingsFrame.isReply());
} }
public void generateSettings(ByteBufferPool.Lease lease, Map<Integer, Integer> settings, boolean reply) public long generateSettings(ByteBufferPool.Lease lease, Map<Integer, Integer> settings, boolean reply)
{ {
// Two bytes for the identifier, four bytes for the value. // Two bytes for the identifier, four bytes for the value.
int entryLength = 2 + 4; int entryLength = 2 + 4;
@ -60,5 +60,7 @@ public class SettingsGenerator extends FrameGenerator
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
return Frame.HEADER_LENGTH + length;
} }
} }

View File

@ -35,20 +35,22 @@ public class WindowUpdateGenerator extends FrameGenerator
} }
@Override @Override
public void generate(ByteBufferPool.Lease lease, Frame frame) public long generate(ByteBufferPool.Lease lease, Frame frame)
{ {
WindowUpdateFrame windowUpdateFrame = (WindowUpdateFrame)frame; WindowUpdateFrame windowUpdateFrame = (WindowUpdateFrame)frame;
generateWindowUpdate(lease, windowUpdateFrame.getStreamId(), windowUpdateFrame.getWindowDelta()); return generateWindowUpdate(lease, windowUpdateFrame.getStreamId(), windowUpdateFrame.getWindowDelta());
} }
public void generateWindowUpdate(ByteBufferPool.Lease lease, int streamId, int windowUpdate) public long generateWindowUpdate(ByteBufferPool.Lease lease, int streamId, int windowUpdate)
{ {
if (windowUpdate < 0) if (windowUpdate < 0)
throw new IllegalArgumentException("Invalid window update: " + windowUpdate); throw new IllegalArgumentException("Invalid window update: " + windowUpdate);
ByteBuffer header = generateHeader(lease, FrameType.WINDOW_UPDATE, 4, Flags.NONE, streamId); ByteBuffer header = generateHeader(lease, FrameType.WINDOW_UPDATE, WindowUpdateFrame.WINDOW_UPDATE_LENGTH, Flags.NONE, streamId);
header.putInt(windowUpdate); header.putInt(windowUpdate);
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
return Frame.HEADER_LENGTH + WindowUpdateFrame.WINDOW_UPDATE_LENGTH;
} }
} }

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
@ -55,11 +56,9 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{ {
/** /**
* @param protocol A HTTP2 protocol variant * @param protocol A HTTP2 protocol variant
* @return True if the protocol version is supported * @return True if the protocol version is supported
@ -85,9 +84,11 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
} }
private final Queue<HttpChannelOverHTTP2> channels = new ConcurrentArrayQueue<>(); private final Queue<HttpChannelOverHTTP2> channels = new ConcurrentArrayQueue<>();
private final List<Frame> upgradeFrames = new ArrayList<>();
private final AtomicLong totalRequests = new AtomicLong();
private final AtomicLong totalResponses = new AtomicLong();
private final ServerSessionListener listener; private final ServerSessionListener listener;
private final HttpConfiguration httpConfig; private final HttpConfiguration httpConfig;
private final List<Frame> upgradeFrames = new ArrayList<>();
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{ {
@ -96,6 +97,18 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
this.httpConfig = httpConfig; this.httpConfig = httpConfig;
} }
@Override
public long getMessagesIn()
{
return totalRequests.intValue();
}
@Override
public long getMessagesOut()
{
return totalResponses.intValue();
}
@Override @Override
protected ServerParser getParser() protected ServerParser getParser()
{ {
@ -264,21 +277,29 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
} }
@Override @Override
public void recycle() public Runnable onRequest(HeadersFrame frame)
{ {
getStream().removeAttribute(IStream.CHANNEL_ATTRIBUTE); totalRequests.incrementAndGet();
super.recycle(); return super.onRequest(frame);
channels.offer(this);
} }
@Override @Override
public void onCompleted() public void onCompleted()
{ {
totalResponses.incrementAndGet();
super.onCompleted(); super.onCompleted();
if (!getStream().isReset()) if (!getStream().isReset())
recycle(); recycle();
} }
@Override
public void recycle()
{
getStream().removeAttribute(IStream.CHANNEL_ATTRIBUTE);
super.recycle();
channels.offer(this);
}
@Override @Override
public void close() public void close()
{ {

View File

@ -89,6 +89,7 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
IStream stream = createRemoteStream(frame.getStreamId()); IStream stream = createRemoteStream(frame.getStreamId());
if (stream != null) if (stream != null)
{ {
onStreamOpened(stream);
stream.process(frame, Callback.NOOP); stream.process(frame, Callback.NOOP);
Stream.Listener listener = notifyNewStream(stream, frame); Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener); stream.setListener(listener);

View File

@ -85,19 +85,15 @@ public abstract class AbstractConnection implements Connection
protected void failedCallback(final Callback callback, final Throwable x) protected void failedCallback(final Callback callback, final Throwable x)
{ {
Runnable failCallback = new Runnable() Runnable failCallback = () ->
{ {
@Override try
public void run()
{ {
try callback.failed(x);
{ }
callback.failed(x); catch (Exception e)
} {
catch (Exception e) LOG.warn(e);
{
LOG.warn(e);
}
} }
}; };
@ -223,13 +219,13 @@ public abstract class AbstractConnection implements Connection
} }
@Override @Override
public int getMessagesIn() public long getMessagesIn()
{ {
return -1; return -1;
} }
@Override @Override
public int getMessagesOut() public long getMessagesOut()
{ {
return -1; return -1;
} }

View File

@ -60,7 +60,7 @@ public interface Connection extends Closeable
public void onClose(); public void onClose();
/** /**
* @return the {@link EndPoint} associated with this {@link Connection} * @return the {@link EndPoint} associated with this Connection.
*/ */
public EndPoint getEndPoint(); public EndPoint getEndPoint();
@ -86,8 +86,8 @@ public interface Connection extends Closeable
*/ */
public boolean onIdleExpired(); public boolean onIdleExpired();
public int getMessagesIn(); public long getMessagesIn();
public int getMessagesOut(); public long getMessagesOut();
public long getBytesIn(); public long getBytesIn();
public long getBytesOut(); public long getBytesOut();
public long getCreatedTimeStamp(); public long getCreatedTimeStamp();

View File

@ -0,0 +1,233 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.statistic.CounterStatistic;
import org.eclipse.jetty.util.statistic.SampleStatistic;
/**
* <p>A {@link Connection.Listener} that tracks connection statistics.</p>
* <p>Adding an instance of this class as a bean to a server Connector
* (for the server) or to HttpClient (for the client) will trigger the
* tracking of the connection statistics for all connections managed
* by the server Connector or by HttpClient.</p>
*/
@ManagedObject("Tracks statistics on connections")
public class ConnectionStatistics extends AbstractLifeCycle implements Connection.Listener, Dumpable
{
private final CounterStatistic _connections = new CounterStatistic();
private final SampleStatistic _connectionsDuration = new SampleStatistic();
private final LongAdder _rcvdBytes = new LongAdder();
private final AtomicLong _bytesInStamp = new AtomicLong();
private final LongAdder _sentBytes = new LongAdder();
private final AtomicLong _bytesOutStamp = new AtomicLong();
private final LongAdder _messagesIn = new LongAdder();
private final AtomicLong _messagesInStamp = new AtomicLong();
private final LongAdder _messagesOut = new LongAdder();
private final AtomicLong _messagesOutStamp = new AtomicLong();
@ManagedOperation(value = "Resets the statistics", impact = "ACTION")
public void reset()
{
_connections.reset();
_connectionsDuration.reset();
_rcvdBytes.reset();
_bytesInStamp.set(System.nanoTime());
_sentBytes.reset();
_bytesOutStamp.set(System.nanoTime());
_messagesIn.reset();
_messagesInStamp.set(System.nanoTime());
_messagesOut.reset();
_messagesOutStamp.set(System.nanoTime());
}
@Override
protected void doStart() throws Exception
{
reset();
}
@Override
public void onOpened(Connection connection)
{
if (!isStarted())
return;
_connections.increment();
}
@Override
public void onClosed(Connection connection)
{
if (!isStarted())
return;
_connections.decrement();
long elapsed = System.currentTimeMillis() - connection.getCreatedTimeStamp();
_connectionsDuration.set(elapsed);
long bytesIn = connection.getBytesIn();
if (bytesIn > 0)
_rcvdBytes.add(bytesIn);
long bytesOut = connection.getBytesOut();
if (bytesOut > 0)
_sentBytes.add(bytesOut);
long messagesIn = connection.getMessagesIn();
if (messagesIn > 0)
_messagesIn.add(messagesIn);
long messagesOut = connection.getMessagesOut();
if (messagesOut > 0)
_messagesOut.add(messagesOut);
}
@ManagedAttribute("Total number of bytes received by tracked connections")
public long getReceivedBytes()
{
return _rcvdBytes.sum();
}
@ManagedAttribute("Total number of bytes received per second since the last invocation of this method")
public long getReceivedBytesRate()
{
long now = System.nanoTime();
long then = _bytesInStamp.getAndSet(now);
long elapsed = TimeUnit.NANOSECONDS.toMillis(now - then);
return elapsed == 0 ? 0 : getReceivedBytes() * 1000 / elapsed;
}
@ManagedAttribute("Total number of bytes sent by tracked connections")
public long getSentBytes()
{
return _sentBytes.sum();
}
@ManagedAttribute("Total number of bytes sent per second since the last invocation of this method")
public long getSentBytesRate()
{
long now = System.nanoTime();
long then = _bytesOutStamp.getAndSet(now);
long elapsed = TimeUnit.NANOSECONDS.toMillis(now - then);
return elapsed == 0 ? 0 : getSentBytes() * 1000 / elapsed;
}
@ManagedAttribute("The max duration of a connection in ms")
public long getConnectionDurationMax()
{
return _connectionsDuration.getMax();
}
@ManagedAttribute("The mean duration of a connection in ms")
public double getConnectionDurationMean()
{
return _connectionsDuration.getMean();
}
@ManagedAttribute("The standard deviation of the duration of a connection")
public double getConnectionDurationStdDev()
{
return _connectionsDuration.getStdDev();
}
@ManagedAttribute("The total number of connections opened")
public long getConnectionsTotal()
{
return _connections.getTotal();
}
@ManagedAttribute("The current number of open connections")
public long getConnections()
{
return _connections.getCurrent();
}
@ManagedAttribute("The max number of open connections")
public long getConnectionsMax()
{
return _connections.getMax();
}
@ManagedAttribute("The total number of messages received")
public long getReceivedMessages()
{
return _messagesIn.sum();
}
@ManagedAttribute("Total number of messages received per second since the last invocation of this method")
public long getReceivedMessagesRate()
{
long now = System.nanoTime();
long then = _messagesInStamp.getAndSet(now);
long elapsed = TimeUnit.NANOSECONDS.toMillis(now - then);
return elapsed == 0 ? 0 : getReceivedMessages() * 1000 / elapsed;
}
@ManagedAttribute("The total number of messages sent")
public long getSentMessages()
{
return _messagesOut.sum();
}
@ManagedAttribute("Total number of messages sent per second since the last invocation of this method")
public long getSentMessagesRate()
{
long now = System.nanoTime();
long then = _messagesOutStamp.getAndSet(now);
long elapsed = TimeUnit.NANOSECONDS.toMillis(now - then);
return elapsed == 0 ? 0 : getSentMessages() * 1000 / elapsed;
}
@Override
public String dump()
{
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
ContainerLifeCycle.dumpObject(out, this);
List<String> children = new ArrayList<>();
children.add(String.format("connections=%s", _connections));
children.add(String.format("durations=%s", _connectionsDuration));
children.add(String.format("bytes in/out=%s/%s", getReceivedBytes(), getSentBytes()));
children.add(String.format("messages in/out=%s/%s", getReceivedMessages(), getSentMessages()));
ContainerLifeCycle.dump(out, indent, children);
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
}

View File

@ -12,7 +12,7 @@
</New> </New>
</Arg> </Arg>
</Call> </Call>
<Call class="org.eclipse.jetty.server.ConnectorStatistics" name="addToAllConnectors"> <Call class="org.eclipse.jetty.server.ServerConnectionStatistics" name="addToAllConnectors">
<Arg><Ref refid="Server"/></Arg> <Arg><Ref refid="Server"/></Arg>
</Call> </Call>
</Configure> </Configure>

View File

@ -43,7 +43,10 @@ import org.eclipse.jetty.util.statistic.SampleStatistic;
/** A Connector.Listener that gathers Connector and Connections Statistics. /** A Connector.Listener that gathers Connector and Connections Statistics.
* Adding an instance of this class as with {@link AbstractConnector#addBean(Object)} * Adding an instance of this class as with {@link AbstractConnector#addBean(Object)}
* will register the listener with all connections accepted by that connector. * will register the listener with all connections accepted by that connector.
*
* @deprecated use {@link ServerConnectionStatistics} instead.
*/ */
@Deprecated
@ManagedObject("Connector Statistics") @ManagedObject("Connector Statistics")
public class ConnectorStatistics extends AbstractLifeCycle implements Dumpable, Connection.Listener public class ConnectorStatistics extends AbstractLifeCycle implements Dumpable, Connection.Listener
{ {
@ -75,8 +78,8 @@ public class ConnectorStatistics extends AbstractLifeCycle implements Dumpable,
{ {
if (isStarted()) if (isStarted())
{ {
int msgsIn=connection.getMessagesIn(); long msgsIn=connection.getMessagesIn();
int msgsOut=connection.getMessagesOut(); long msgsOut=connection.getMessagesOut();
_messagesIn.set(msgsIn); _messagesIn.set(msgsIn);
_messagesOut.set(msgsOut); _messagesOut.set(msgsOut);
_connectionStats.decrement(); _connectionStats.decrement();
@ -302,7 +305,7 @@ public class ConnectorStatistics extends AbstractLifeCycle implements Dumpable,
_messagesOut=connection.getMessagesOut(); _messagesOut=connection.getMessagesOut();
} }
final int _messagesIn; final long _messagesIn;
final int _messagesOut; final long _messagesOut;
} }
} }

View File

@ -24,8 +24,7 @@ import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
@ -68,7 +67,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{ {
private static final Logger LOG = Log.getLogger(HttpChannel.class); private static final Logger LOG = Log.getLogger(HttpChannel.class);
private final AtomicBoolean _committed = new AtomicBoolean(); private final AtomicBoolean _committed = new AtomicBoolean();
private final AtomicInteger _requests = new AtomicInteger(); private final AtomicLong _requests = new AtomicLong();
private final Connector _connector; private final Connector _connector;
private final Executor _executor; private final Executor _executor;
private final HttpConfiguration _configuration; private final HttpConfiguration _configuration;
@ -125,7 +124,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
/** /**
* @return the number of requests handled by this connection * @return the number of requests handled by this connection
*/ */
public int getRequests() public long getRequests()
{ {
return _requests.get(); return _requests.get();
} }
@ -266,8 +265,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
handle(); handle();
} }
AtomicReference<Action> caller = new AtomicReference<>();
/** /**
* @return True if the channel is ready to continue handling (ie it is not suspended) * @return True if the channel is ready to continue handling (ie it is not suspended)
*/ */
@ -355,7 +352,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{ {
_response.reset(); _response.reset();
Integer icode = (Integer)_request.getAttribute(ERROR_STATUS_CODE); Integer icode = (Integer)_request.getAttribute(ERROR_STATUS_CODE);
int code = icode!=null?icode.intValue():HttpStatus.INTERNAL_SERVER_ERROR_500; int code = icode != null ? icode : HttpStatus.INTERNAL_SERVER_ERROR_500;
_response.setStatus(code); _response.setStatus(code);
_request.setAttribute(ERROR_STATUS_CODE,code); _request.setAttribute(ERROR_STATUS_CODE,code);
if (icode==null) if (icode==null)
@ -508,7 +505,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
// Minimal response // Minimal response
Integer code=(Integer)_request.getAttribute(ERROR_STATUS_CODE); Integer code=(Integer)_request.getAttribute(ERROR_STATUS_CODE);
_response.reset(); _response.reset();
_response.setStatus(code==null?500:code.intValue()); _response.setStatus(code == null ? 500 : code);
_response.flushBuffer(); _response.flushBuffer();
} }
} }

View File

@ -44,7 +44,6 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/** /**
* <p>A {@link Connection} that handles the HTTP protocol.</p> * <p>A {@link Connection} that handles the HTTP protocol.</p>
@ -170,13 +169,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
@Override @Override
public int getMessagesIn() public long getMessagesIn()
{ {
return getHttpChannel().getRequests(); return getHttpChannel().getRequests();
} }
@Override @Override
public int getMessagesOut() public long getMessagesOut()
{ {
return getHttpChannel().getRequests(); return getHttpChannel().getRequests();
} }

View File

@ -0,0 +1,34 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.util.component.Container;
public class ServerConnectionStatistics extends ConnectionStatistics
{
public static void addToAllConnectors(Server server)
{
for (Connector connector : server.getConnectors())
{
if (connector instanceof Container)
((Container)connector).addBean(new ConnectionStatistics());
}
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.util; package org.eclipse.jetty.util;
import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -55,11 +56,6 @@ public interface Promise<C>
*/ */
class Adapter<U> implements Promise<U> class Adapter<U> implements Promise<U>
{ {
@Override
public void succeeded(U result)
{
}
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
@ -119,13 +115,25 @@ public interface Promise<C>
} }
} }
public static abstract class Wrapper<W> implements Promise<W> class Wrapper<W> implements Promise<W>
{ {
private final Promise<W> promise; private final Promise<W> promise;
public Wrapper(Promise<W> promise) public Wrapper(Promise<W> promise)
{ {
this.promise = promise; this.promise = Objects.requireNonNull(promise);
}
@Override
public void succeeded(W result)
{
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
} }
public Promise<W> getPromise() public Promise<W> getPromise()

View File

@ -0,0 +1,94 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
public class ConnectionStatisticsTest extends AbstractTest
{
public ConnectionStatisticsTest(Transport transport)
{
super(transport);
}
@Test
public void testConnectionStatistics() throws Exception
{
Assume.assumeThat(transport, Matchers.isOneOf(Transport.H2C, Transport.H2));
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
ConnectionStatistics serverStats = new ConnectionStatistics();
connector.addBean(serverStats);
serverStats.start();
ConnectionStatistics clientStats = new ConnectionStatistics();
client.addBean(clientStats);
clientStats.start();
byte[] content = new byte[3072];
long contentLength = content.length;
ContentResponse response = client.newRequest(newURI())
.content(new BytesContentProvider(content))
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
// Close all connections.
stop();
Assert.assertThat(serverStats.getConnectionsMax(), Matchers.greaterThan(0L));
Assert.assertThat(serverStats.getReceivedBytes(), Matchers.greaterThan(contentLength));
Assert.assertThat(serverStats.getSentBytes(), Matchers.greaterThan(contentLength));
Assert.assertThat(serverStats.getReceivedMessages(), Matchers.greaterThan(0L));
Assert.assertThat(serverStats.getSentMessages(), Matchers.greaterThan(0L));
Assert.assertThat(clientStats.getConnectionsMax(), Matchers.greaterThan(0L));
Assert.assertThat(clientStats.getReceivedBytes(), Matchers.greaterThan(contentLength));
Assert.assertThat(clientStats.getSentBytes(), Matchers.greaterThan(contentLength));
Assert.assertThat(clientStats.getReceivedMessages(), Matchers.greaterThan(0L));
Assert.assertThat(clientStats.getSentMessages(), Matchers.greaterThan(0L));
}
}