diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java index a2a3ffaaf8a..c202c9f860b 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java @@ -43,10 +43,13 @@ import javax.net.ssl.SSLSocket; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ClientConnectionFactory; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.ConnectionStatistics; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; import org.eclipse.jetty.io.ssl.SslConnection; @@ -1003,4 +1006,49 @@ public class HttpClientTLSTest assertEquals(HttpStatus.OK_200, response3.getStatus()); } } + + @Test + public void testBytesInBytesOut() throws Exception + { + // Two connections will be closed: SslConnection and HttpConnection. + // Two on the server, two on the client. + CountDownLatch latch = new CountDownLatch(4); + SslContextFactory serverTLSFactory = createServerSslContextFactory(); + startServer(serverTLSFactory, new EmptyServerHandler()); + ConnectionStatistics serverStats = new ConnectionStatistics() + { + @Override + public void onClosed(Connection connection) + { + super.onClosed(connection); + latch.countDown(); + } + }; + connector.addManaged(serverStats); + + SslContextFactory clientTLSFactory = createClientSslContextFactory(); + startClient(clientTLSFactory); + ConnectionStatistics clientStats = new ConnectionStatistics() + { + @Override + public void onClosed(Connection connection) + { + super.onClosed(connection); + latch.countDown(); + } + }; + client.addManaged(clientStats); + + ContentResponse response = client.newRequest("https://localhost:" + connector.getLocalPort()) + .header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()) + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + assertThat(clientStats.getSentBytes(), Matchers.greaterThan(0L)); + assertEquals(clientStats.getSentBytes(), serverStats.getReceivedBytes()); + assertThat(clientStats.getReceivedBytes(), Matchers.greaterThan(0L)); + assertEquals(clientStats.getReceivedBytes(), serverStats.getSentBytes()); + } } diff --git a/jetty-io/pom.xml b/jetty-io/pom.xml index e98c88a2079..fa2bd61c7a9 100644 --- a/jetty-io/pom.xml +++ b/jetty-io/pom.xml @@ -18,6 +18,12 @@ jetty-util ${project.version} + + org.eclipse.jetty + jetty-jmx + ${project.version} + true + org.eclipse.jetty.toolchain jetty-test-helper diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ConnectionStatistics.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ConnectionStatistics.java index 5271dcfd7e6..ef4d3469b8d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ConnectionStatistics.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ConnectionStatistics.java @@ -19,6 +19,10 @@ package org.eclipse.jetty.io; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -32,39 +36,27 @@ import org.eclipse.jetty.util.statistic.SampleStatistic; /** *

A {@link Connection.Listener} that tracks connection statistics.

- *

Adding an instance of this class as a bean to a server Connector - * (for the server) or to HttpClient (for the client) will trigger the - * tracking of the connection statistics for all connections managed - * by the server Connector or by HttpClient.

+ *

Adding an instance of this class as a bean to a ServerConnector + * or ConnectionFactory (for the server) or to HttpClient (for the client) + * will trigger the tracking of the connection statistics for all + * connections managed by the server or by the client.

+ *

The statistics for a connection are gathered when the connection + * is closed.

+ *

ConnectionStatistics instances must be {@link #start() started} + * to collect statistics, either as part of starting the whole component + * tree, or explicitly if the component tree has already been started.

*/ @ManagedObject("Tracks statistics on connections") public class ConnectionStatistics extends AbstractLifeCycle implements Connection.Listener, Dumpable { - private final CounterStatistic _connections = new CounterStatistic(); - private final SampleStatistic _connectionsDuration = new SampleStatistic(); - - private final LongAdder _bytesIn = new LongAdder(); - private final LongAdder _bytesOut = new LongAdder(); - private final LongAdder _messagesIn = new LongAdder(); - private final LongAdder _messagesOut = new LongAdder(); - private final RateCounter _bytesInRate = new RateCounter(); - private final RateCounter _bytesOutRate = new RateCounter(); - private final RateCounter _messagesInRate = new RateCounter(); - private final RateCounter _messagesOutRate = new RateCounter(); + private final Stats _stats = new Stats("total"); + private final Map _statsMap = new ConcurrentHashMap<>(); @ManagedOperation(value = "Resets the statistics", impact = "ACTION") public void reset() { - _connections.reset(); - _connectionsDuration.reset(); - _bytesIn.reset(); - _bytesOut.reset(); - _messagesIn.reset(); - _messagesOut.reset(); - _bytesInRate.reset(); - _bytesOutRate.reset(); - _messagesInRate.reset(); - _messagesOutRate.reset(); + _stats.reset(); + _statsMap.clear(); } @Override @@ -78,8 +70,18 @@ public class ConnectionStatistics extends AbstractLifeCycle implements Connectio { if (!isStarted()) return; + onTotalOpened(connection); + onConnectionOpened(connection); + } - _connections.increment(); + protected void onTotalOpened(Connection connection) + { + _stats.incrementCount(); + } + + protected void onConnectionOpened(Connection connection) + { + _statsMap.computeIfAbsent(connection.getClass().getName(), Stats::new).incrementCount(); } @Override @@ -87,145 +89,136 @@ public class ConnectionStatistics extends AbstractLifeCycle implements Connectio { if (!isStarted()) return; + onTotalClosed(connection); + onConnectionClosed(connection); + } - _connections.decrement(); - _connectionsDuration.record(System.currentTimeMillis() - connection.getCreatedTimeStamp()); + protected void onTotalClosed(Connection connection) + { + onClosed(_stats, connection); + } + protected void onConnectionClosed(Connection connection) + { + Stats stats = _statsMap.get(connection.getClass().getName()); + if (stats != null) + onClosed(stats, connection); + } + + private void onClosed(Stats stats, Connection connection) + { + stats.decrementCount(); + stats.recordDuration(System.currentTimeMillis() - connection.getCreatedTimeStamp()); long bytesIn = connection.getBytesIn(); if (bytesIn > 0) - { - _bytesIn.add(bytesIn); - _bytesInRate.add(bytesIn); - } - + stats.recordBytesIn(bytesIn); long bytesOut = connection.getBytesOut(); if (bytesOut > 0) - { - _bytesOut.add(bytesOut); - _bytesOutRate.add(bytesOut); - } - + stats.recordBytesOut(bytesOut); long messagesIn = connection.getMessagesIn(); if (messagesIn > 0) - { - _messagesIn.add(messagesIn); - _messagesInRate.add(messagesIn); - } - + stats.recordMessagesIn(messagesIn); long messagesOut = connection.getMessagesOut(); if (messagesOut > 0) - { - _messagesOut.add(messagesOut); - _messagesOutRate.add(messagesOut); - } + stats.recordMessagesOut(messagesOut); } @ManagedAttribute("Total number of bytes received by tracked connections") public long getReceivedBytes() { - return _bytesIn.sum(); + return _stats.getReceivedBytes(); } @ManagedAttribute("Total number of bytes received per second since the last invocation of this method") public long getReceivedBytesRate() { - long rate = _bytesInRate.getRate(); - _bytesInRate.reset(); - return rate; + return _stats.getReceivedBytesRate(); } @ManagedAttribute("Total number of bytes sent by tracked connections") public long getSentBytes() { - return _bytesOut.sum(); + return _stats.getSentBytes(); } @ManagedAttribute("Total number of bytes sent per second since the last invocation of this method") public long getSentBytesRate() { - long rate = _bytesOutRate.getRate(); - _bytesOutRate.reset(); - return rate; + return _stats.getSentBytesRate(); } @ManagedAttribute("The max duration of a connection in ms") public long getConnectionDurationMax() { - return _connectionsDuration.getMax(); + return _stats.getConnectionDurationMax(); } @ManagedAttribute("The mean duration of a connection in ms") public double getConnectionDurationMean() { - return _connectionsDuration.getMean(); + return _stats.getConnectionDurationMean(); } @ManagedAttribute("The standard deviation of the duration of a connection") public double getConnectionDurationStdDev() { - return _connectionsDuration.getStdDev(); + return _stats.getConnectionDurationStdDev(); } @ManagedAttribute("The total number of connections opened") public long getConnectionsTotal() { - return _connections.getTotal(); + return _stats.getConnectionsTotal(); } @ManagedAttribute("The current number of open connections") public long getConnections() { - return _connections.getCurrent(); + return _stats.getConnections(); } @ManagedAttribute("The max number of open connections") public long getConnectionsMax() { - return _connections.getMax(); + return _stats.getConnectionsMax(); } @ManagedAttribute("The total number of messages received") public long getReceivedMessages() { - return _messagesIn.sum(); + return _stats.getReceivedMessages(); } @ManagedAttribute("Total number of messages received per second since the last invocation of this method") public long getReceivedMessagesRate() { - long rate = _messagesInRate.getRate(); - _messagesInRate.reset(); - return rate; + return _stats.getReceivedMessagesRate(); } @ManagedAttribute("The total number of messages sent") public long getSentMessages() { - return _messagesOut.sum(); + return _stats.getSentMessages(); } @ManagedAttribute("Total number of messages sent per second since the last invocation of this method") public long getSentMessagesRate() { - long rate = _messagesOutRate.getRate(); - _messagesOutRate.reset(); - return rate; + return _stats.getSentMessagesRate(); } - @Override - public String dump() + public Map getConnectionStatisticsGroups() { - return Dumpable.dump(this); + return _statsMap; } @Override public void dump(Appendable out, String indent) throws IOException { - Dumpable.dumpObjects(out, indent, this, - String.format("connections=%s", _connections), - String.format("durations=%s", _connectionsDuration), - String.format("bytes in/out=%s/%s", getReceivedBytes(), getSentBytes()), - String.format("messages in/out=%s/%s", getReceivedMessages(), getSentMessages())); + List children = new ArrayList<>(); + children.add(_stats); + children.addAll(_statsMap.values()); + Dumpable.dumpObjects(out, indent, this, children.toArray()); } @Override @@ -233,4 +226,176 @@ public class ConnectionStatistics extends AbstractLifeCycle implements Connectio { return String.format("%s@%x", getClass().getSimpleName(), hashCode()); } + + public static class Stats implements Dumpable + { + private final CounterStatistic _connections = new CounterStatistic(); + private final SampleStatistic _connectionsDuration = new SampleStatistic(); + private final LongAdder _bytesIn = new LongAdder(); + private final RateCounter _bytesInRate = new RateCounter(); + private final LongAdder _bytesOut = new LongAdder(); + private final RateCounter _bytesOutRate = new RateCounter(); + private final LongAdder _messagesIn = new LongAdder(); + private final RateCounter _messagesInRate = new RateCounter(); + private final LongAdder _messagesOut = new LongAdder(); + private final RateCounter _messagesOutRate = new RateCounter(); + private final String _name; + + public Stats(String name) + { + _name = name; + } + + public void reset() + { + _connections.reset(); + _connectionsDuration.reset(); + _bytesIn.reset(); + _bytesInRate.reset(); + _bytesOut.reset(); + _bytesOutRate.reset(); + _messagesIn.reset(); + _messagesInRate.reset(); + _messagesOut.reset(); + _messagesOutRate.reset(); + } + + public String getName() + { + return _name; + } + + public long getReceivedBytes() + { + return _bytesIn.sum(); + } + + public long getReceivedBytesRate() + { + long rate = _bytesInRate.getRate(); + _bytesInRate.reset(); + return rate; + } + + public long getSentBytes() + { + return _bytesOut.sum(); + } + + public long getSentBytesRate() + { + long rate = _bytesOutRate.getRate(); + _bytesOutRate.reset(); + return rate; + } + + public long getConnectionDurationMax() + { + return _connectionsDuration.getMax(); + } + + public double getConnectionDurationMean() + { + return _connectionsDuration.getMean(); + } + + public double getConnectionDurationStdDev() + { + return _connectionsDuration.getStdDev(); + } + + public long getConnectionsTotal() + { + return _connections.getTotal(); + } + + public long getConnections() + { + return _connections.getCurrent(); + } + + public long getConnectionsMax() + { + return _connections.getMax(); + } + + public long getReceivedMessages() + { + return _messagesIn.sum(); + } + + public long getReceivedMessagesRate() + { + long rate = _messagesInRate.getRate(); + _messagesInRate.reset(); + return rate; + } + + public long getSentMessages() + { + return _messagesOut.sum(); + } + + public long getSentMessagesRate() + { + long rate = _messagesOutRate.getRate(); + _messagesOutRate.reset(); + return rate; + } + + public void incrementCount() + { + _connections.increment(); + } + + public void decrementCount() + { + _connections.decrement(); + } + + public void recordDuration(long duration) + { + _connectionsDuration.record(duration); + } + + public void recordBytesIn(long bytesIn) + { + _bytesIn.add(bytesIn); + _bytesInRate.add(bytesIn); + } + + public void recordBytesOut(long bytesOut) + { + _bytesOut.add(bytesOut); + _bytesOutRate.add(bytesOut); + } + + public void recordMessagesIn(long messagesIn) + { + _messagesIn.add(messagesIn); + _messagesInRate.add(messagesIn); + } + + public void recordMessagesOut(long messagesOut) + { + _messagesOut.add(messagesOut); + _messagesOutRate.add(messagesOut); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + Dumpable.dumpObjects(out, indent, this, + String.format("connections=%s", _connections), + String.format("durations=%s", _connectionsDuration), + String.format("bytes in/out=%s/%s", getReceivedBytes(), getSentBytes()), + String.format("messages in/out=%s/%s", getReceivedMessages(), getSentMessages())); + } + + @Override + public String toString() + { + return String.format("%s[%s]", getClass().getSimpleName(), getName()); + } + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/jmx/ConnectionStatisticsMBean.java b/jetty-io/src/main/java/org/eclipse/jetty/io/jmx/ConnectionStatisticsMBean.java new file mode 100644 index 00000000000..a2178b02bcd --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/jmx/ConnectionStatisticsMBean.java @@ -0,0 +1,50 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io.jmx; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.stream.Collectors; + +import org.eclipse.jetty.io.ConnectionStatistics; +import org.eclipse.jetty.jmx.ObjectMBean; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; + +@ManagedObject +public class ConnectionStatisticsMBean extends ObjectMBean +{ + public ConnectionStatisticsMBean(Object object) + { + super(object); + } + + @ManagedAttribute("ConnectionStatistics grouped by connection class") + public Collection getConnectionStatisticsGroups() + { + ConnectionStatistics delegate = (ConnectionStatistics)getManagedObject(); + Map groups = delegate.getConnectionStatisticsGroups(); + return groups.values().stream() + .sorted(Comparator.comparing(ConnectionStatistics.Stats::getName)) + .map(stats -> stats.dump()) + .map(dump -> dump.replaceAll("[\r\n]", " ")) + .collect(Collectors.toList()); + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 3b66b4d305f..ae38bb7685e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.ToIntFunction; import javax.net.ssl.SSLEngine; @@ -104,7 +105,10 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr WAIT_FOR_FILL // Waiting for a fill to happen } + private final AtomicReference _handshake = new AtomicReference<>(HandshakeState.INITIAL); private final List handshakeListeners = new ArrayList<>(); + private final AtomicLong _bytesIn = new AtomicLong(); + private final AtomicLong _bytesOut = new AtomicLong(); private final ByteBufferPool _bufferPool; private final SSLEngine _sslEngine; private final DecryptedEndPoint _decryptedEndPoint; @@ -119,7 +123,6 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr private boolean _requireCloseMessage; private FlushState _flushState = FlushState.IDLE; private FillState _fillState = FillState.IDLE; - private AtomicReference _handshake = new AtomicReference<>(HandshakeState.INITIAL); private boolean _underflown; private abstract class RunnableTask implements Runnable, Invocable @@ -198,6 +201,18 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr this._decryptedDirectBuffers = useDirectBuffersForDecryption; } + @Override + public long getBytesIn() + { + return _bytesIn.get(); + } + + @Override + public long getBytesOut() + { + return _bytesOut.get(); + } + public void addHandshakeListener(SslHandshakeListener listener) { handshakeListeners.add(listener); @@ -675,6 +690,8 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr // Let's try reading some encrypted data... even if we have some already. int netFilled = networkFill(_encryptedInput); + if (netFilled > 0) + _bytesIn.addAndGet(netFilled); if (LOG.isDebugEnabled()) LOG.debug("net filled={}", netFilled); @@ -997,8 +1014,19 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr } // finish of any previous flushes - if (BufferUtil.hasContent(_encryptedOutput) && !networkFlush(_encryptedOutput)) - return false; + if (_encryptedOutput != null) + { + int remaining = _encryptedOutput.remaining(); + if (remaining > 0) + { + boolean flushed = networkFlush(_encryptedOutput); + int written = remaining - _encryptedOutput.remaining(); + if (written > 0) + _bytesOut.addAndGet(written); + if (!flushed) + return false; + } + } boolean isEmpty = BufferUtil.isEmpty(appOuts); @@ -1076,8 +1104,17 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr // if we have net bytes, let's try to flush them boolean flushed = true; - if (BufferUtil.hasContent(_encryptedOutput)) - flushed = networkFlush(_encryptedOutput); + if (_encryptedOutput != null) + { + int remaining = _encryptedOutput.remaining(); + if (remaining > 0) + { + flushed = networkFlush(_encryptedOutput); + int written = remaining - _encryptedOutput.remaining(); + if (written > 0) + _bytesOut.addAndGet(written); + } + } if (LOG.isDebugEnabled()) LOG.debug("net flushed={}, ac={}", flushed, isEmpty);