Jetty 9.4.x 2501 accept listener (#2511)

* Issue #2501 - Accept Listener
* Issue #2501 - Include accepting connections in connection limit.
* AcceptRateLimit minimal delay

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2018-06-19 09:03:54 +02:00 committed by GitHub
parent 92ba1375be
commit 4f54447585
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1010 additions and 76 deletions

View File

@ -45,6 +45,18 @@ tags: [connector]
Modules for tag 'connector':
----------------------------
Module: acceptratelimit
: Enable a server wide accept rate limit
Tags: connector
Depend: server
XML: etc/jetty-acceptratelimit.xml
Module: connectionlimit
: Enable a server wide connection limit
Tags: connector
Depend: server
XML: etc/jetty-connectionlimit.xml
Module: http
: Enables a HTTP connector on the server.
: By default HTTP/1 is support, but HTTP2C can
@ -52,7 +64,6 @@ Modules for tag 'connector':
Tags: connector, http
Depend: server
XML: etc/jetty-http.xml
Enabled: ${jetty.base}/start.ini
Module: http-forwarded
: Adds a forwarded request customizer to the HTTP Connector
@ -285,6 +296,20 @@ Making changes to the associated Jetty XML file for connectors is *not* recommen
If you do wish to edit Jetty XML, please see our section on managing link:#[Jetty Home and Jetty Base] to ensure your Jetty Home remains a standard of truth for your implementation.
____
==== Limiting Connections
Jetty also provides the means by which to limit connections to the server and/or contexts.
This is provided by two different modules in the distribution.
`connectionlimit`::
Applies a limit to the number of connections.
If this limit is exceeded, new connections are suspended for the time specified (in milliseconds).
`acceptratelimit`::
Limits the rate at which new connections are accepted.
If this limit is exceeded, new connections are suspended for the time specified (in milliseconds).
As with the modules listed above, these can be enabled by adding `--add-to-start=<module-name>` to the command line.
==== Advanced Configuration
Jetty primarily uses a single connector type called link:{JDURL}/org/eclipse/jetty/server/ServerConnector.html[ServerConnector].
@ -430,7 +455,7 @@ This example HttpConfiguration may be used by reference to the ID "`httpConfig`"
This same `httpConfig` is referenced by the link:{JDURL}/org/eclipse/jetty/server/handler/SecuredRedirectHandler.html[`SecuredRedirectHandler`] when redirecting secure requests.
Please note that if your `httpConfig` does not include a `secureScheme` or `securePort` or there is no `HttpConfiguration` present these types of secured requests will be returned a `403` error.
For SSL based connectors (in `jetty-https.xml` and `jetty-http2.xml`), the common "`httpConfig`" instance is used as the basis to create an SSL specific configuration with ID "`sslHttpConfig`":
For SSL-based connectors (in `jetty-https.xml` and `jetty-http2.xml`), the common "`httpConfig`" instance is used as the basis to create an SSL specific configuration with ID "`sslHttpConfig`":
[source, xml, subs="{sub-order}"]
----

View File

@ -40,7 +40,7 @@ public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final List<Listener> _listeners = new CopyOnWriteArrayList<>();
private final long _created=System.currentTimeMillis();
private final EndPoint _endPoint;
private final Executor _executor;
@ -59,13 +59,13 @@ public abstract class AbstractConnection implements Connection
@Override
public void addListener(Listener listener)
{
listeners.add(listener);
_listeners.add(listener);
}
@Override
public void removeListener(Listener listener)
{
listeners.remove(listener);
_listeners.remove(listener);
}
public int getInputBufferSize()
@ -198,7 +198,7 @@ public abstract class AbstractConnection implements Connection
if (LOG.isDebugEnabled())
LOG.debug("onOpen {}", this);
for (Listener listener : listeners)
for (Listener listener : _listeners)
listener.onOpened(this);
}
@ -208,7 +208,7 @@ public abstract class AbstractConnection implements Connection
if (LOG.isDebugEnabled())
LOG.debug("onClose {}",this);
for (Listener listener : listeners)
for (Listener listener : _listeners)
listener.onClosed(this);
}

View File

@ -635,6 +635,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
this.channel = channel;
this.attachment = attachment;
_selectorManager.onAccepting(channel);
}
@Override
@ -655,6 +656,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
catch (Throwable x)
{
closeNoExceptions(channel);
_selectorManager.onAcceptFailed(channel,x);
LOG.debug(x);
}
}
@ -665,6 +667,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
try
{
createEndPoint(channel, key);
_selectorManager.onAccepted(channel);
}
catch (Throwable x)
{
@ -678,6 +681,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
closeNoExceptions(channel);
LOG.warn(String.valueOf(failure));
LOG.debug(failure);
_selectorManager.onAcceptFailed(channel,failure);
}
}

View File

@ -27,7 +27,10 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EventListener;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;
@ -61,6 +64,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
private final ManagedSelector[] _selectors;
private final AtomicInteger _selectorIndex = new AtomicInteger();
private final IntUnaryOperator _selectorIndexUpdate;
private final List<AcceptListener> _acceptListeners = new ArrayList<>();
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private ThreadPoolBudget.Lease _lease;
@ -405,5 +409,106 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/
public abstract Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException;
public void addEventListener(EventListener listener)
{
if (isRunning())
throw new IllegalStateException(this.toString());
if (listener instanceof AcceptListener)
addAcceptListener(AcceptListener.class.cast(listener));
}
public void removeEventListener(EventListener listener)
{
if (isRunning())
throw new IllegalStateException(this.toString());
if (listener instanceof AcceptListener)
removeAcceptListener(AcceptListener.class.cast(listener));
}
public void addAcceptListener(AcceptListener listener)
{
if (!_acceptListeners.contains(listener))
_acceptListeners.add(listener);
}
public void removeAcceptListener(AcceptListener listener)
{
_acceptListeners.remove(listener);
}
protected void onAccepting(SelectableChannel channel)
{
for (AcceptListener l : _acceptListeners)
{
try
{
l.onAccepting(channel);
}
catch (Throwable x)
{
LOG.warn(x);
}
}
}
protected void onAcceptFailed(SelectableChannel channel, Throwable cause)
{
for (AcceptListener l : _acceptListeners)
{
try
{
l.onAcceptFailed(channel,cause);
}
catch (Throwable x)
{
LOG.warn(x);
}
}
}
protected void onAccepted(SelectableChannel channel)
{
for (AcceptListener l : _acceptListeners)
{
try
{
l.onAccepted(channel);
}
catch (Throwable x)
{
LOG.warn(x);
}
}
}
/**
* <p>A listener for accept events.</p>
* <p>This listener is called from either the selector or acceptor thread
* and implementations must be non blocking and fast.</p>
*/
public interface AcceptListener extends EventListener
{
/**
* Called immediately after a new SelectableChannel is accepted, but
* before it has been submitted to the {@link SelectorManager}.
* @param channel the accepted channel
*/
default void onAccepting(SelectableChannel channel) {}
/**
* Called if the processing of the accepted channel fails prior to calling
* {@link #onAccepted(SelectableChannel)}.
* @param channel the accepted channel
* @param cause the cause of the failure
*/
default void onAcceptFailed(SelectableChannel channel, Throwable cause) {}
/**
* Called after the accepted channel has been allocated an {@link EndPoint}
* and associated {@link Connection}, and after the onOpen notifications have
* been called on both endPoint and connection.
* @param channel the accepted channel
*/
default void onAccepted(SelectableChannel channel) {}
}
}

View File

@ -0,0 +1,16 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="addBean">
<Arg>
<New class="org.eclipse.jetty.server.AcceptRateLimit">
<Arg name="maxRate" type="int"><Property name="jetty.acceptratelimit.acceptRateLimit" default="1000" /></Arg>
<Arg name="period" type="long"><Property name="jetty.acceptratelimit.period" default="1000" /></Arg>
<Arg name="units"><Call class="java.util.concurrent.TimeUnit" name="valueOf"><Arg>
<Property name="jetty.acceptratelimit.units" default="MILLISECONDS" />
</Arg></Call></Arg>
<Arg name="server"><Ref refid="Server" /></Arg>
</New>
</Arg>
</Call>
</Configure>

View File

@ -1,12 +1,16 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="addBean">
<Arg>
<New class="org.eclipse.jetty.server.ConnectionLimit">
<Arg type="int"><Property name="jetty.connection.limit" default="1000"/></Arg>
<Arg><Ref refid="Server"/></Arg>
<Arg name= "maxConnections" type="int">
<Property name="jetty.connectionlimit.maxConnections" deprecated="jetty.connection.limit" default="1000" />
</Arg>
<Arg name="server">
<Ref refid="Server" />
</Arg>
<Set name="idleTimeout"><Property name="jetty.connectionlimit.idleTimeout" default="1000" /></Set>
</New>
</Arg>
</Call>

View File

@ -0,0 +1,23 @@
DO NOT EDIT - See: https://www.eclipse.org/jetty/documentation/current/startup-modules.html
[description]
Enable a server wide accept rate limit
[tags]
connector
[depend]
server
[xml]
etc/jetty-acceptratelimit.xml
[ini-template]
## The limit of accepted connections
#jetty.acceptratelimit.acceptRateLimit=1000
## The period over which the rate applies
#jetty.acceptratelimit.period=1000
# The unit of time for the period
#jetty.acceptratelimit.units=MILLISECONDS

View File

@ -13,4 +13,9 @@ server
etc/jetty-connectionlimit.xml
[ini-template]
jetty.connection.limit=1000
## The limit of connections to apply
#jetty.connectionlimit.maxConnections=1000
## The idle timeout to apply (in milliseconds) when connections are limited
#jetty.connectionlimit.idleTimeout=1000

View File

@ -0,0 +1,265 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.nio.channels.SelectableChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.statistic.RateStatistic;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>A Listener that limits the rate at which new connections are accepted</p>
* <p>
* If the limits are exceeded, accepting is suspended until the rate is again below
* the limit, so incoming connections are held in the operating system accept
* queue (no syn ack sent), where they may either timeout or wait for the server
* to resume accepting.
* </p>
* <p>
* It can be applied to an entire server or to a specific connector by adding it
* via {@link Container#addBean(Object)}
* </p>
* <p>
* <b>Usage:</b>
* </p>
* <pre>
* Server server = new Server();
* server.addBean(new AcceptLimit(100,5,TimeUnit.SECONDS,server));
* ...
* server.start();
* </pre>
* @see SelectorManager.AcceptListener
*/
@ManagedObject
public class AcceptRateLimit extends AbstractLifeCycle implements SelectorManager.AcceptListener, Runnable
{
private static final Logger LOG = Log.getLogger(AcceptRateLimit.class);
private final Server _server;
private final List<AbstractConnector> _connectors = new ArrayList<>();
private final Rate _rate;
private final int _acceptRateLimit;
private boolean _limiting;
private Scheduler.Task _task;
public AcceptRateLimit(@Name("acceptRateLimit") int acceptRateLimit, @Name("period") long period, @Name("units") TimeUnit units, @Name("server") Server server)
{
_server = server;
_acceptRateLimit = acceptRateLimit;
_rate = new Rate(period,units);
}
public AcceptRateLimit(@Name("limit") int limit, @Name("period") long period, @Name("units") TimeUnit units, @Name("connectors") Connector...connectors)
{
this(limit, period, units, (Server)null);
for (Connector c: connectors)
{
if (c instanceof AbstractConnector)
_connectors.add((AbstractConnector)c);
else
LOG.warn("Connector {} is not an AbstractConnector. Connections not limited",c);
}
}
@ManagedAttribute("The accept rate limit")
public int getAcceptRateLimit()
{
return _acceptRateLimit;
}
@ManagedAttribute("The accept rate period")
public long getPeriod()
{
return _rate.getPeriod();
}
@ManagedAttribute("The accept rate period units")
public TimeUnit getUnits()
{
return _rate.getUnits();
}
@ManagedAttribute("The current accept rate")
public int getRate()
{
return _rate.getRate();
}
@ManagedAttribute("The maximum accept rate achieved")
public long getMaxRate()
{
return _rate.getMax();
}
@ManagedOperation(value = "Resets the accept rate", impact = "ACTION")
public void reset()
{
synchronized (_rate)
{
_rate.reset();
if (_limiting)
{
_limiting = false;
unlimit();
}
}
}
protected void age(long period, TimeUnit units)
{
_rate.age(period,units);
}
@Override
protected void doStart() throws Exception
{
synchronized (_rate)
{
if (_server!=null)
{
for (Connector c: _server.getConnectors())
{
if (c instanceof AbstractConnector)
_connectors.add((AbstractConnector)c);
else
LOG.warn("Connector {} is not an AbstractConnector. Connections not limited",c);
}
}
if (LOG.isDebugEnabled())
LOG.debug("AcceptLimit accept<{} rate<{} in {} for {}", _acceptRateLimit, _rate, _connectors);
for (AbstractConnector c : _connectors)
c.addBean(this);
}
}
@Override
protected void doStop() throws Exception
{
synchronized (_rate)
{
if (_task!=null)
_task.cancel();
_task = null;
for (AbstractConnector c : _connectors)
c.removeBean(this);
if (_server != null)
_connectors.clear();
_limiting = false;
}
}
protected void limit()
{
for (AbstractConnector c : _connectors)
c.setAccepting(false);
schedule();
}
protected void unlimit()
{
for (AbstractConnector c : _connectors)
c.setAccepting(true);
}
@Override
public void onAccepting(SelectableChannel channel)
{
synchronized (_rate)
{
int rate = _rate.record();
if (LOG.isDebugEnabled())
{
LOG.debug("onAccepting rate {}/{} for {} {}",rate,_acceptRateLimit,_rate,channel);
}
if (rate > _acceptRateLimit)
{
if (!_limiting)
{
_limiting = true;
LOG.warn("AcceptLimit rate exceeded {}>{} on {}",rate,_acceptRateLimit,_connectors);
limit();
}
}
}
}
private void schedule()
{
long oldest = _rate.getOldest(TimeUnit.MILLISECONDS);
long period = TimeUnit.MILLISECONDS.convert(_rate.getPeriod(),_rate.getUnits());
long delay = period-(oldest>0?oldest:0);
if (delay < 0)
delay = 0;
if (LOG.isDebugEnabled())
LOG.debug("schedule {} {}",delay,TimeUnit.MILLISECONDS);
_task = _connectors.get(0).getScheduler().schedule(this,delay,TimeUnit.MILLISECONDS);
}
@Override
public void run()
{
synchronized (_rate)
{
_task = null;
if (!isRunning())
return;
int rate = _rate.getRate();
if (rate > _acceptRateLimit)
{
schedule();
return;
}
if (_limiting)
{
_limiting = false;
LOG.warn("AcceptLimit rate OK {}<={} on {}",rate,_acceptRateLimit,_connectors);
unlimit();
}
}
}
private final class Rate extends RateStatistic
{
private Rate(long period, TimeUnit units)
{
super(period,units);
}
@Override
protected void age(long period, TimeUnit units)
{
super.age(period,units);
}
}
}

View File

@ -18,47 +18,66 @@
package org.eclipse.jetty.server;
import java.nio.channels.SelectableChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Connection.Listener;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>A Connection Listener that limits the number of Connections.</p>
* <p>A Listener that limits the number of Connections.</p>
* <p>This listener applies a limit to the number of connections, which when
* exceeded results in a call to {@link AbstractConnector#setAccepting(boolean)}
* to prevent further connections being received. It can be applied to an
* entire server or to a specific connector.
* entire server or to a specific connector by adding it via {@link Container#addBean(Object)}
* </p>
* <p>
* <b>Usage:</b>
* </p>
* <pre>
* Server server = new Server();
* server.addBean(new ConnectionLimit(5000,server));
* ...
* server.start();
* </pre>
* @see LowResourceMonitor
* @see Connection.Listener
* @see SelectorManager.AcceptListener
*/
@ManagedObject
public class ConnectionLimit extends AbstractLifeCycle implements Listener
public class ConnectionLimit extends AbstractLifeCycle implements Listener, SelectorManager.AcceptListener
{
private static final Logger LOG = Log.getLogger(ConnectionLimit.class);
private final Server _server;
private final List<AbstractConnector> _connectors = new ArrayList<>();
private final Set<SelectableChannel> _accepting = new HashSet<>();
private int _connections;
private int _maxConnections;
private boolean _accepting = true;
private long _idleTimeout;
private boolean _limiting = false;
public ConnectionLimit(int maxConnections, Server server)
public ConnectionLimit(@Name("maxConnections") int maxConnections, @Name("server") Server server)
{
_maxConnections = maxConnections;
_server = server;
}
public ConnectionLimit(int maxConnections, Connector...connectors)
public ConnectionLimit(@Name("maxConnections") int maxConnections, @Name("connectors") Connector...connectors)
{
_maxConnections = maxConnections;
_server = null;
this(maxConnections, (Server)null);
for (Connector c: connectors)
{
if (c instanceof AbstractConnector)
@ -67,84 +86,188 @@ public class ConnectionLimit extends AbstractLifeCycle implements Listener
LOG.warn("Connector {} is not an AbstractConnection. Connections not limited",c);
}
}
@ManagedAttribute("The maximum number of connections allowed")
public synchronized int getMaxConnections()
/**
* @return If &gt;= 0, the endpoint idle timeout in ms to apply when the connection limit is reached
*/
@ManagedAttribute("The endpoint idle timeout in ms to apply when the connection limit is reached")
public long getIdleTimeout()
{
return _maxConnections;
return _idleTimeout;
}
/**
* @param idleTimeout If &gt;= 0 the endpoint idle timeout in ms to apply when the connection limit is reached
*/
public void setIdleTimeout(long idleTimeout)
{
_idleTimeout = idleTimeout;
}
@ManagedAttribute("The maximum number of connections allowed")
public int getMaxConnections()
{
synchronized (this)
{
return _maxConnections;
}
}
public synchronized void setMaxConnections(int max)
public void setMaxConnections(int max)
{
_maxConnections = max;
synchronized (this)
{
_maxConnections = max;
}
}
@ManagedAttribute("The current number of connections ")
public synchronized int getConnections()
public int getConnections()
{
return _connections;
synchronized (this)
{
return _connections;
}
}
@Override
protected synchronized void doStart() throws Exception
protected void doStart() throws Exception
{
if (_server!=null)
synchronized (this)
{
for (Connector c: _server.getConnectors())
if (_server != null)
{
if (c instanceof AbstractConnector)
_connectors.add((AbstractConnector)c);
else
LOG.warn("Connector {} is not an AbstractConnection. Connections not limited",c);
for (Connector c : _server.getConnectors())
{
if (c instanceof AbstractConnector)
_connectors.add((AbstractConnector)c);
else
LOG.warn("Connector {} is not an AbstractConnector. Connections not limited",c);
}
}
if (LOG.isDebugEnabled())
LOG.debug("ConnectionLimit {} for {}",_maxConnections,_connectors);
_connections = 0;
_limiting = false;
for (AbstractConnector c : _connectors)
c.addBean(this);
}
}
@Override
protected void doStop() throws Exception
{
synchronized (this)
{
for (AbstractConnector c : _connectors)
c.removeBean(this);
_connections = 0;
if (_server != null)
_connectors.clear();
}
}
protected void check()
{
if ( (_accepting.size()+_connections) >= _maxConnections)
{
if (!_limiting)
{
_limiting = true;
LOG.info("Connection Limit({}) reached for {}",_maxConnections,_connectors);
limit();
}
}
else
{
if (_limiting)
{
_limiting = false;
LOG.info("Connection Limit({}) cleared for {}",_maxConnections,_connectors);
unlimit();
}
}
}
if (LOG.isDebugEnabled())
LOG.debug("ConnectionLimit {} for {}",_maxConnections,_connectors);
_connections = 0;
_accepting = true;
protected void limit()
{
for (AbstractConnector c : _connectors)
c.addBean(this);
{
c.setAccepting(false);
if (_idleTimeout>0)
{
for (EndPoint endPoint : c.getConnectedEndPoints())
endPoint.setIdleTimeout(_idleTimeout);
}
}
}
protected void unlimit()
{
for (AbstractConnector c : _connectors)
{
c.setAccepting(true);
if (_idleTimeout>0)
{
for (EndPoint endPoint : c.getConnectedEndPoints())
endPoint.setIdleTimeout(c.getIdleTimeout());
}
}
}
@Override
public void onAccepting(SelectableChannel channel)
{
synchronized (this)
{
_accepting.add(channel);
if (LOG.isDebugEnabled())
LOG.debug("onAccepting ({}+{}) < {} {}",_accepting.size(),_connections,_maxConnections,channel);
check();
}
}
@Override
protected synchronized void doStop() throws Exception
public void onAcceptFailed(SelectableChannel channel, Throwable cause)
{
synchronized (this)
{
_accepting.remove(channel);
if (LOG.isDebugEnabled())
LOG.debug("onAcceptFailed ({}+{}) < {} {} {}",_accepting.size(),_connections,_maxConnections,channel,cause);
check();
}
}
@Override
public void onAccepted(SelectableChannel channel)
{
for (AbstractConnector c : _connectors)
c.removeBean(this);
_connections = 0;
if (_server!=null)
_connectors.clear();
}
@Override
public synchronized void onOpened(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("onOpen {} < {} {}",_connections, _maxConnections, connection);
if ( ++_connections >= _maxConnections && _accepting)
{
_accepting = false;
LOG.info("Connection Limit({}) reached for {}",_maxConnections,_connectors);
for (AbstractConnector c : _connectors)
c.setAccepting(false);
public void onOpened(Connection connection)
{
synchronized (this)
{
_accepting.remove(connection.getEndPoint().getTransport());
_connections++;
if (LOG.isDebugEnabled())
LOG.debug("onOpened ({}+{}) < {} {}",_accepting.size(),_connections,_maxConnections,connection);
check();
}
}
@Override
public synchronized void onClosed(Connection connection)
public void onClosed(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("onClosed {} < {} {}",_connections, _maxConnections, connection);
if ( --_connections < _maxConnections && !_accepting)
synchronized (this)
{
_accepting = true;
LOG.info("Connection Limit({}) cleared for {}",_maxConnections,_connectors);
for (AbstractConnector c : _connectors)
c.setAccepting(true);
_connections--;
if (LOG.isDebugEnabled())
LOG.debug("onClosed ({}+{}) < {} {}",_accepting.size(),_connections,_maxConnections,connection);
check();
}
}
}

View File

@ -53,7 +53,8 @@ import org.eclipse.jetty.util.thread.ThreadPool;
* {@link Runtime} instance has {@link Runtime#totalMemory()} minus {@link Runtime#freeMemory()}
* greater than {@link #getMaxMemory()}</li>
* <li>If {@link #setMaxConnections(int)} is non zero then low resources is dected if the total number
* of connections exceeds {@link #getMaxConnections()}</li>
* of connections exceeds {@link #getMaxConnections()}. This feature is deprecated and replaced by
* {@link ConnectionLimit}</li>
* </ul>
* <p>
* Once low resources state is detected, the cause is logged and all existing connections returned
@ -181,7 +182,12 @@ public class LowResourceMonitor extends AbstractLifeCycle
_monitorThreads = monitorThreads;
}
/**
* @return The maximum connections allowed for the monitored connectors before low resource handling is activated
* @deprecated Replaced by ConnectionLimit
*/
@ManagedAttribute("The maximum connections allowed for the monitored connectors before low resource handling is activated")
@Deprecated
public int getMaxConnections()
{
return _maxConnections;
@ -189,9 +195,13 @@ public class LowResourceMonitor extends AbstractLifeCycle
/**
* @param maxConnections The maximum connections before low resources state is triggered
* @deprecated Replaced by ConnectionLimit
*/
@Deprecated
public void setMaxConnections(int maxConnections)
{
if (maxConnections>0)
LOG.warn("LowResourceMonitor.setMaxConnections is deprecated. Use ConnectionLimit.");
_maxConnections = maxConnections;
}

View File

@ -29,6 +29,7 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.EventListener;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@ -228,6 +229,9 @@ public class ServerConnector extends AbstractNetworkConnector
@Override
protected void doStart() throws Exception
{
for (EventListener l: getBeans(EventListener.class))
_manager.addEventListener(l);
super.doStart();
if (getAcceptors()==0)
@ -236,6 +240,14 @@ public class ServerConnector extends AbstractNetworkConnector
_acceptor.set(_manager.acceptor(_acceptChannel));
}
}
@Override
protected void doStop() throws Exception
{
super.doStop();
for (EventListener l: getBeans(EventListener.class))
_manager.removeEventListener(l);
}
@Override
public boolean isOpen()

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
@ -304,6 +305,71 @@ public class NotAcceptingTest
}
}
@Test
public void testAcceptRateLimit() throws Exception
{
AcceptRateLimit limit = new AcceptRateLimit(4,1,TimeUnit.HOURS, server);
server.addBean(limit);
server.setHandler(new HelloHandler());
server.start();
try (
Socket async0 = new Socket("localhost",asyncConnector.getLocalPort());
Socket async1 = new Socket("localhost",asyncConnector.getLocalPort());
Socket async2 = new Socket("localhost",asyncConnector.getLocalPort());
)
{
String expectedContent = "Hello" + System.lineSeparator();
for (Socket client : new Socket[]{async2})
{
HttpTester.Input in = HttpTester.from(client.getInputStream());
client.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is(expectedContent));
}
assertThat(localConnector.isAccepting(),is(true));
assertThat(blockingConnector.isAccepting(),is(true));
assertThat(asyncConnector.isAccepting(),is(true));
}
limit.age(45,TimeUnit.MINUTES);
try (
Socket async0 = new Socket("localhost",asyncConnector.getLocalPort());
Socket async1 = new Socket("localhost",asyncConnector.getLocalPort());
)
{
String expectedContent = "Hello" + System.lineSeparator();
for (Socket client : new Socket[]{async1})
{
HttpTester.Input in = HttpTester.from(client.getInputStream());
client.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is(expectedContent));
}
assertThat(localConnector.isAccepting(),is(false));
assertThat(blockingConnector.isAccepting(),is(false));
assertThat(asyncConnector.isAccepting(),is(false));
}
limit.age(45,TimeUnit.MINUTES);
assertThat(localConnector.isAccepting(),is(false));
assertThat(blockingConnector.isAccepting(),is(false));
assertThat(asyncConnector.isAccepting(),is(false));
limit.run();
assertThat(localConnector.isAccepting(),is(true));
assertThat(blockingConnector.isAccepting(),is(true));
assertThat(asyncConnector.isAccepting(),is(true));
}
@Test
public void testConnectionLimit() throws Exception
{
@ -311,7 +377,8 @@ public class NotAcceptingTest
server.setHandler(new HelloHandler());
server.start();
Log.getLogger(ConnectionLimit.class).debug("CONNECT:");
try (
LocalEndPoint local0 = localConnector.connect();
LocalEndPoint local1 = localConnector.connect();
@ -326,6 +393,7 @@ public class NotAcceptingTest
{
String expectedContent = "Hello" + System.lineSeparator();
Log.getLogger(ConnectionLimit.class).debug("LOCAL:");
for (LocalEndPoint client: new LocalEndPoint[] {local0,local1,local2})
{
client.addInputAndExecute(BufferUtil.toBuffer("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n"));
@ -334,6 +402,7 @@ public class NotAcceptingTest
assertThat(response.getContent(),is(expectedContent));
}
Log.getLogger(ConnectionLimit.class).debug("NETWORK:");
for (Socket client : new Socket[]{blocking0,blocking1,blocking2,async0,async1,async2})
{
HttpTester.Input in = HttpTester.from(client.getInputStream());
@ -361,7 +430,7 @@ public class NotAcceptingTest
waitFor(blockingConnector::isAccepting,is(true),2*IDLE_TIMEOUT,TimeUnit.MILLISECONDS);
waitFor(asyncConnector::isAccepting,is(true),2*IDLE_TIMEOUT,TimeUnit.MILLISECONDS);
}
public static class HelloHandler extends AbstractHandler
{
public HelloHandler()
@ -404,8 +473,5 @@ public class NotAcceptingTest
catch(InterruptedException e)
{}
}
}
}

View File

@ -1,3 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.server.LEVEL=DEBUG
#org.eclipse.jetty.server.ConnectionLimit.LEVEL=DEBUG
#org.eclipse.jetty.server.AcceptRateLimit.LEVEL=DEBUG

View File

@ -0,0 +1,203 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.statistic;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* <p>Statistics on a time sequence rate.</p>
* <p>Calculates the rate at which the {@link #record()} method is called
* over the configured period, retaining also the total count and maximum
* rate achieved.</p>
* <p>The implementation keeps a Deque of timestamps for all records for
* the last time period, so this method is not suitable for large rates
* unless a small time period is used.</p>
*/
public class RateStatistic
{
private final Deque<Long> _samples = new ArrayDeque<>();
private final long _nanoPeriod;
private final TimeUnit _units;
private long _max;
private long _count;
public RateStatistic(long period, TimeUnit units)
{
_nanoPeriod = TimeUnit.NANOSECONDS.convert(period,units);
_units = units;
}
public long getPeriod()
{
return _units.convert(_nanoPeriod,TimeUnit.NANOSECONDS);
}
public TimeUnit getUnits()
{
return _units;
}
/**
* Resets the statistics.
*/
public void reset()
{
synchronized(this)
{
_samples.clear();
_max = 0;
_count = 0;
}
}
private void update()
{
update(System.nanoTime());
}
private void update(long now)
{
long expire = now - _nanoPeriod;
Long head = _samples.peekFirst();
while (head != null && head < expire)
{
_samples.removeFirst();
head = _samples.peekFirst();
}
}
protected void age(long period, TimeUnit units)
{
long increment = TimeUnit.NANOSECONDS.convert(period,units);
synchronized(this)
{
int size = _samples.size();
for (int i=0; i<size; i++)
_samples.addLast(_samples.removeFirst()-increment);
update();
}
}
/**
* Records a sample value.
* @return the number of records in the current period.
*/
public int record()
{
long now = System.nanoTime();
synchronized(this)
{
_count++;
_samples.add(now);
update(now);
int rate = _samples.size();
if (rate>_max)
_max = rate;
return rate;
}
}
/**
* @return the number of records in the current period
*/
public int getRate()
{
synchronized(this)
{
update();
return _samples.size();
}
}
/**
* @return the max number of samples per period.
*/
public long getMax()
{
synchronized(this)
{
return _max;
}
}
/**
* @param units the units of the return
* @return the age of the oldest sample in the requested units
*/
public long getOldest(TimeUnit units)
{
synchronized(this)
{
Long head = _samples.peekFirst();
if (head==null)
return -1;
return units.convert(System.nanoTime()-head,TimeUnit.NANOSECONDS);
}
}
/**
* @return the number of samples recorded
*/
public long getCount()
{
synchronized(this)
{
return _count;
}
}
public String dump()
{
return dump(TimeUnit.MINUTES);
}
public String dump(TimeUnit units)
{
long now = System.nanoTime();
synchronized(this)
{
String samples = _samples.stream()
.mapToLong(t -> units.convert(now - t, TimeUnit.NANOSECONDS))
.mapToObj(Long::toString)
.collect(Collectors.joining(System.lineSeparator()));
return String.format("%s%n%s", toString(now), samples);
}
}
@Override
public String toString()
{
return toString(System.nanoTime());
}
private String toString(long nanoTime)
{
synchronized(this)
{
update(nanoTime);
return String.format("%s@%x{count=%d,max=%d,rate=%d per %d %s}",
getClass().getSimpleName(), hashCode(),
_count, _max, _samples.size(),
_units.convert(_nanoPeriod,TimeUnit.NANOSECONDS), _units);
}
}
}

View File

@ -90,10 +90,7 @@ public class CounterStatisticTest
final CountDownLatch decBarrier = new CountDownLatch(N/2);
for (int i=N;i-->0;)
{
final int I = i;
{
threads[i]=(i>=N/2)
?new Thread()
{

View File

@ -0,0 +1,74 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.statistic;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matchers;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
/* ------------------------------------------------------------ */
public class RateStatisticTest
{
@Test
public void testRate()
throws Exception
{
RateStatistic rs = new RateStatistic(1,TimeUnit.HOURS);
assertThat(rs.getCount(),equalTo(0L));
assertThat(rs.getRate(),equalTo(0));
assertThat(rs.getMax(),equalTo(0L));
rs.record();
assertThat(rs.getCount(),equalTo(1L));
assertThat(rs.getRate(),equalTo(1));
assertThat(rs.getMax(),equalTo(1L));
rs.age(35,TimeUnit.MINUTES);
assertThat(rs.getCount(),equalTo(1L));
assertThat(rs.getRate(),equalTo(1));
assertThat(rs.getMax(),equalTo(1L));
assertThat(rs.getOldest(TimeUnit.MINUTES),Matchers.is(35L));
rs.record();
assertThat(rs.getCount(),equalTo(2L));
assertThat(rs.getRate(),equalTo(2));
assertThat(rs.getMax(),equalTo(2L));
rs.age(35,TimeUnit.MINUTES);
assertThat(rs.getCount(),equalTo(2L));
assertThat(rs.getRate(),equalTo(1));
assertThat(rs.getMax(),equalTo(2L));
rs.record();
assertThat(rs.getCount(),equalTo(3L));
assertThat(rs.getRate(),equalTo(2));
assertThat(rs.getMax(),equalTo(2L));
rs.age(35,TimeUnit.MINUTES);
assertThat(rs.getCount(),equalTo(3L));
assertThat(rs.getRate(),equalTo(1));
assertThat(rs.getMax(),equalTo(2L));
}
}