337678 Readded optional async connection mode for HttpClient

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@2809 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2011-02-22 03:28:43 +00:00
parent 4d93ffd0c8
commit 02e75add4f
8 changed files with 233 additions and 49 deletions

View File

@ -9,6 +9,7 @@ jetty-7.3.1-SNAPSHOT
+ 337268 Allow specifying alias of a certificate to be used by SSL connector
+ 337270 Shared Timer for session management
+ 337271 Flush SSL endpoint when dispatch thread held forever
+ 337678 Readded optional async connection mode for HttpClient
+ JETTY-1331 Allow alternate XML configuration processors (eg spring)
jetty-7.3.0.v20110203 3 February 2011

View File

@ -75,6 +75,7 @@ public class HttpClient extends HttpBuffers implements Attributes
private int _connectorType = CONNECTOR_SELECT_CHANNEL;
private boolean _useDirectBuffers = true;
private boolean _asyncConnects = false;
private int _maxConnectionsPerAddress = Integer.MAX_VALUE;
private ConcurrentMap<Address, HttpDestination> _destinations = new ConcurrentHashMap<Address, HttpDestination>();
ThreadPool _threadPool;
@ -107,6 +108,24 @@ public class HttpClient extends HttpBuffers implements Attributes
_sslContextFactory = sslContextFactory;
}
/* ------------------------------------------------------------------------------- */
/**
* @return True if connects will be in blocking mode.
*/
public boolean isAsyncConnects()
{
return _asyncConnects;
}
/* ------------------------------------------------------------------------------- */
/**
* @param blockingConnects True if connects will be in blocking mode.
*/
public void setAsyncConnects(boolean blockingConnects)
{
_asyncConnects = blockingConnects;
}
/* ------------------------------------------------------------------------------- */
public void dump()
{

View File

@ -17,6 +17,9 @@ import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
@ -42,6 +45,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
{
private final HttpClient _httpClient;
private final Manager _selectorManager=new Manager();
private final Timeout _connectTimer = new Timeout();
private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
private SSLContext _sslContext;
private Buffers _sslBuffers;
private boolean _blockingConnect;
@ -77,7 +82,32 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
protected void doStart() throws Exception
{
super.doStart();
_connectTimer.setDuration(_httpClient.getConnectTimeout());
_connectTimer.setNow();
if (_httpClient.isAsyncConnects())
{
_httpClient._threadPool.dispatch(new Runnable()
{
public void run()
{
while (isRunning())
{
_connectTimer.tick(System.currentTimeMillis());
try
{
Thread.sleep(200);
}
catch (InterruptedException x)
{
Thread.currentThread().interrupt();
break;
}
}
}
});
}
_selectorManager.start();
final boolean direct=_httpClient.getUseDirectBuffers();
@ -126,6 +156,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
@Override
protected void doStop() throws Exception
{
_connectTimer.cancelAll();
_selectorManager.stop();
}
@ -137,14 +168,28 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
{
SocketChannel channel = SocketChannel.open();
Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
channel.configureBlocking( true );
channel.socket().setTcpNoDelay(true);
channel.socket().setSoTimeout(_httpClient.getConnectTimeout());
channel.connect(address.toSocketAddress());
channel.configureBlocking(false);
channel.socket().setSoTimeout((int)_httpClient.getTimeout());
_selectorManager.register( channel, destination );
if (_httpClient.isAsyncConnects())
{
channel.configureBlocking( false );
channel.connect(address.toSocketAddress());
_selectorManager.register( channel, destination );
ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination);
_connectTimer.schedule(connectTimeout);
_connectingChannels.put(channel, connectTimeout);
}
else
{
channel.configureBlocking( true );
channel.socket().setSoTimeout(_httpClient.getConnectTimeout());
channel.socket().connect(address.toSocketAddress(),_httpClient.getConnectTimeout());
channel.configureBlocking(false);
channel.socket().setSoTimeout((int)_httpClient.getTimeout());
_selectorManager.register( channel, destination );
}
}
catch(IOException ex)
{
@ -207,6 +252,12 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{
// We're connected, cancel the connect timeout
Timeout.Task connectTimeout = _connectingChannels.remove(channel);
if (connectTimeout != null)
connectTimeout.cancel();
Log.debug("Channels with connection pending: {}", _connectingChannels.size());
// key should have destination at this point (will be replaced by endpoint after this call)
HttpDestination dest=(HttpDestination)key.attachment();
@ -249,6 +300,19 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
return sslEngine;
}
/* ------------------------------------------------------------ */
/* (non-Javadoc)
* @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
*/
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (attachment instanceof HttpDestination)
((HttpDestination)attachment).onConnectionFailed(ex);
else
super.connectionFailed(channel,ex,attachment);
}
}
private class ConnectTimeout extends Timeout.Task

View File

@ -28,8 +28,16 @@ import static org.junit.Assert.assertTrue;
/**
* @version $Revision$ $Date$
*/
public class ConnectionTest
public abstract class AbstractConnectionTest
{
protected HttpClient newHttpClient()
{
HttpClient httpClient = new HttpClient();
// httpClient.setConnectorType(HttpClient.CONNECTOR_SOCKET);
return httpClient;
}
@Test
public void testServerClosedConnection() throws Exception
{
@ -37,7 +45,7 @@ public class ConnectionTest
serverSocket.bind(null);
int port=serverSocket.getLocalPort();
HttpClient httpClient = new HttpClient();
HttpClient httpClient = newHttpClient();
httpClient.setMaxConnectionsPerAddress(1);
httpClient.start();
try
@ -94,7 +102,7 @@ public class ConnectionTest
int port=serverSocket.getLocalPort();
serverSocket.close();
HttpClient httpClient = new HttpClient();
HttpClient httpClient = newHttpClient();
httpClient.start();
try
{
@ -132,7 +140,7 @@ public class ConnectionTest
int port=serverSocket.getLocalPort();
serverSocket.close();
HttpClient httpClient = new HttpClient();
HttpClient httpClient = newHttpClient();
httpClient.setMaxConnectionsPerAddress(1);
httpClient.start();
try
@ -169,39 +177,9 @@ public class ConnectionTest
}
@Test
public void testConnectionTimeoutWithSocketConnector() throws Exception
public void testConnectionTimeout() throws Exception
{
HttpClient httpClient = new HttpClient();
httpClient.setConnectorType(HttpClient.CONNECTOR_SOCKET);
int connectTimeout = 5000;
httpClient.setConnectTimeout(connectTimeout);
httpClient.start();
try
{
CountDownLatch latch = new CountDownLatch(1);
HttpExchange exchange = new ConnectionExchange(latch);
// Using a IP address has a different behavior than using a host name
exchange.setAddress(new Address("127.0.0.1", 1));
exchange.setURI("/");
httpClient.send(exchange);
boolean passed = latch.await(connectTimeout * 2L, TimeUnit.MILLISECONDS);
assertTrue(passed);
int status = exchange.waitForDone();
assertEquals(HttpExchange.STATUS_EXCEPTED, status);
}
finally
{
httpClient.stop();
}
}
@Test
public void testConnectionTimeoutWithSelectConnector() throws Exception
{
HttpClient httpClient = new HttpClient();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
HttpClient httpClient = newHttpClient();
int connectTimeout = 5000;
httpClient.setConnectTimeout(connectTimeout);
httpClient.start();
@ -233,7 +211,7 @@ public class ConnectionTest
serverSocket.bind(null);
int port=serverSocket.getLocalPort();
HttpClient httpClient = new HttpClient();
HttpClient httpClient = newHttpClient();
httpClient.setIdleTimeout(700);
httpClient.start();
try

View File

@ -0,0 +1,25 @@
// ========================================================================
// Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
public class AsyncSelectConnectionTest extends AbstractConnectionTest
{
protected HttpClient newHttpClient()
{
HttpClient httpClient = new HttpClient();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
httpClient.setAsyncConnects(true);
return httpClient;
}
}

View File

@ -0,0 +1,25 @@
// ========================================================================
// Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
public class SelectConnectionTest extends AbstractConnectionTest
{
protected HttpClient newHttpClient()
{
HttpClient httpClient = new HttpClient();
httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
httpClient.setAsyncConnects(false);
return httpClient;
}
}

View File

@ -0,0 +1,30 @@
// ========================================================================
// Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
public class SocketConnectionTest extends AbstractConnectionTest
{
protected HttpClient newHttpClient()
{
HttpClient httpClient = new HttpClient();
httpClient.setConnectorType(HttpClient.CONNECTOR_SOCKET);
return httpClient;
}
@Override
public void testServerClosedConnection()
{
// TODO work out why this does not work
}
}

View File

@ -289,7 +289,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
/* ------------------------------------------------------------------------------- */
protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
{
Log.warn(ex+","+channel+","+attachment);
Log.debug(ex);
}
/* ------------------------------------------------------------ */
public String dump()
{
@ -397,10 +403,18 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
final ChannelAndAttachment asc = (ChannelAndAttachment)change;
final SelectableChannel channel=asc._channel;
final Object att = asc._attachment;
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
endpoint.schedule();
if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
{
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else if (channel.isOpen())
{
channel.register(selector,SelectionKey.OP_CONNECT,att);
}
}
else if (change instanceof SocketChannel)
{
@ -503,6 +517,34 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
((SelectChannelEndPoint)att).schedule();
}
else if (key.isConnectable())
{
// Complete a connection of a registered channel
SocketChannel channel = (SocketChannel)key.channel();
boolean connected=false;
try
{
connected=channel.finishConnect();
}
catch(Exception e)
{
connectionFailed(channel,e,att);
}
finally
{
if (connected)
{
key.interestOps(SelectionKey.OP_READ);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else
{
key.cancel();
}
}
}
else
{
// Wrap readable registered channel in an endpoint