442950 - Embedded Jetty client requests to localhost hangs with high cpu usage (NIO OP_CONNECT Solaris/Sparc).
Now checking the return value of SocketChannel.connect() to determine whether to register the channel (true) or finish the connect (false).
This commit is contained in:
parent
ca2a23c35d
commit
9dbd2cd9d8
|
@ -88,11 +88,14 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
|
|||
channel.bind(bindAddress);
|
||||
configure(client, channel);
|
||||
channel.configureBlocking(false);
|
||||
channel.connect(address);
|
||||
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
|
||||
selectorManager.connect(channel, context);
|
||||
|
||||
if (channel.connect(address))
|
||||
selectorManager.accept(channel, context);
|
||||
else
|
||||
selectorManager.connect(channel, context);
|
||||
}
|
||||
// Must catch all exceptions, since some like
|
||||
// UnresolvedAddressException are not IOExceptions.
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.eclipse.jetty.io;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.channels.CancelledKeyException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
|
@ -178,11 +180,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
/**
|
||||
* <p>Registers a channel to perform a non-blocking connect.</p>
|
||||
* <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
|
||||
* must be called prior to calling this method.</p>
|
||||
* <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
|
||||
* must be called prior to calling this method, and the connect operation must not be completed
|
||||
* (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
|
||||
*
|
||||
* @param channel the channel to register
|
||||
* @param attachment the attachment object
|
||||
* @see #accept(SocketChannel, Object)
|
||||
*/
|
||||
public void connect(SocketChannel channel, Object attachment)
|
||||
{
|
||||
|
@ -190,17 +194,28 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
set.submit(set.new Connect(channel, attachment));
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #accept(SocketChannel, Object)
|
||||
*/
|
||||
public void accept(SocketChannel channel)
|
||||
{
|
||||
accept(channel, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Registers a channel to perform non-blocking read/write operations.</p>
|
||||
* <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
|
||||
* or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
|
||||
* or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
|
||||
* just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
|
||||
* successfully.</p>
|
||||
*
|
||||
* @param channel the channel to register
|
||||
* @param attachment the attachment object
|
||||
*/
|
||||
public void accept(final SocketChannel channel)
|
||||
public void accept(SocketChannel channel, Object attachment)
|
||||
{
|
||||
final ManagedSelector selector = chooseSelector();
|
||||
selector.submit(selector.new Accept(channel));
|
||||
selector.submit(selector.new Accept(channel, attachment));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -211,7 +226,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
*
|
||||
* @param server the server channel to register
|
||||
*/
|
||||
public void acceptor(final ServerSocketChannel server)
|
||||
public void acceptor(ServerSocketChannel server)
|
||||
{
|
||||
final ManagedSelector selector = chooseSelector();
|
||||
selector.submit(selector.new Acceptor(server));
|
||||
|
@ -856,11 +871,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
private class Accept implements Runnable
|
||||
{
|
||||
private final SocketChannel _channel;
|
||||
private final SocketChannel channel;
|
||||
private final Object attachment;
|
||||
|
||||
public Accept(SocketChannel channel)
|
||||
private Accept(SocketChannel channel, Object attachment)
|
||||
{
|
||||
this._channel = channel;
|
||||
this.channel = channel;
|
||||
this.attachment = attachment;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -868,13 +885,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
try
|
||||
{
|
||||
SelectionKey key = _channel.register(_selector, 0, null);
|
||||
EndPoint endpoint = createEndPoint(_channel, key);
|
||||
SelectionKey key = channel.register(_selector, 0, attachment);
|
||||
EndPoint endpoint = createEndPoint(channel, key);
|
||||
key.attach(endpoint);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
closeNoExceptions(_channel);
|
||||
closeNoExceptions(channel);
|
||||
LOG.debug(x);
|
||||
}
|
||||
}
|
||||
|
@ -887,7 +904,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
private final Object attachment;
|
||||
private final Scheduler.Task timeout;
|
||||
|
||||
public Connect(SocketChannel channel, Object attachment)
|
||||
private Connect(SocketChannel channel, Object attachment)
|
||||
{
|
||||
this.channel = channel;
|
||||
this.attachment = attachment;
|
||||
|
@ -907,7 +924,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
}
|
||||
}
|
||||
|
||||
protected void failed(Throwable failure)
|
||||
private void failed(Throwable failure)
|
||||
{
|
||||
if (failed.compareAndSet(false, true))
|
||||
{
|
||||
|
|
|
@ -250,15 +250,18 @@ public class ConnectHandler extends HandlerWrapper
|
|||
channel.socket().setTcpNoDelay(true);
|
||||
channel.configureBlocking(false);
|
||||
InetSocketAddress address = new InetSocketAddress(host, port);
|
||||
channel.connect(address);
|
||||
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connecting to {}", address);
|
||||
|
||||
ConnectContext connectContext = new ConnectContext(request, response, asyncContext, HttpConnection.getCurrentConnection());
|
||||
selector.connect(channel, connectContext);
|
||||
if (channel.connect(address))
|
||||
selector.accept(channel, connectContext);
|
||||
else
|
||||
selector.connect(channel, connectContext);
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
|
|
|
@ -156,7 +156,6 @@ public class SPDYClient
|
|||
channel.bind(bindAddress);
|
||||
configure(channel);
|
||||
channel.configureBlocking(false);
|
||||
channel.connect(address);
|
||||
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, ((InetSocketAddress)address).getHostString());
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, ((InetSocketAddress)address).getPort());
|
||||
|
@ -164,7 +163,10 @@ public class SPDYClient
|
|||
context.put(SPDYClientConnectionFactory.SPDY_SESSION_LISTENER_CONTEXT_KEY, listener);
|
||||
context.put(SPDYClientConnectionFactory.SPDY_SESSION_PROMISE_CONTEXT_KEY, promise);
|
||||
|
||||
factory.selector.connect(channel, context);
|
||||
if (channel.connect(address))
|
||||
factory.selector.accept(channel, context);
|
||||
else
|
||||
factory.selector.connect(channel, context);
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
|
|
|
@ -74,8 +74,14 @@ public class ConnectionManager extends ContainerLifeCycle
|
|||
|
||||
InetSocketAddress address = toSocketAddress(wsUri);
|
||||
|
||||
channel.connect(address);
|
||||
getSelector().connect(channel,this);
|
||||
if (channel.connect(address))
|
||||
{
|
||||
getSelector().accept(channel, this);
|
||||
}
|
||||
else
|
||||
{
|
||||
getSelector().connect(channel, this);
|
||||
}
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue