Issue #1732 Stop accepting new connections

This commit is contained in:
Greg Wilkins 2017-08-12 10:26:17 +10:00
parent 829fa4fe9b
commit 5197ce4f54
7 changed files with 456 additions and 75 deletions

View File

@ -281,10 +281,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
if (task != null) if (task != null)
return task; return task;
} }
else if (key.isAcceptable())
{
processAccept(key);
}
else else
{ {
throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps()); throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
@ -386,27 +382,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
private void processAccept(SelectionKey key)
{
SelectableChannel server = key.channel();
SelectableChannel channel = null;
try
{
while(true)
{
channel = _selectorManager.doAccept(server);
if (channel==null)
break;
_selectorManager.accepted(channel);
}
}
catch (Throwable x)
{
closeNoExceptions(channel);
LOG.warn("Accept failed for channel " + channel, x);
}
}
private void closeNoExceptions(Closeable closeable) private void closeNoExceptions(Closeable closeable)
{ {
try try
@ -530,9 +505,10 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
} }
} }
class Acceptor extends NonBlockingAction class Acceptor extends NonBlockingAction implements Selectable, Closeable
{ {
private final SelectableChannel _channel; private final SelectableChannel _channel;
private SelectionKey _key;
public Acceptor(SelectableChannel channel) public Acceptor(SelectableChannel channel)
{ {
@ -544,9 +520,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{ {
try try
{ {
SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, "Acceptor"); if (_key==null)
{
_key = _channel.register(_selector, SelectionKey.OP_ACCEPT, this);
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} acceptor={}", this, key); LOG.debug("{} acceptor={}", this, _key);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -554,6 +535,44 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.warn(x); LOG.warn(x);
} }
} }
@Override
public Runnable onSelected()
{
SelectableChannel server = _key.channel();
SelectableChannel channel = null;
try
{
while(true)
{
channel = _selectorManager.doAccept(server);
if (channel==null)
break;
_selectorManager.accepted(channel);
}
}
catch (Throwable x)
{
closeNoExceptions(channel);
LOG.warn("Accept failed for channel " + channel, x);
}
return null;
}
@Override
public void updateKey()
{
}
@Override
public void close() throws IOException
{
SelectionKey key = _key;
_key = null;
if (key!=null && key.isValid())
key.cancel();
}
} }
class Accept extends NonBlockingAction implements Closeable class Accept extends NonBlockingAction implements Closeable

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
@ -267,11 +268,14 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
* overridden by a derivation of this class to handle the accepted channel * overridden by a derivation of this class to handle the accepted channel
* *
* @param server the server channel to register * @param server the server channel to register
* @return A Closable that allows the acceptor to be cancelled
*/ */
public void acceptor(SelectableChannel server) public Closeable acceptor(SelectableChannel server)
{ {
final ManagedSelector selector = chooseSelector(null); final ManagedSelector selector = chooseSelector(null);
selector.submit(selector.new Acceptor(server)); ManagedSelector.Acceptor acceptor = selector.new Acceptor(server);
selector.submit(acceptor);
return acceptor;
} }
/** /**
@ -435,4 +439,5 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/ */
public abstract Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException; public abstract Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException;
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -34,6 +35,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -48,6 +50,7 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
@ -137,8 +140,10 @@ import org.eclipse.jetty.util.thread.Scheduler;
public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable public abstract class AbstractConnector extends ContainerLifeCycle implements Connector, Dumpable
{ {
protected final Logger LOG = Log.getLogger(AbstractConnector.class); protected final Logger LOG = Log.getLogger(AbstractConnector.class);
// Order is important on server side, so we use a LinkedHashMap
private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>(); private final Locker _locker = new Locker();
private final Condition _setAccepting = _locker.newCondition();
private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>(); // Order is important on server side, so we use a LinkedHashMap
private final Server _server; private final Server _server;
private final Executor _executor; private final Executor _executor;
private final Scheduler _scheduler; private final Scheduler _scheduler;
@ -146,12 +151,13 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
private final Thread[] _acceptors; private final Thread[] _acceptors;
private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints); private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
private volatile CountDownLatch _stopping; private CountDownLatch _stopping;
private long _idleTimeout = 30000; private long _idleTimeout = 30000;
private String _defaultProtocol; private String _defaultProtocol;
private ConnectionFactory _defaultConnectionFactory; private ConnectionFactory _defaultConnectionFactory;
private String _name; private String _name;
private int _acceptorPriorityDelta=-2; private int _acceptorPriorityDelta=-2;
private boolean _accepting = true;
/** /**
@ -283,7 +289,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
protected void interruptAcceptors() protected void interruptAcceptors()
{ {
synchronized (this) try (Locker.Lock lock = _locker.lockIfNotHeld())
{ {
for (Thread thread : _acceptors) for (Thread thread : _acceptors)
{ {
@ -327,7 +333,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public void join(long timeout) throws InterruptedException public void join(long timeout) throws InterruptedException
{ {
synchronized (this) try (Locker.Lock lock = _locker.lock())
{ {
for (Thread thread : _acceptors) for (Thread thread : _acceptors)
if (thread != null) if (thread != null)
@ -338,19 +344,31 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
protected abstract void accept(int acceptorID) throws IOException, InterruptedException; protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
/* ------------------------------------------------------------ */
/** /**
* @return Is the connector accepting new connections * @return Is the connector accepting new connections
*/ */
protected boolean isAccepting() public boolean isAccepting()
{ {
return isRunning(); try (Locker.Lock lock = _locker.lock())
{
return _accepting;
}
} }
public void setAccepting(boolean accepting)
{
try (Locker.Lock lock = _locker.lock())
{
_accepting=accepting;
_setAccepting.signalAll();
}
}
@Override @Override
public ConnectionFactory getConnectionFactory(String protocol) public ConnectionFactory getConnectionFactory(String protocol)
{ {
synchronized (_factories) try (Locker.Lock lock = _locker.lock())
{ {
return _factories.get(StringUtil.asciiToLowerCase(protocol)); return _factories.get(StringUtil.asciiToLowerCase(protocol));
} }
@ -359,7 +377,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
@Override @Override
public <T> T getConnectionFactory(Class<T> factoryType) public <T> T getConnectionFactory(Class<T> factoryType)
{ {
synchronized (_factories) try (Locker.Lock lock = _locker.lock())
{ {
for (ConnectionFactory f : _factories.values()) for (ConnectionFactory f : _factories.values())
if (factoryType.isAssignableFrom(f.getClass())) if (factoryType.isAssignableFrom(f.getClass()))
@ -370,7 +388,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public void addConnectionFactory(ConnectionFactory factory) public void addConnectionFactory(ConnectionFactory factory)
{ {
synchronized (_factories) try (Locker.Lock lock = _locker.lockIfNotHeld())
{ {
Set<ConnectionFactory> to_remove = new HashSet<>(); Set<ConnectionFactory> to_remove = new HashSet<>();
for (String key:factory.getProtocols()) for (String key:factory.getProtocols())
@ -409,7 +427,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public void addFirstConnectionFactory(ConnectionFactory factory) public void addFirstConnectionFactory(ConnectionFactory factory)
{ {
synchronized (_factories) try (Locker.Lock lock = _locker.lock())
{ {
List<ConnectionFactory> existings = new ArrayList<>(_factories.values()); List<ConnectionFactory> existings = new ArrayList<>(_factories.values());
_factories.clear(); _factories.clear();
@ -422,7 +440,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public void addIfAbsentConnectionFactory(ConnectionFactory factory) public void addIfAbsentConnectionFactory(ConnectionFactory factory)
{ {
synchronized (_factories) try (Locker.Lock lock = _locker.lock())
{ {
String key=StringUtil.asciiToLowerCase(factory.getProtocol()); String key=StringUtil.asciiToLowerCase(factory.getProtocol());
if (_factories.containsKey(key)) if (_factories.containsKey(key))
@ -444,7 +462,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public ConnectionFactory removeConnectionFactory(String protocol) public ConnectionFactory removeConnectionFactory(String protocol)
{ {
synchronized (_factories) try (Locker.Lock lock = _locker.lock())
{ {
ConnectionFactory factory= _factories.remove(StringUtil.asciiToLowerCase(protocol)); ConnectionFactory factory= _factories.remove(StringUtil.asciiToLowerCase(protocol));
removeBean(factory); removeBean(factory);
@ -455,7 +473,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
@Override @Override
public Collection<ConnectionFactory> getConnectionFactories() public Collection<ConnectionFactory> getConnectionFactories()
{ {
synchronized (_factories) try (Locker.Lock lock = _locker.lock())
{ {
return _factories.values(); return _factories.values();
} }
@ -463,7 +481,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public void setConnectionFactories(Collection<ConnectionFactory> factories) public void setConnectionFactories(Collection<ConnectionFactory> factories)
{ {
synchronized (_factories) try (Locker.Lock lock = _locker.lock())
{ {
List<ConnectionFactory> existing = new ArrayList<>(_factories.values()); List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
for (ConnectionFactory factory: existing) for (ConnectionFactory factory: existing)
@ -538,14 +556,23 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
return getConnectionFactory(_defaultProtocol); return getConnectionFactory(_defaultProtocol);
} }
protected boolean handleAcceptFailure(Throwable previous, Throwable current) protected boolean handleAcceptFailure(Throwable ex)
{ {
if (isAccepting()) if (isRunning())
{ {
if (previous == null) if (ex instanceof InterruptedException)
LOG.warn(current); {
else LOG.debug(ex);
LOG.debug(current); return true;
}
if (ex instanceof ClosedByInterruptException)
{
LOG.debug(ex);
return false;
}
LOG.warn(ex);
try try
{ {
// Arbitrary sleep to avoid spin looping. // Arbitrary sleep to avoid spin looping.
@ -556,12 +583,13 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
} }
catch (Throwable x) catch (Throwable x)
{ {
return false; LOG.ignore(x);
} }
return false;
} }
else else
{ {
LOG.ignore(current); LOG.ignore(ex);
return false; return false;
} }
} }
@ -595,19 +623,28 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
try try
{ {
Throwable exception = null; while (isRunning())
while (isAccepting())
{ {
try (Locker.Lock lock = _locker.lock())
{
if (!_accepting && isRunning())
{
_setAccepting.await();
continue;
}
}
catch (InterruptedException e)
{
continue;
}
try try
{ {
accept(_id); accept(_id);
exception = null;
} }
catch (Throwable x) catch (Throwable x)
{ {
if (handleAcceptFailure(exception, x)) if (!handleAcceptFailure(x))
exception = x;
else
break; break;
} }
} }
@ -636,12 +673,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
return String.format("acceptor-%d@%x", _id, hashCode()); return String.format("acceptor-%d@%x", _id, hashCode());
return name; return name;
} }
} }
// protected void connectionOpened(Connection connection) // protected void connectionOpened(Connection connection)
// { // {
// _stats.connectionOpened(); // _stats.connectionOpened();

View File

@ -96,8 +96,6 @@ public abstract class AbstractNetworkConnector extends AbstractConnector impleme
@Override @Override
public void close() public void close()
{ {
// Interrupting is often sufficient to close the channel
interruptAcceptors();
} }
@ -108,10 +106,12 @@ public abstract class AbstractNetworkConnector extends AbstractConnector impleme
return super.shutdown(); return super.shutdown();
} }
@Override protected boolean handleAcceptFailure(Throwable ex)
protected boolean isAccepting()
{ {
return super.isAccepting() && isOpen(); if (isOpen())
return super.handleAcceptFailure(ex);
LOG.ignore(ex);
return false;
} }
@Override @Override

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
@ -31,6 +32,7 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ChannelEndPoint;
@ -79,6 +81,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class ServerConnector extends AbstractNetworkConnector public class ServerConnector extends AbstractNetworkConnector
{ {
private final SelectorManager _manager; private final SelectorManager _manager;
private final AtomicReference<Closeable> _acceptor = new AtomicReference<>();
private volatile ServerSocketChannel _acceptChannel; private volatile ServerSocketChannel _acceptChannel;
private volatile boolean _inheritChannel = false; private volatile boolean _inheritChannel = false;
private volatile int _localPort = -1; private volatile int _localPort = -1;
@ -237,7 +240,7 @@ public class ServerConnector extends AbstractNetworkConnector
if (getAcceptors()==0) if (getAcceptors()==0)
{ {
_acceptChannel.configureBlocking(false); _acceptChannel.configureBlocking(false);
_manager.acceptor(_acceptChannel); _acceptor.set(_manager.acceptor(_acceptChannel));
} }
} }
@ -344,14 +347,14 @@ public class ServerConnector extends AbstractNetworkConnector
@Override @Override
public void close() public void close()
{ {
super.close();
ServerSocketChannel serverChannel = _acceptChannel; ServerSocketChannel serverChannel = _acceptChannel;
_acceptChannel = null; _acceptChannel = null;
if (serverChannel != null) if (serverChannel != null)
{ {
removeBean(serverChannel); removeBean(serverChannel);
// If the interrupt did not close it, we should close it
if (serverChannel.isOpen()) if (serverChannel.isOpen())
{ {
try try
@ -364,7 +367,6 @@ public class ServerConnector extends AbstractNetworkConnector
} }
} }
} }
// super.close();
_localPort = -2; _localPort = -2;
} }
@ -375,6 +377,7 @@ public class ServerConnector extends AbstractNetworkConnector
if (serverChannel != null && serverChannel.isOpen()) if (serverChannel != null && serverChannel.isOpen())
{ {
SocketChannel channel = serverChannel.accept(); SocketChannel channel = serverChannel.accept();
System.err.println("Accepted "+channel);
accepted(channel); accepted(channel);
} }
} }
@ -483,6 +486,38 @@ public class ServerConnector extends AbstractNetworkConnector
_reuseAddress = reuseAddress; _reuseAddress = reuseAddress;
} }
@Override
public void setAccepting(boolean accepting)
{
super.setAccepting(accepting);
if (getAcceptors()>0)
return;
try
{
if (accepting)
{
if (_acceptor.get()==null)
{
Closeable acceptor = _manager.acceptor(_acceptChannel);
if (!_acceptor.compareAndSet(null,acceptor))
acceptor.close();
}
}
else
{
Closeable acceptor = _acceptor.get();
if (acceptor!=null && _acceptor.compareAndSet(acceptor,null))
acceptor.close();
}
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
protected class ServerConnectorManager extends SelectorManager protected class ServerConnectorManager extends SelectorManager
{ {
public ServerConnectorManager(Executor executor, Scheduler scheduler, int selectors) public ServerConnectorManager(Executor executor, Scheduler scheduler, int selectors)

View File

@ -0,0 +1,281 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class NotAcceptingTest
{
@Test
public void testServerConnectorBlockingAccept() throws Exception
{
Server server = new Server();
ServerConnector connector = new ServerConnector(server,1,1);
connector.setPort(0);
connector.setIdleTimeout(500);
connector.setAcceptQueueSize(10);
server.addConnector(connector);
TestHandler handler = new TestHandler();
server.setHandler(handler);
server.start();
try(Socket client0 = new Socket("localhost",connector.getLocalPort());)
{
HttpTester.Input in0 = HttpTester.from(client0.getInputStream());
client0.getOutputStream().write("GET /one HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
String uri = handler.exchange.exchange("data");
assertThat(uri,is("/one"));
HttpTester.Response response = HttpTester.parseResponse(in0);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("data"));
connector.setAccepting(false);
// 0th connection still working
client0.getOutputStream().write("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
uri = handler.exchange.exchange("more data");
assertThat(uri,is("/two"));
response = HttpTester.parseResponse(in0);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("more data"));
try(Socket client1 = new Socket("localhost",connector.getLocalPort());)
{
// can't stop next connection being accepted
HttpTester.Input in1 = HttpTester.from(client1.getInputStream());
client1.getOutputStream().write("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
uri = handler.exchange.exchange("new connection");
assertThat(uri,is("/three"));
response = HttpTester.parseResponse(in1);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("new connection"));
try(Socket client2 = new Socket("localhost",connector.getLocalPort());)
{
HttpTester.Input in2 = HttpTester.from(client2.getInputStream());
client2.getOutputStream().write("GET /four HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
try
{
uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS);
Assert.fail(uri);
}
catch(TimeoutException e)
{
// Can we accept the original?
connector.setAccepting(true);
uri = handler.exchange.exchange("delayed connection");
assertThat(uri,is("/four"));
response = HttpTester.parseResponse(in2);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("delayed connection"));
}
}
}
}
}
@Test
public void testLocalConnector() throws Exception
{
Server server = new Server();
LocalConnector connector = new LocalConnector(server);
connector.setIdleTimeout(500);
server.addConnector(connector);
TestHandler handler = new TestHandler();
server.setHandler(handler);
server.start();
try(LocalEndPoint client0 = connector.connect())
{
client0.addInputAndExecute(BufferUtil.toBuffer("GET /one HTTP/1.1\r\nHost:localhost\r\n\r\n"));
String uri = handler.exchange.exchange("data");
assertThat(uri,is("/one"));
HttpTester.Response response = HttpTester.parseResponse(client0.getResponse());
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("data"));
connector.setAccepting(false);
// 0th connection still working
client0.addInputAndExecute(BufferUtil.toBuffer("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n"));
uri = handler.exchange.exchange("more data");
assertThat(uri,is("/two"));
response = HttpTester.parseResponse(client0.getResponse());
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("more data"));
try(LocalEndPoint client1 = connector.connect())
{
// can't stop next connection being accepted
client1.addInputAndExecute(BufferUtil.toBuffer("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n"));
uri = handler.exchange.exchange("new connection");
assertThat(uri,is("/three"));
response = HttpTester.parseResponse(client1.getResponse());
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("new connection"));
try(LocalEndPoint client2 = connector.connect())
{
client2.addInputAndExecute(BufferUtil.toBuffer("GET /four HTTP/1.1\r\nHost:localhost\r\n\r\n"));
try
{
uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS);
Assert.fail(uri);
}
catch(TimeoutException e)
{
// Can we accept the original?
connector.setAccepting(true);
uri = handler.exchange.exchange("delayed connection");
assertThat(uri,is("/four"));
response = HttpTester.parseResponse(client2.getResponse());
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("delayed connection"));
}
}
}
}
}
@Test
public void testServerConnectorAsyncAccept() throws Exception
{
Server server = new Server();
ServerConnector connector = new ServerConnector(server,0,1);
connector.setPort(0);
connector.setIdleTimeout(500);
connector.setAcceptQueueSize(10);
server.addConnector(connector);
TestHandler handler = new TestHandler();
server.setHandler(handler);
server.start();
try(Socket client0 = new Socket("localhost",connector.getLocalPort());)
{
HttpTester.Input in0 = HttpTester.from(client0.getInputStream());
client0.getOutputStream().write("GET /one HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
String uri = handler.exchange.exchange("data");
assertThat(uri,is("/one"));
HttpTester.Response response = HttpTester.parseResponse(in0);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("data"));
connector.setAccepting(false);
// 0th connection still working
client0.getOutputStream().write("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
uri = handler.exchange.exchange("more data");
assertThat(uri,is("/two"));
response = HttpTester.parseResponse(in0);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("more data"));
try(Socket client1 = new Socket("localhost",connector.getLocalPort());)
{
HttpTester.Input in1 = HttpTester.from(client1.getInputStream());
client1.getOutputStream().write("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes());
try
{
uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS);
Assert.fail(uri);
}
catch(TimeoutException e)
{
// Can we accept the original?
connector.setAccepting(true);
uri = handler.exchange.exchange("delayed connection");
assertThat(uri,is("/three"));
response = HttpTester.parseResponse(in1);
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is("delayed connection"));
}
}
}
}
public static class TestHandler extends AbstractHandler
{
final Exchanger<String> exchange = new Exchanger<>();
transient int handled;
public TestHandler()
{
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
String content = exchange.exchange(baseRequest.getRequestURI());
baseRequest.setHandled(true);
handled++;
response.setContentType("text/html;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().print(content);
}
catch (InterruptedException e)
{
throw new ServletException(e);
}
}
public int getHandled()
{
return handled;
}
}
}

View File

@ -54,11 +54,18 @@ public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest
startServer(connector); startServer(connector);
} }
@Test(timeout=60000)
public void testStartStopStart() throws Exception
{
_server.stop();
_server.start();
}
@Test(timeout=60000) @Test(timeout=60000)
public void testIdleTimeoutAfterSuspend() throws Exception public void testIdleTimeoutAfterSuspend() throws Exception
{ {
SuspendHandler _handler = new SuspendHandler();
_server.stop(); _server.stop();
SuspendHandler _handler = new SuspendHandler();
SessionHandler session = new SessionHandler(); SessionHandler session = new SessionHandler();
session.setHandler(_handler); session.setHandler(_handler);
_server.setHandler(session); _server.setHandler(session);