Fixes #5931 - SslConnection should implement getBytesIn()/getBytesOut(). (#6335)

* Fixes #5931 - SslConnection should implement getBytesIn()/getBytesOut().

Updated ConnectionStatistics to report both the stats of all connections,
and the stats grouped by connection class.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-06-03 11:57:10 +02:00 committed by GitHub
parent 121d8c27ef
commit f902d12fe8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 388 additions and 82 deletions

View File

@ -43,10 +43,13 @@ import javax.net.ssl.SSLSocket;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ssl.SslConnection;
@ -1003,4 +1006,49 @@ public class HttpClientTLSTest
assertEquals(HttpStatus.OK_200, response3.getStatus());
}
}
@Test
public void testBytesInBytesOut() throws Exception
{
// Two connections will be closed: SslConnection and HttpConnection.
// Two on the server, two on the client.
CountDownLatch latch = new CountDownLatch(4);
SslContextFactory serverTLSFactory = createServerSslContextFactory();
startServer(serverTLSFactory, new EmptyServerHandler());
ConnectionStatistics serverStats = new ConnectionStatistics()
{
@Override
public void onClosed(Connection connection)
{
super.onClosed(connection);
latch.countDown();
}
};
connector.addManaged(serverStats);
SslContextFactory clientTLSFactory = createClientSslContextFactory();
startClient(clientTLSFactory);
ConnectionStatistics clientStats = new ConnectionStatistics()
{
@Override
public void onClosed(Connection connection)
{
super.onClosed(connection);
latch.countDown();
}
};
client.addManaged(clientStats);
ContentResponse response = client.newRequest("https://localhost:" + connector.getLocalPort())
.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertThat(clientStats.getSentBytes(), Matchers.greaterThan(0L));
assertEquals(clientStats.getSentBytes(), serverStats.getReceivedBytes());
assertThat(clientStats.getReceivedBytes(), Matchers.greaterThan(0L));
assertEquals(clientStats.getReceivedBytes(), serverStats.getSentBytes());
}
}

View File

@ -18,6 +18,12 @@
<artifactId>jetty-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jmx</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>

View File

@ -19,6 +19,10 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -32,39 +36,27 @@ import org.eclipse.jetty.util.statistic.SampleStatistic;
/**
* <p>A {@link Connection.Listener} that tracks connection statistics.</p>
* <p>Adding an instance of this class as a bean to a server Connector
* (for the server) or to HttpClient (for the client) will trigger the
* tracking of the connection statistics for all connections managed
* by the server Connector or by HttpClient.</p>
* <p>Adding an instance of this class as a bean to a ServerConnector
* or ConnectionFactory (for the server) or to HttpClient (for the client)
* will trigger the tracking of the connection statistics for all
* connections managed by the server or by the client.</p>
* <p>The statistics for a connection are gathered when the connection
* is closed.</p>
* <p>ConnectionStatistics instances must be {@link #start() started}
* to collect statistics, either as part of starting the whole component
* tree, or explicitly if the component tree has already been started.</p>
*/
@ManagedObject("Tracks statistics on connections")
public class ConnectionStatistics extends AbstractLifeCycle implements Connection.Listener, Dumpable
{
private final CounterStatistic _connections = new CounterStatistic();
private final SampleStatistic _connectionsDuration = new SampleStatistic();
private final LongAdder _bytesIn = new LongAdder();
private final LongAdder _bytesOut = new LongAdder();
private final LongAdder _messagesIn = new LongAdder();
private final LongAdder _messagesOut = new LongAdder();
private final RateCounter _bytesInRate = new RateCounter();
private final RateCounter _bytesOutRate = new RateCounter();
private final RateCounter _messagesInRate = new RateCounter();
private final RateCounter _messagesOutRate = new RateCounter();
private final Stats _stats = new Stats("total");
private final Map<String, Stats> _statsMap = new ConcurrentHashMap<>();
@ManagedOperation(value = "Resets the statistics", impact = "ACTION")
public void reset()
{
_connections.reset();
_connectionsDuration.reset();
_bytesIn.reset();
_bytesOut.reset();
_messagesIn.reset();
_messagesOut.reset();
_bytesInRate.reset();
_bytesOutRate.reset();
_messagesInRate.reset();
_messagesOutRate.reset();
_stats.reset();
_statsMap.clear();
}
@Override
@ -78,8 +70,18 @@ public class ConnectionStatistics extends AbstractLifeCycle implements Connectio
{
if (!isStarted())
return;
onTotalOpened(connection);
onConnectionOpened(connection);
}
_connections.increment();
protected void onTotalOpened(Connection connection)
{
_stats.incrementCount();
}
protected void onConnectionOpened(Connection connection)
{
_statsMap.computeIfAbsent(connection.getClass().getName(), Stats::new).incrementCount();
}
@Override
@ -87,145 +89,136 @@ public class ConnectionStatistics extends AbstractLifeCycle implements Connectio
{
if (!isStarted())
return;
onTotalClosed(connection);
onConnectionClosed(connection);
}
_connections.decrement();
_connectionsDuration.record(System.currentTimeMillis() - connection.getCreatedTimeStamp());
protected void onTotalClosed(Connection connection)
{
onClosed(_stats, connection);
}
protected void onConnectionClosed(Connection connection)
{
Stats stats = _statsMap.get(connection.getClass().getName());
if (stats != null)
onClosed(stats, connection);
}
private void onClosed(Stats stats, Connection connection)
{
stats.decrementCount();
stats.recordDuration(System.currentTimeMillis() - connection.getCreatedTimeStamp());
long bytesIn = connection.getBytesIn();
if (bytesIn > 0)
{
_bytesIn.add(bytesIn);
_bytesInRate.add(bytesIn);
}
stats.recordBytesIn(bytesIn);
long bytesOut = connection.getBytesOut();
if (bytesOut > 0)
{
_bytesOut.add(bytesOut);
_bytesOutRate.add(bytesOut);
}
stats.recordBytesOut(bytesOut);
long messagesIn = connection.getMessagesIn();
if (messagesIn > 0)
{
_messagesIn.add(messagesIn);
_messagesInRate.add(messagesIn);
}
stats.recordMessagesIn(messagesIn);
long messagesOut = connection.getMessagesOut();
if (messagesOut > 0)
{
_messagesOut.add(messagesOut);
_messagesOutRate.add(messagesOut);
}
stats.recordMessagesOut(messagesOut);
}
@ManagedAttribute("Total number of bytes received by tracked connections")
public long getReceivedBytes()
{
return _bytesIn.sum();
return _stats.getReceivedBytes();
}
@ManagedAttribute("Total number of bytes received per second since the last invocation of this method")
public long getReceivedBytesRate()
{
long rate = _bytesInRate.getRate();
_bytesInRate.reset();
return rate;
return _stats.getReceivedBytesRate();
}
@ManagedAttribute("Total number of bytes sent by tracked connections")
public long getSentBytes()
{
return _bytesOut.sum();
return _stats.getSentBytes();
}
@ManagedAttribute("Total number of bytes sent per second since the last invocation of this method")
public long getSentBytesRate()
{
long rate = _bytesOutRate.getRate();
_bytesOutRate.reset();
return rate;
return _stats.getSentBytesRate();
}
@ManagedAttribute("The max duration of a connection in ms")
public long getConnectionDurationMax()
{
return _connectionsDuration.getMax();
return _stats.getConnectionDurationMax();
}
@ManagedAttribute("The mean duration of a connection in ms")
public double getConnectionDurationMean()
{
return _connectionsDuration.getMean();
return _stats.getConnectionDurationMean();
}
@ManagedAttribute("The standard deviation of the duration of a connection")
public double getConnectionDurationStdDev()
{
return _connectionsDuration.getStdDev();
return _stats.getConnectionDurationStdDev();
}
@ManagedAttribute("The total number of connections opened")
public long getConnectionsTotal()
{
return _connections.getTotal();
return _stats.getConnectionsTotal();
}
@ManagedAttribute("The current number of open connections")
public long getConnections()
{
return _connections.getCurrent();
return _stats.getConnections();
}
@ManagedAttribute("The max number of open connections")
public long getConnectionsMax()
{
return _connections.getMax();
return _stats.getConnectionsMax();
}
@ManagedAttribute("The total number of messages received")
public long getReceivedMessages()
{
return _messagesIn.sum();
return _stats.getReceivedMessages();
}
@ManagedAttribute("Total number of messages received per second since the last invocation of this method")
public long getReceivedMessagesRate()
{
long rate = _messagesInRate.getRate();
_messagesInRate.reset();
return rate;
return _stats.getReceivedMessagesRate();
}
@ManagedAttribute("The total number of messages sent")
public long getSentMessages()
{
return _messagesOut.sum();
return _stats.getSentMessages();
}
@ManagedAttribute("Total number of messages sent per second since the last invocation of this method")
public long getSentMessagesRate()
{
long rate = _messagesOutRate.getRate();
_messagesOutRate.reset();
return rate;
return _stats.getSentMessagesRate();
}
@Override
public String dump()
public Map<String, Stats> getConnectionStatisticsGroups()
{
return Dumpable.dump(this);
return _statsMap;
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this,
String.format("connections=%s", _connections),
String.format("durations=%s", _connectionsDuration),
String.format("bytes in/out=%s/%s", getReceivedBytes(), getSentBytes()),
String.format("messages in/out=%s/%s", getReceivedMessages(), getSentMessages()));
List<Stats> children = new ArrayList<>();
children.add(_stats);
children.addAll(_statsMap.values());
Dumpable.dumpObjects(out, indent, this, children.toArray());
}
@Override
@ -233,4 +226,176 @@ public class ConnectionStatistics extends AbstractLifeCycle implements Connectio
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
public static class Stats implements Dumpable
{
private final CounterStatistic _connections = new CounterStatistic();
private final SampleStatistic _connectionsDuration = new SampleStatistic();
private final LongAdder _bytesIn = new LongAdder();
private final RateCounter _bytesInRate = new RateCounter();
private final LongAdder _bytesOut = new LongAdder();
private final RateCounter _bytesOutRate = new RateCounter();
private final LongAdder _messagesIn = new LongAdder();
private final RateCounter _messagesInRate = new RateCounter();
private final LongAdder _messagesOut = new LongAdder();
private final RateCounter _messagesOutRate = new RateCounter();
private final String _name;
public Stats(String name)
{
_name = name;
}
public void reset()
{
_connections.reset();
_connectionsDuration.reset();
_bytesIn.reset();
_bytesInRate.reset();
_bytesOut.reset();
_bytesOutRate.reset();
_messagesIn.reset();
_messagesInRate.reset();
_messagesOut.reset();
_messagesOutRate.reset();
}
public String getName()
{
return _name;
}
public long getReceivedBytes()
{
return _bytesIn.sum();
}
public long getReceivedBytesRate()
{
long rate = _bytesInRate.getRate();
_bytesInRate.reset();
return rate;
}
public long getSentBytes()
{
return _bytesOut.sum();
}
public long getSentBytesRate()
{
long rate = _bytesOutRate.getRate();
_bytesOutRate.reset();
return rate;
}
public long getConnectionDurationMax()
{
return _connectionsDuration.getMax();
}
public double getConnectionDurationMean()
{
return _connectionsDuration.getMean();
}
public double getConnectionDurationStdDev()
{
return _connectionsDuration.getStdDev();
}
public long getConnectionsTotal()
{
return _connections.getTotal();
}
public long getConnections()
{
return _connections.getCurrent();
}
public long getConnectionsMax()
{
return _connections.getMax();
}
public long getReceivedMessages()
{
return _messagesIn.sum();
}
public long getReceivedMessagesRate()
{
long rate = _messagesInRate.getRate();
_messagesInRate.reset();
return rate;
}
public long getSentMessages()
{
return _messagesOut.sum();
}
public long getSentMessagesRate()
{
long rate = _messagesOutRate.getRate();
_messagesOutRate.reset();
return rate;
}
public void incrementCount()
{
_connections.increment();
}
public void decrementCount()
{
_connections.decrement();
}
public void recordDuration(long duration)
{
_connectionsDuration.record(duration);
}
public void recordBytesIn(long bytesIn)
{
_bytesIn.add(bytesIn);
_bytesInRate.add(bytesIn);
}
public void recordBytesOut(long bytesOut)
{
_bytesOut.add(bytesOut);
_bytesOutRate.add(bytesOut);
}
public void recordMessagesIn(long messagesIn)
{
_messagesIn.add(messagesIn);
_messagesInRate.add(messagesIn);
}
public void recordMessagesOut(long messagesOut)
{
_messagesOut.add(messagesOut);
_messagesOutRate.add(messagesOut);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this,
String.format("connections=%s", _connections),
String.format("durations=%s", _connectionsDuration),
String.format("bytes in/out=%s/%s", getReceivedBytes(), getSentBytes()),
String.format("messages in/out=%s/%s", getReceivedMessages(), getSentMessages()));
}
@Override
public String toString()
{
return String.format("%s[%s]", getClass().getSimpleName(), getName());
}
}
}

View File

@ -0,0 +1,50 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io.jmx;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.stream.Collectors;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.jmx.ObjectMBean;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject
public class ConnectionStatisticsMBean extends ObjectMBean
{
public ConnectionStatisticsMBean(Object object)
{
super(object);
}
@ManagedAttribute("ConnectionStatistics grouped by connection class")
public Collection<String> getConnectionStatisticsGroups()
{
ConnectionStatistics delegate = (ConnectionStatistics)getManagedObject();
Map<String, ConnectionStatistics.Stats> groups = delegate.getConnectionStatisticsGroups();
return groups.values().stream()
.sorted(Comparator.comparing(ConnectionStatistics.Stats::getName))
.map(stats -> stats.dump())
.map(dump -> dump.replaceAll("[\r\n]", " "))
.collect(Collectors.toList());
}
}

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToIntFunction;
import javax.net.ssl.SSLEngine;
@ -104,7 +105,10 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
WAIT_FOR_FILL // Waiting for a fill to happen
}
private final AtomicReference<HandshakeState> _handshake = new AtomicReference<>(HandshakeState.INITIAL);
private final List<SslHandshakeListener> handshakeListeners = new ArrayList<>();
private final AtomicLong _bytesIn = new AtomicLong();
private final AtomicLong _bytesOut = new AtomicLong();
private final ByteBufferPool _bufferPool;
private final SSLEngine _sslEngine;
private final DecryptedEndPoint _decryptedEndPoint;
@ -119,7 +123,6 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
private boolean _requireCloseMessage;
private FlushState _flushState = FlushState.IDLE;
private FillState _fillState = FillState.IDLE;
private AtomicReference<HandshakeState> _handshake = new AtomicReference<>(HandshakeState.INITIAL);
private boolean _underflown;
private abstract class RunnableTask implements Runnable, Invocable
@ -198,6 +201,18 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
this._decryptedDirectBuffers = useDirectBuffersForDecryption;
}
@Override
public long getBytesIn()
{
return _bytesIn.get();
}
@Override
public long getBytesOut()
{
return _bytesOut.get();
}
public void addHandshakeListener(SslHandshakeListener listener)
{
handshakeListeners.add(listener);
@ -675,6 +690,8 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
// Let's try reading some encrypted data... even if we have some already.
int netFilled = networkFill(_encryptedInput);
if (netFilled > 0)
_bytesIn.addAndGet(netFilled);
if (LOG.isDebugEnabled())
LOG.debug("net filled={}", netFilled);
@ -997,8 +1014,19 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
}
// finish of any previous flushes
if (BufferUtil.hasContent(_encryptedOutput) && !networkFlush(_encryptedOutput))
return false;
if (_encryptedOutput != null)
{
int remaining = _encryptedOutput.remaining();
if (remaining > 0)
{
boolean flushed = networkFlush(_encryptedOutput);
int written = remaining - _encryptedOutput.remaining();
if (written > 0)
_bytesOut.addAndGet(written);
if (!flushed)
return false;
}
}
boolean isEmpty = BufferUtil.isEmpty(appOuts);
@ -1076,8 +1104,17 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
// if we have net bytes, let's try to flush them
boolean flushed = true;
if (BufferUtil.hasContent(_encryptedOutput))
flushed = networkFlush(_encryptedOutput);
if (_encryptedOutput != null)
{
int remaining = _encryptedOutput.remaining();
if (remaining > 0)
{
flushed = networkFlush(_encryptedOutput);
int written = remaining - _encryptedOutput.remaining();
if (written > 0)
_bytesOut.addAndGet(written);
}
}
if (LOG.isDebugEnabled())
LOG.debug("net flushed={}, ac={}", flushed, isEmpty);