Issue #1732 Connection Limit (#1745)

* Issue #1732 Connection Limit

Added a listener to stop accepting when a limit is reached

Update LowResourceMonitor to not accept in low resources
This commit is contained in:
Greg Wilkins 2017-09-05 15:21:51 +10:00 committed by GitHub
parent bc47942d17
commit dd20272c48
7 changed files with 390 additions and 31 deletions

View File

@ -523,8 +523,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
if (_key==null)
{
_key = _channel.register(_selector, SelectionKey.OP_ACCEPT, this);
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} acceptor={}", this, _key);

View File

@ -0,0 +1,13 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="addBean">
<Arg>
<New class="org.eclipse.jetty.server.ConnectionLimit">
<Arg type="int"><Property name="jetty.connection.limit" default="1000"/></Arg>
<Arg><Ref refid="Server"/></Arg>
</New>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,14 @@
[description]
Enable a server wide connection limit
[tags]
connector
[depend]
server
[xml]
etc/jetty-connectionlimit.xml
[ini-template]
jetty.connection.limit=1000

View File

@ -0,0 +1,150 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Connection.Listener;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* A Connection Listener that limits the number of Connections.
* <p>This listener applies a limit to the number of connections, which when
* exceeded results in a call to {@link AbstractConnector#setAccepting(boolean)}
* to prevent further connections being received. It can be applied to an
* entire server or to a specific connector.
* <p>
* @see Connection.Listener
*/
@ManagedObject
public class ConnectionLimit extends AbstractLifeCycle implements Listener
{
private static final Logger LOG = Log.getLogger(ConnectionLimit.class);
private final Server _server;
private final List<AbstractConnector> _connectors = new ArrayList<>();
private int _connections;
private int _maxConnections;
private boolean _accepting = true;
public ConnectionLimit(int maxConnections, Server server)
{
_maxConnections = maxConnections;
_server = server;
}
public ConnectionLimit(int maxConnections, Connector...connectors)
{
_maxConnections = maxConnections;
_server = null;
for (Connector c: connectors)
{
if (c instanceof AbstractConnector)
_connectors.add((AbstractConnector)c);
else
LOG.warn("Connector {} is not an AbstractConnection. Connections not limited",c);
}
}
@ManagedAttribute("The maximum number of connections allowed")
public synchronized int getMaxConnections()
{
return _maxConnections;
}
public synchronized void setMaxConnections(int max)
{
_maxConnections = max;
}
@ManagedAttribute("The current number of connections ")
public synchronized int getConnections()
{
return _connections;
}
@Override
protected synchronized void doStart() throws Exception
{
if (_server!=null)
{
for (Connector c: _server.getConnectors())
{
if (c instanceof AbstractConnector)
_connectors.add((AbstractConnector)c);
else
LOG.warn("Connector {} is not an AbstractConnection. Connections not limited",c);
}
}
if (LOG.isDebugEnabled())
LOG.debug("ConnectionLimit {} for {}",_maxConnections,_connectors);
_connections = 0;
_accepting = true;
for (AbstractConnector c : _connectors)
c.addBean(this);
}
@Override
protected synchronized void doStop() throws Exception
{
for (AbstractConnector c : _connectors)
c.removeBean(this);
_connections = 0;
if (_server!=null)
_connectors.clear();
}
@Override
public synchronized void onOpened(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("onOpen {} < {} {}",_connections, _maxConnections, connection);
if ( ++_connections >= _maxConnections && _accepting)
{
_accepting = false;
LOG.info("Connection Limit({}) reached for {}",_maxConnections,_connectors);
for (AbstractConnector c : _connectors)
c.setAccepting(false);
}
}
@Override
public synchronized void onClosed(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("onClosed {} < {} {}",_connections, _maxConnections, connection);
if ( --_connections < _maxConnections && !_accepting)
{
_accepting = true;
LOG.info("Connection Limit({}) cleared for {}",_maxConnections,_connectors);
for (AbstractConnector c : _connectors)
c.setAccepting(true);
}
}
}

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.server;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -46,7 +48,7 @@ import org.eclipse.jetty.util.thread.ThreadPool;
* Low resources can be detected by:
* <ul>
* <li>{@link ThreadPool#isLowOnThreads()} if {@link Connector#getExecutor()} is
* an instance of {@link ThreadPool} and {@link #setMonitorThreads(boolean)} is true.<li>
* an instance of {@link ThreadPool} and {@link #setMonitorThreads(boolean)} is true.</li>
* <li>If {@link #setMaxMemory(long)} is non zero then low resources is detected if the JVMs
* {@link Runtime} instance has {@link Runtime#totalMemory()} minus {@link Runtime#freeMemory()}
* greater than {@link #getMaxMemory()}</li>
@ -60,6 +62,9 @@ import org.eclipse.jetty.util.thread.ThreadPool;
* resources state persists for more than {@link #getMaxLowResourcesTime()}, then the
* {@link #getLowResourcesIdleTimeout()} to all connections again. Once the low resources state is
* cleared, the idle timeout is reset to the connector default given by {@link Connector#getIdleTimeout()}.
* <p>
* If {@link #setAcceptingInLowResources(boolean)} is set to true, then no new connections are accepted
* when in low resources state.
*/
@ManagedObject ("Monitor for low resource conditions and activate a low resource mode if detected")
public class LowResourceMonitor extends AbstractLifeCycle
@ -68,6 +73,7 @@ public class LowResourceMonitor extends AbstractLifeCycle
private final Server _server;
private Scheduler _scheduler;
private Connector[] _monitoredConnectors;
private Set<AbstractConnector> _acceptingConnectors = new HashSet<>();
private int _period=1000;
private int _maxConnections;
private long _maxMemory;
@ -78,6 +84,7 @@ public class LowResourceMonitor extends AbstractLifeCycle
private String _cause;
private String _reasons;
private long _lowStarted;
private boolean _acceptingInLowResources;
private final Runnable _monitor = new Runnable()
{
@ -134,6 +141,17 @@ public class LowResourceMonitor extends AbstractLifeCycle
_monitoredConnectors = monitoredConnectors.toArray(new Connector[monitoredConnectors.size()]);
}
@ManagedAttribute("If false, new connections are not accepted while in low resources")
public boolean isAcceptingInLowResources()
{
return _acceptingInLowResources;
}
public void setAcceptingInLowResources(boolean acceptingInLowResources)
{
_acceptingInLowResources = acceptingInLowResources;
}
@ManagedAttribute("The monitor period in ms")
public int getPeriod()
{
@ -329,6 +347,15 @@ public class LowResourceMonitor extends AbstractLifeCycle
{
for(Connector connector : getMonitoredOrServerConnectors())
{
if (connector instanceof AbstractConnector)
{
AbstractConnector c = (AbstractConnector)connector;
if (c.isAccepting())
{
_acceptingConnectors.add(c);
c.setAccepting(false);
}
}
for (EndPoint endPoint : connector.getConnectedEndPoints())
endPoint.setIdleTimeout(_lowResourcesIdleTimeout);
}
@ -341,6 +368,12 @@ public class LowResourceMonitor extends AbstractLifeCycle
for (EndPoint endPoint : connector.getConnectedEndPoints())
endPoint.setIdleTimeout(connector.getIdleTimeout());
}
for (AbstractConnector connector : _acceptingConnectors)
{
connector.setAccepting(true);
}
_acceptingConnectors.clear();
}
private String low(String reasons, String newReason)

View File

@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.TimerScheduler;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -118,6 +119,51 @@ public class LowResourcesMonitorTest
}
@Test
public void testNotAccepting() throws Exception
{
_lowResourcesMonitor.setAcceptingInLowResources(false);
Thread.sleep(1200);
_threadPool.setMaxThreads(_threadPool.getThreads()-_threadPool.getIdleThreads()+10);
Thread.sleep(1200);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
for (AbstractConnector c : _server.getBeans(AbstractConnector.class))
assertThat(c.isAccepting(),Matchers.is(true));
final CountDownLatch latch = new CountDownLatch(1);
for (int i=0;i<100;i++)
{
_threadPool.execute(new Runnable()
{
@Override
public void run()
{
try
{
latch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
});
}
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
for (AbstractConnector c : _server.getBeans(AbstractConnector.class))
assertThat(c.isAccepting(),Matchers.is(false));
latch.countDown();
Thread.sleep(1200);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
for (AbstractConnector c : _server.getBeans(AbstractConnector.class))
assertThat(c.isAccepting(),Matchers.is(true));
}
@Ignore ("not reliable")
@Test
public void testLowOnMemory() throws Exception
@ -155,18 +201,18 @@ public class LowResourcesMonitorTest
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Socket newSocket = new Socket("localhost",_connector.getLocalPort());
// wait for low idle time to close sockets, but not new Socket
Thread.sleep(1200);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
try(Socket newSocket = new Socket("localhost",_connector.getLocalPort()))
{
// wait for low idle time to close sockets, but not new Socket
Thread.sleep(1200);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
for (int i=0;i<socket.length;i++)
Assert.assertEquals(-1,socket[i].getInputStream().read());
newSocket.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.UTF_8));
Assert.assertEquals('H',newSocket.getInputStream().read());
for (int i=0;i<socket.length;i++)
Assert.assertEquals(-1,socket[i].getInputStream().read());
newSocket.getOutputStream().write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.UTF_8));
Assert.assertEquals('H',newSocket.getInputStream().read());
}
}
@Test
@ -175,26 +221,28 @@ public class LowResourcesMonitorTest
_lowResourcesMonitor.setMaxLowResourcesTime(2000);
Assert.assertFalse(_lowResourcesMonitor.isLowOnResources());
Socket socket0 = new Socket("localhost",_connector.getLocalPort());
_lowResourcesMonitor.setMaxMemory(1);
try(Socket socket0 = new Socket("localhost",_connector.getLocalPort()))
{
_lowResourcesMonitor.setMaxMemory(1);
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Socket socket1 = new Socket("localhost",_connector.getLocalPort());
try(Socket socket1 = new Socket("localhost",_connector.getLocalPort()))
{
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Assert.assertEquals(-1,socket0.getInputStream().read());
socket1.getOutputStream().write("G".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Assert.assertEquals(-1,socket0.getInputStream().read());
socket1.getOutputStream().write("G".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
socket1.getOutputStream().write("E".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Assert.assertEquals(-1,socket1.getInputStream().read());
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
socket1.getOutputStream().write("E".getBytes(StandardCharsets.UTF_8));
Thread.sleep(1200);
Assert.assertTrue(_lowResourcesMonitor.isLowOnResources());
Assert.assertEquals(-1,socket1.getInputStream().read());
}
}
}
}

View File

@ -278,4 +278,106 @@ public class NotAcceptingTest
}
}
@Test
public void testConnectionLimit() throws Exception
{
Server server = new Server();
server.addBean(new ConnectionLimit(9,server));
server.setHandler(new HelloHandler());
LocalConnector localConnector = new LocalConnector(server);
localConnector.setIdleTimeout(60000);
server.addConnector(localConnector);
ServerConnector blockingConnector = new ServerConnector(server,1,1);
blockingConnector.setPort(0);
blockingConnector.setIdleTimeout(60000);
blockingConnector.setAcceptQueueSize(10);
server.addConnector(blockingConnector);
ServerConnector asyncConnector = new ServerConnector(server,0,1);
asyncConnector.setPort(0);
asyncConnector.setIdleTimeout(60000);
asyncConnector.setAcceptQueueSize(10);
server.addConnector(asyncConnector);
server.start();
try (
LocalEndPoint local0 = localConnector.connect();
LocalEndPoint local1 = localConnector.connect();
LocalEndPoint local2 = localConnector.connect();
Socket blocking0 = new Socket("localhost",blockingConnector.getLocalPort());
Socket blocking1 = new Socket("localhost",blockingConnector.getLocalPort());
Socket blocking2 = new Socket("localhost",blockingConnector.getLocalPort());
Socket async0 = new Socket("localhost",asyncConnector.getLocalPort());
Socket async1 = new Socket("localhost",asyncConnector.getLocalPort());
Socket async2 = new Socket("localhost",asyncConnector.getLocalPort());
)
{
for (LocalEndPoint client: new LocalEndPoint[] {local0,local1,local2})
{
client.addInputAndExecute(BufferUtil.toBuffer("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n"));
HttpTester.Response response = HttpTester.parseResponse(client.getResponse());
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("Hello\n"));
}
for (Socket client : new Socket[]{blocking0,blocking1,blocking2,async0,async1,async2})
{
HttpTester.Input in = HttpTester.from(client.getInputStream());
client.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("Hello\n"));
}
assertThat(localConnector.isAccepting(),is(false));
assertThat(blockingConnector.isAccepting(),is(false));
assertThat(asyncConnector.isAccepting(),is(false));
{
// Close an async connection
HttpTester.Input in = HttpTester.from(async1.getInputStream());
async1.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\nConnection: close\r\n\r\n".getBytes());
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("Hello\n"));
}
// make a new connection and request
try (Socket blocking3 = new Socket("localhost",blockingConnector.getLocalPort());)
{
HttpTester.Input in = HttpTester.from(blocking3.getInputStream());
blocking3.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
HttpTester.Response response = HttpTester.parseResponse(in);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("Hello\n"));
}
}
Thread.sleep(500); // TODO avoid lame sleep ???
assertThat(localConnector.isAccepting(),is(true));
assertThat(blockingConnector.isAccepting(),is(true));
assertThat(asyncConnector.isAccepting(),is(true));
}
public static class HelloHandler extends AbstractHandler
{
public HelloHandler()
{
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentType("text/html;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println("Hello");
}
}
}