jetty-9: Added support for connect timeout.

This commit is contained in:
Simone Bordet 2012-10-05 10:51:19 -07:00
parent f4d12412eb
commit 2748a9381e
10 changed files with 281 additions and 125 deletions

View File

@ -169,7 +169,7 @@ public class HttpClient extends ContainerLifeCycle
protected SelectorManager newSelectorManager()
{
return new ClientSelectorManager();
return new ClientSelectorManager(getExecutor(), getScheduler());
}
@Override
@ -516,20 +516,20 @@ public class HttpClient extends ContainerLifeCycle
protected class ClientSelectorManager extends SelectorManager
{
public ClientSelectorManager()
public ClientSelectorManager(Executor executor, Scheduler scheduler)
{
this(1);
this(executor, scheduler, 1);
}
public ClientSelectorManager(int selectors)
public ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(selectors);
super(executor, scheduler, selectors);
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
{
return new SelectChannelEndPoint(channel, selector, key, scheduler, getIdleTimeout());
return new SelectChannelEndPoint(channel, selector, key, getScheduler(), getIdleTimeout());
}
@Override
@ -574,19 +574,12 @@ public class HttpClient extends ContainerLifeCycle
}
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
ConnectionCallback callback = (ConnectionCallback)attachment;
callback.callback.failed(null, ex);
}
@Override
protected void execute(Runnable task)
{
getExecutor().execute(task);
}
}
private class ConnectionCallback extends FutureCallback<Connection>

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
@ -35,16 +36,18 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.ForkInvoker;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
@ -56,25 +59,53 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
protected static final Logger LOG = Log.getLogger(SelectorManager.class);
private final Executor executor;
private final Scheduler scheduler;
private final ManagedSelector[] _selectors;
private volatile long _selectorIndex;
private long _connectTimeout = 15000;
private long _selectorIndex;
protected SelectorManager()
protected SelectorManager(Executor executor, Scheduler scheduler)
{
this((Runtime.getRuntime().availableProcessors() + 1) / 2);
this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
}
protected SelectorManager(@Name(value="selectors") int selectors)
protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
this.executor = executor;
this.scheduler = scheduler;
_selectors = new ManagedSelector[selectors];
}
public Executor getExecutor()
{
return executor;
}
public Scheduler getScheduler()
{
return scheduler;
}
public long getConnectTimeout()
{
return _connectTimeout;
}
public void setConnectTimeout(long connectTimeout)
{
_connectTimeout = connectTimeout;
}
/**
* Executes the given task in a different thread.
*
* @param task the task to execute
*/
protected abstract void execute(Runnable task);
protected void execute(Runnable task)
{
executor.execute(task);
}
/**
* @return the number of selectors in use
@ -207,6 +238,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
protected boolean finishConnect(SocketChannel channel) throws IOException
{
return channel.finishConnect();
}
/**
* <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
* <p>By default it just logs with level warning.</p>
@ -416,27 +452,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
else if (key.isConnectable())
{
// Complete a connection of a registered channel
SocketChannel channel = (SocketChannel)key.channel();
try
{
boolean connected = channel.finishConnect();
if (connected)
{
key.interestOps(0);
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
}
else
{
throw new ConnectException();
}
}
catch (Exception x)
{
connectionFailed(channel, x, attachment);
closeNoExceptions(channel);
}
processConnect(key, (Connect)attachment);
}
else
{
@ -457,6 +473,32 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
private void processConnect(SelectionKey key, Connect connect)
{
key.attach(connect.attachment);
SocketChannel channel = (SocketChannel)key.channel();
try
{
boolean connected = finishConnect(channel);
if (connected)
{
connect.timeout.cancel();
key.interestOps(0);
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
}
else
{
throw new ConnectException();
}
}
catch (Exception x)
{
connect.failed(x);
closeNoExceptions(channel);
}
}
private void closeNoExceptions(Closeable closeable)
{
try
@ -469,11 +511,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
public SelectorManager getSelectorManager()
{
return SelectorManager.this;
}
public void wakeup()
{
_selector.wakeup();
@ -653,13 +690,16 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private class Connect implements Runnable
{
private final AtomicBoolean failed = new AtomicBoolean();
private final SocketChannel channel;
private final Object attachment;
private final Scheduler.Task timeout;
public Connect(SocketChannel channel, Object attachment)
{
this.channel = channel;
this.attachment = attachment;
this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
}
@Override
@ -667,13 +707,52 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
try
{
channel.register(_selector, SelectionKey.OP_CONNECT, attachment);
channel.register(_selector, SelectionKey.OP_CONNECT, this);
}
catch (ClosedChannelException x)
{
LOG.debug(x);
}
}
protected void failed(Throwable failure)
{
if (failed.compareAndSet(false, true))
connectionFailed(channel, failure, attachment);
}
}
private class ConnectTimeout implements Runnable
{
private final Connect connect;
private ConnectTimeout(Connect connect)
{
this.connect = connect;
}
@Override
public void run()
{
SocketChannel channel = connect.channel;
if (channel.isConnectionPending())
{
LOG.debug("Channel {} timed out while connecting, closing it", channel);
try
{
// This will unregister the channel from the selector
channel.close();
}
catch (IOException x)
{
LOG.ignore(x);
}
finally
{
connect.failed(new SocketTimeoutException());
}
}
}
}
private class Stop implements Runnable

View File

@ -60,18 +60,12 @@ public class SelectChannelEndPointInterestsTest
connector = ServerSocketChannel.open();
connector.bind(new InetSocketAddress("localhost", 0));
selectorManager = new SelectorManager()
selectorManager = new SelectorManager(threadPool, scheduler)
{
@Override
protected void execute(Runnable task)
{
threadPool.execute(task);
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, scheduler, 60000)
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), 60000)
{
@Override
protected void onIncompleteFlush()
@ -85,7 +79,7 @@ public class SelectChannelEndPointInterestsTest
@Override
public Connection newConnection(SocketChannel channel, final EndPoint endPoint, Object attachment)
{
return new AbstractConnection(endPoint, threadPool)
return new AbstractConnection(endPoint, getExecutor())
{
@Override
public void onOpen()

View File

@ -60,14 +60,8 @@ public class SelectChannelEndPointTest
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected Scheduler _scheduler = new TimerScheduler();
protected SelectorManager _manager = new SelectorManager()
protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler)
{
@Override
protected void execute(Runnable task)
{
_threadPool.execute(task);
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
{
@ -77,7 +71,7 @@ public class SelectChannelEndPointTest
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, _scheduler, 60000);
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000);
_lastEndPoint = endp;
_lastEndPointLatch.countDown();
return endp;

View File

@ -0,0 +1,133 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SelectorManagerTest
{
private QueuedThreadPool executor = new QueuedThreadPool();
private TimerScheduler scheduler = new TimerScheduler();
@Before
public void prepare() throws Exception
{
executor.start();
scheduler.start();
}
@After
public void dispose() throws Exception
{
scheduler.stop();
executor.stop();
}
@Slow
@Test
public void testConnectTimeoutBeforeSuccessfulConnect() throws Exception
{
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("localhost", 0));
SocketAddress address = server.getLocalAddress();
SocketChannel client = SocketChannel.open();
client.configureBlocking(false);
client.connect(address);
final long connectTimeout = 1000;
SelectorManager selectorManager = new SelectorManager(executor, scheduler)
{
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), connectTimeout / 2);
}
@Override
protected boolean finishConnect(SocketChannel channel) throws IOException
{
try
{
TimeUnit.MILLISECONDS.sleep(connectTimeout * 2);
return super.finishConnect(channel);
}
catch (InterruptedException e)
{
return false;
}
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
return new AbstractConnection(endpoint, executor)
{
@Override
public void onFillable()
{
}
};
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
((Callback<Void>)attachment).failed(null, ex);
}
};
selectorManager.setConnectTimeout(connectTimeout);
selectorManager.start();
try
{
final CountDownLatch latch = new CountDownLatch(1);
selectorManager.connect(client, new Callback.Empty<Void>()
{
@Override
public void failed(Void context, Throwable x)
{
latch.countDown();
}
});
Assert.assertTrue(latch.await(connectTimeout * 3, TimeUnit.MILLISECONDS));
}
finally
{
selectorManager.stop();
}
}
}

View File

@ -30,7 +30,6 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSocket;
@ -69,23 +68,17 @@ public class SslConnectionTest
_dispatches.incrementAndGet();
return super.dispatch(job);
}
};
protected Scheduler _scheduler = new TimerScheduler();
protected SelectorManager _manager = new SelectorManager()
protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler)
{
@Override
protected void execute(Runnable task)
{
_threadPool.execute(task);
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
{
SSLEngine engine = __sslCtxFactory.newSSLEngine();
engine.setUseClientMode(false);
SslConnection sslConnection = new SslConnection(__byteBufferPool, _threadPool, endpoint, engine);
SslConnection sslConnection = new SslConnection(__byteBufferPool, getExecutor(), endpoint, engine);
Connection appConnection = new TestConnection(sslConnection.getDecryptedEndPoint());
sslConnection.getDecryptedEndPoint().setConnection(appConnection);
@ -96,7 +89,7 @@ public class SslConnectionTest
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, _scheduler, 60000);
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, getScheduler(), 60000);
_lastEndp=endp;
return endp;
}
@ -254,7 +247,7 @@ public class SslConnectionTest
while(len>0)
len-=client.getInputStream().read(buffer);
Assert.assertEquals(1, _dispatches.get());
client.close();
}

View File

@ -133,7 +133,7 @@ public class ServerConnector extends AbstractNetworkConnector
@Name("factories") ConnectionFactory... factories)
{
super(server,executor,scheduler,pool,acceptors,factories);
_manager = new ServerConnectorManager(selectors > 0 ? selectors : Runtime.getRuntime().availableProcessors());
_manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors > 0 ? selectors : Runtime.getRuntime().availableProcessors());
addBean(_manager, true);
}
@ -346,31 +346,11 @@ public class ServerConnector extends AbstractNetworkConnector
private final class ServerConnectorManager extends SelectorManager
{
private ServerConnectorManager(int selectors)
private ServerConnectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(selectors);
super(executor, scheduler, selectors);
}
@Override
protected void execute(Runnable task)
{
getExecutor().execute(task);
}
// TODO
// @Override
// public void connectionOpened(Connection connection)
// {
// ServerConnector.this.connectionOpened(connection);
// }
// TODO
// @Override
// public void connectionClosed(Connection connection)
// {
// ServerConnector.this.connectionClosed(connection);
// }
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{

View File

@ -156,9 +156,9 @@ public class SPDYClient
public static class Factory extends ContainerLifeCycle
{
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
final ByteBufferPool bufferPool = new MappedByteBufferPool();
final Scheduler scheduler = new TimerScheduler();
final Executor executor;
private final ByteBufferPool bufferPool = new MappedByteBufferPool();
private final Scheduler scheduler = new TimerScheduler();
private final Executor executor;
private final SslContextFactory sslContextFactory;
private final SelectorManager selector;
private final long idleTimeout;
@ -198,9 +198,9 @@ public class SPDYClient
if (sslContextFactory != null)
addBean(sslContextFactory);
selector = new ClientSelectorManager();
// TODO: configure connect timeout
selector = new ClientSelectorManager(executor, scheduler);
addBean(selector);
}
public SPDYClient newSPDYClient(short version)
@ -242,6 +242,11 @@ public class SPDYClient
private class ClientSelectorManager extends SelectorManager
{
private ClientSelectorManager(Executor executor, Scheduler scheduler)
{
super(executor, scheduler);
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
@ -251,13 +256,7 @@ public class SPDYClient
if (clientIdleTimeout < 0)
clientIdleTimeout = idleTimeout;
return new SelectChannelEndPoint(channel, selectSet, key, scheduler, clientIdleTimeout);
}
@Override
protected void execute(Runnable task)
{
executor.execute(task);
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), clientIdleTimeout);
}
@Override
@ -271,9 +270,9 @@ public class SPDYClient
if (sslContextFactory != null)
{
final SSLEngine engine = client.newSSLEngine(sslContextFactory, channel);
SslConnection sslConnection = new SslConnection(bufferPool, executor, endPoint, engine);
SslConnection sslConnection = new SslConnection(bufferPool, getExecutor(), endPoint, engine);
DecryptedEndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
NextProtoNegoClientConnection connection = new NextProtoNegoClientConnection(channel, sslEndPoint, attachment, client.factory.executor, client);
NextProtoNegoClientConnection connection = new NextProtoNegoClientConnection(channel, sslEndPoint, attachment, getExecutor(), client);
sslEndPoint.setConnection(connection);
return sslConnection;
}

View File

@ -85,6 +85,7 @@ public class ConnectionManager extends ContainerLifeCycle
public ConnectionManager(ByteBufferPool bufferPool, Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory,
WebSocketPolicy policy)
{
// TODO: configure connect timeout
selector = new WebSocketClientSelectorManager(bufferPool,executor,scheduler,policy);
selector.setSslContextFactory(sslContextFactory);
addBean(selector);

View File

@ -41,27 +41,17 @@ import org.eclipse.jetty.websocket.core.api.WebSocketPolicy;
public class WebSocketClientSelectorManager extends SelectorManager
{
private static final Logger LOG = Log.getLogger(WebSocketClientSelectorManager.class);
private final Executor executor;
private final Scheduler scheduler;
private final WebSocketPolicy policy;
private final ByteBufferPool bufferPool;
private SslContextFactory sslContextFactory;
public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor, Scheduler scheduler, WebSocketPolicy policy)
{
super();
super(executor, scheduler);
this.bufferPool = bufferPool;
this.executor = executor;
this.scheduler = scheduler;
this.policy = policy;
}
@Override
protected void execute(Runnable task)
{
this.executor.execute(task);
}
public SslContextFactory getSslContextFactory()
{
return sslContextFactory;
@ -83,7 +73,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
if (sslContextFactory != null)
{
SSLEngine engine = newSSLEngine(sslContextFactory,channel);
SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine);
SslConnection sslConnection = new SslConnection(bufferPool,getExecutor(),endPoint,engine);
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
Connection connection = newUpgradeConnection(channel,sslEndPoint,client);
@ -116,7 +106,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
LOG.debug("newEndPoint({}, {}, {})",channel,selectSet,selectionKey);
return new SelectChannelEndPoint(channel,selectSet,selectionKey,scheduler,policy.getIdleTimeout());
return new SelectChannelEndPoint(channel,selectSet,selectionKey,getScheduler(),policy.getIdleTimeout());
}
public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)