445167 - Allow configuration of dispatch after select.

Introduced parameter "dispatchIO" in the relevant factories so that
they can be configured by users and connections will be created
taking into account this parameter.

For less configurable connection factories, this parameter is
currently hardcoded to either true or false depending on the case.
For example, ALPN and NPN connections have it to false, since they
don't do any blocking operation in onFillable().
This commit is contained in:
Simone Bordet 2014-09-26 09:46:14 +02:00
parent a8b461fe91
commit 8d2efaf7eb
27 changed files with 120 additions and 121 deletions

View File

@ -85,7 +85,7 @@ public class Socks4Proxy extends ProxyConfiguration.Proxy
public Socks4ProxyConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor);
super(endPoint, executor, false);
this.connectionFactory = connectionFactory;
this.context = context;
}

View File

@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -153,7 +152,7 @@ public class HttpClientCustomProxyTest
public CAFEBABEConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor);
super(endPoint, executor, true);
this.connectionFactory = connectionFactory;
this.context = context;
}
@ -212,7 +211,7 @@ public class HttpClientCustomProxyTest
public CAFEBABEServerConnection(Connector connector, EndPoint endPoint, org.eclipse.jetty.server.ConnectionFactory connectionFactory)
{
super(endPoint, connector.getExecutor());
super(endPoint, connector.getExecutor(), true);
this.connectionFactory = connectionFactory;
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.client.ssl;
import static org.hamcrest.Matchers.nullValue;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.File;
@ -42,7 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSocket;
@ -80,6 +77,8 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.Matchers.nullValue;
public class SslBytesServerTest extends SslBytesTest
{
private final AtomicInteger sslFills = new AtomicInteger();
@ -111,7 +110,7 @@ public class SslBytesServerTest extends SslBytesTest
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
return configure(new HttpConnection(getHttpConfiguration(), connector, endPoint)
return configure(new HttpConnection(getHttpConfiguration(), connector, endPoint, true)
{
@Override
protected HttpParser newHttpParser()

View File

@ -49,7 +49,7 @@ public class ServerFCGIConnection extends AbstractConnection
public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
{
super(endPoint, connector.getExecutor());
super(endPoint, connector.getExecutor(), false);
this.connector = connector;
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
@ -163,8 +163,9 @@ public class ServerFCGIConnection extends AbstractConnection
LOG.debug("Request {} {} content {} on {}", request, stream, buffer, channel);
if (channel != null)
{
// TODO avoid creating content all the time
channel.onContent(new HttpInput.Content(buffer));
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
copy.put(buffer).flip();
channel.onContent(new HttpInput.Content(copy));
}
return false;
}

View File

@ -66,7 +66,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
Generator generator = new Generator(byteBufferPool, 4096);
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, new HTTP2FlowControl(FlowControl.DEFAULT_WINDOW_SIZE));
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, promise, listener);
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, false, promise, listener);
}
public int getInitialSessionWindow()
@ -85,9 +85,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise<Session> promise;
private final Session.Listener listener;
public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, boolean dispatchIO, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
super(byteBufferPool, executor, endpoint, parser, session, bufferSize, dispatchIO);
this.client = client;
this.promise = promise;
this.listener = listener;

View File

@ -38,9 +38,9 @@ public class HTTP2Connection extends AbstractConnection
private final ISession session;
private final int bufferSize;
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, boolean dispatchIO)
{
super(endPoint, executor);
super(endPoint, executor, dispatchIO);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.server.Connector;
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
{
private boolean dispatchIO = true;
private int maxHeaderTableSize = 4096;
private int initialStreamWindow = FlowControl.DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = -1;
@ -45,6 +46,16 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
super("h2-14");
}
public boolean isDispatchIO()
{
return dispatchIO;
}
public void setDispatchIO(boolean dispatchIO)
{
this.dispatchIO = dispatchIO;
}
public int getMaxHeaderTableSize()
{
return maxHeaderTableSize;
@ -92,7 +103,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
Parser parser = newServerParser(connector.getByteBufferPool(), session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, parser, session, getInputBufferSize(), listener);
endPoint, parser, session, getInputBufferSize(), isDispatchIO(), listener);
return configure(connection, connector, endPoint);
}
@ -101,13 +112,13 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
protected abstract ServerParser newServerParser(ByteBufferPool byteBufferPool, ServerParser.Listener listener);
private class HTTP2ServerConnection extends HTTP2Connection
private static class HTTP2ServerConnection extends HTTP2Connection
{
private final ServerSessionListener listener;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
private HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int inputBufferSize, boolean dispatchIO, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, dispatchIO);
this.listener = listener;
}

View File

@ -41,30 +41,23 @@ public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
public static final boolean EXECUTE_ONFILLABLE=true;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
private final long _created=System.currentTimeMillis();
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback _readCallback;
private final boolean _executeOnfillable;
private final boolean _dispatchIO;
private int _inputBufferSize=2048;
protected AbstractConnection(EndPoint endp, Executor executor)
{
this(endp,executor,EXECUTE_ONFILLABLE);
}
protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
protected AbstractConnection(EndPoint endp, Executor executor, boolean dispatchIO)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endp;
_executor = executor;
_readCallback = new ReadCallback();
_executeOnfillable=executeOnfillable;
_dispatchIO = dispatchIO;
_state.set(IDLE);
}
@ -88,10 +81,16 @@ public abstract class AbstractConnection implements Connection
{
return _executor;
}
public boolean isDispatchIO()
{
return _dispatchIO;
}
protected void failedCallback(final Callback callback, final Throwable x)
{
if (NonBlockingThread.isNonBlockingThread())
boolean dispatchFailure = isDispatchIO() && NonBlockingThread.isNonBlockingThread();
if (dispatchFailure)
{
try
{
@ -375,7 +374,7 @@ public abstract class AbstractConnection implements Connection
@Override
public void onEnter(AbstractConnection connection)
{
if (connection._executeOnfillable)
if (connection.isDispatchIO())
connection.getExecutor().execute(connection._runOnFillable);
else
connection._runOnFillable.run();

View File

@ -38,7 +38,7 @@ public abstract class NegotiatingClientConnection extends AbstractConnection
protected NegotiatingClientConnection(EndPoint endp, Executor executor, SSLEngine sslEngine, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endp, executor);
super(endp, executor, false);
this.engine = sslEngine;
this.connectionFactory = connectionFactory;
this.context = context;

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
@ -100,9 +99,9 @@ public class SslConnection extends AbstractConnection
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine)
{
// This connection does not execute calls to onfillable, so they will be called by the selector thread.
// onfillable does not block and will only wakeup another thread to do the actual reading and handling.
super(endPoint, executor, !EXECUTE_ONFILLABLE);
// This connection does not execute calls to onFillable(), so they will be called by the selector thread.
// onFillable() does not block and will only wakeup another thread to do the actual reading and handling.
super(endPoint, executor, false);
this._bufferPool = byteBufferPool;
this._sslEngine = sslEngine;
this._decryptedEndPoint = newDecryptedEndPoint();

View File

@ -79,7 +79,7 @@ public class SelectChannelEndPointInterestsTest
@Override
public Connection newConnection(SocketChannel channel, final EndPoint endPoint, Object attachment)
{
return new AbstractConnection(endPoint, getExecutor())
return new AbstractConnection(endPoint, getExecutor(), true)
{
@Override
public void onOpen()

View File

@ -18,12 +18,6 @@
package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
@ -52,6 +46,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SelectChannelEndPointTest
{
private static final Logger LOG = Log.getLogger(SelectChannelEndPointTest.class);
@ -122,7 +122,7 @@ public class SelectChannelEndPointTest
public TestConnection(EndPoint endp)
{
super(endp, _threadPool);
super(endp, _threadPool, true);
}
@Override

View File

@ -94,7 +94,7 @@ public class SelectorManagerTest
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
((Callback)attachment).succeeded();
return new AbstractConnection(endpoint, executor)
return new AbstractConnection(endpoint, executor, true)
{
@Override
public void onFillable()

View File

@ -41,7 +41,7 @@ public abstract class ProxyConnection extends AbstractConnection
protected ProxyConnection(EndPoint endp, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context)
{
super(endp, executor);
super(endp, executor, false);
this.bufferPool = bufferPool;
this.context = context;
}

View File

@ -60,12 +60,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private volatile ByteBuffer _chunk = null;
private final SendCallback _sendCallback = new SendCallback();
/* ------------------------------------------------------------ */
/** Get the current connection that this thread is dispatched to.
* Note that a thread may be processing a request asynchronously and
/**
* Get the current connection that this thread is dispatched to.
* Note that a thread may be processing a request asynchronously and
* thus not be dispatched to the connection.
* @see Request#getAttribute(String) for a more general way to access the HttpConnection
* @return the current HttpConnection or null
* @see Request#getAttribute(String) for a more general way to access the HttpConnection
*/
public static HttpConnection getCurrentConnection()
{
@ -82,17 +82,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return last;
}
public HttpConfiguration getHttpConfiguration()
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint, boolean dispatchIO)
{
return _config;
}
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
// Tell AbstractConnector executeOnFillable==true because we want the same thread that
// does the HTTP parsing to handle the request so its cache is hot
super(endPoint, connector.getExecutor(),true);
super(endPoint, connector.getExecutor(), dispatchIO);
_config = config;
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
@ -104,6 +96,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
LOG.debug("New HTTP Connection {}", this);
}
public HttpConfiguration getHttpConfiguration()
{
return _config;
}
protected HttpGenerator newHttpGenerator()
{
return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());

View File

@ -16,17 +16,13 @@
// ========================================================================
//
package org.eclipse.jetty.server;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.annotation.Name;
/* ------------------------------------------------------------ */
/** A Connection Factory for HTTP Connections.
* <p>Accepts connections either directly or via SSL and/or NPN chained connection factories. The accepted
* {@link HttpConnection}s are configured by a {@link HttpConfiguration} instance that is either created by
@ -35,11 +31,11 @@ import org.eclipse.jetty.util.annotation.Name;
public class HttpConnectionFactory extends AbstractConnectionFactory implements HttpConfiguration.ConnectionFactory
{
private final HttpConfiguration _config;
private boolean _dispatchIO = true;
public HttpConnectionFactory()
{
this(new HttpConfiguration());
setInputBufferSize(16384);
}
public HttpConnectionFactory(@Name("config") HttpConfiguration config)
@ -55,10 +51,19 @@ public class HttpConnectionFactory extends AbstractConnectionFactory implements
return _config;
}
public boolean isDispatchIO()
{
return _dispatchIO;
}
public void setDispatchIO(boolean dispatchIO)
{
_dispatchIO = dispatchIO;
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
return configure(new HttpConnection(_config, connector, endPoint), connector, endPoint);
return configure(new HttpConnection(_config, connector, endPoint, isDispatchIO()), connector, endPoint);
}
}

View File

@ -42,7 +42,7 @@ public abstract class NegotiatingServerConnection extends AbstractConnection
protected NegotiatingServerConnection(Connector connector, EndPoint endPoint, SSLEngine engine, List<String> protocols, String defaultProtocol)
{
super(endPoint, connector.getExecutor());
super(endPoint, connector.getExecutor(), false);
this.connector = connector;
this.protocols = protocols;
this.defaultProtocol = defaultProtocol;
@ -109,9 +109,8 @@ public abstract class NegotiatingServerConnection extends AbstractConnection
ConnectionFactory connectionFactory = connector.getConnectionFactory(protocol);
if (connectionFactory == null)
{
if (LOG.isDebugEnabled())
LOG.debug("{} application selected protocol '{}', but no correspondent {} has been configured",
this, protocol, ConnectionFactory.class.getName());
LOG.info("{} application selected protocol '{}', but no correspondent {} has been configured",
this, protocol, ConnectionFactory.class.getName());
close();
}
else
@ -131,7 +130,7 @@ public abstract class NegotiatingServerConnection extends AbstractConnection
{
// Something went bad, we need to close.
if (LOG.isDebugEnabled())
LOG.debug("{} closing on client close", this);
LOG.debug("{} detected close on client side", this);
close();
}
else

View File

@ -21,11 +21,9 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -95,7 +93,7 @@ public class ExtendedServerTest extends HttpServerTestBase
{
public ExtendedHttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
super(config,connector,endPoint);
super(config,connector,endPoint,true);
}
@Override

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.lessThan;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -28,7 +26,6 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -40,6 +37,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.lessThan;
public class SlowClientWithPipelinedRequestTest
{
private final AtomicInteger handles = new AtomicInteger();
@ -54,7 +53,7 @@ public class SlowClientWithPipelinedRequestTest
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
return configure(new HttpConnection(new HttpConfiguration(),connector,endPoint)
return configure(new HttpConnection(new HttpConfiguration(),connector,endPoint,true)
{
@Override
public void onFillable()

View File

@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -169,7 +168,7 @@ public class HttpClientCustomProxyTest
public CAFEBABEConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor);
super(endPoint, executor, true);
this.connectionFactory = connectionFactory;
this.context = context;
}
@ -228,7 +227,7 @@ public class HttpClientCustomProxyTest
public CAFEBABEServerConnection(Connector connector, EndPoint endPoint, org.eclipse.jetty.server.ConnectionFactory connectionFactory)
{
super(endPoint, connector.getExecutor());
super(endPoint, connector.getExecutor(), true);
this.connectionFactory = connectionFactory;
}

View File

@ -67,7 +67,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
public ProxyHTTPSPDYConnection(Connector connector, HttpConfiguration config, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
{
super(config, connector, endPoint);
super(config, connector, endPoint, true);
this.version = version;
this.proxyEngineSelector = proxyEngineSelector;
this.session = new HTTPSession(version, connector);

View File

@ -22,9 +22,7 @@ import java.io.IOException;
import java.net.CookieStore;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.Executor;
@ -45,7 +43,6 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.ShutdownThread;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.client.io.ConnectPromise;
@ -82,6 +79,7 @@ public class WebSocketClient extends ContainerLifeCycle implements SessionListen
private Masker masker;
private SocketAddress bindAddress;
private long connectTimeout = SelectorManager.DEFAULT_CONNECT_TIMEOUT;
private boolean dispatchIO = true;
public WebSocketClient()
{
@ -184,7 +182,7 @@ public class WebSocketClient extends ContainerLifeCycle implements SessionListen
LOG.debug("connect websocket {} to {}",websocket,toUri);
// Grab Connection Manager
initialiseClient();
initializeClient();
ConnectionManager manager = getConnectionManager();
// Setup Driver for user provided websocket
@ -282,6 +280,11 @@ public class WebSocketClient extends ContainerLifeCycle implements SessionListen
LOG.debug("Stopped {}",this);
}
public boolean isDispatchIO()
{
return dispatchIO;
}
/**
* Return the number of milliseconds for a timeout of an attempted write operation.
*
@ -416,29 +419,7 @@ public class WebSocketClient extends ContainerLifeCycle implements SessionListen
return sslContextFactory;
}
public List<Extension> initExtensions(List<ExtensionConfig> requested)
{
List<Extension> extensions = new ArrayList<Extension>();
for (ExtensionConfig cfg : requested)
{
Extension extension = extensionRegistry.newInstance(cfg);
if (extension == null)
{
continue;
}
if (LOG.isDebugEnabled())
LOG.debug("added {}",extension);
extensions.add(extension);
}
if (LOG.isDebugEnabled())
LOG.debug("extensions={}",extensions);
return extensions;
}
private synchronized void initialiseClient() throws IOException
private synchronized void initializeClient() throws IOException
{
if (!ShutdownThread.isRegistered(this))
{
@ -496,7 +477,16 @@ public class WebSocketClient extends ContainerLifeCycle implements SessionListen
this.policy.setAsyncWriteTimeout(ms);
}
/**
* @deprecated use {@link #setBindAddress(SocketAddress)} instead
*/
@Deprecated
public void setBindAdddress(SocketAddress bindAddress)
{
setBindAddress(bindAddress);
}
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
@ -531,6 +521,11 @@ public class WebSocketClient extends ContainerLifeCycle implements SessionListen
this.daemon = daemon;
}
public void setDispatchIO(boolean dispatchIO)
{
this.dispatchIO = dispatchIO;
}
public void setEventDriverFactory(EventDriverFactory factory)
{
this.eventDriverFactory = factory;

View File

@ -103,7 +103,7 @@ public class UpgradeConnection extends AbstractConnection
public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{
super(endp,executor);
super(endp,executor,connectPromise.getClient().isDispatchIO());
this.connectPromise = connectPromise;
this.bufferPool = connectPromise.getClient().getBufferPool();
this.request = connectPromise.getRequest();

View File

@ -48,7 +48,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
public WebSocketClientConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise, WebSocketPolicy policy)
{
super(endp,executor,connectPromise.getClient().getScheduler(),policy,connectPromise.getClient().getBufferPool());
super(endp,executor,connectPromise.getClient().getScheduler(),policy,connectPromise.getClient().getBufferPool(),connectPromise.getClient().isDispatchIO());
this.connectPromise = connectPromise;
this.masker = connectPromise.getMasker();
assert (this.masker != null);

View File

@ -219,9 +219,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private IOState ioState;
private Stats stats = new Stats();
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, boolean dispatchIO)
{
super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX
super(endp,executor,dispatchIO);
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool);

View File

@ -33,9 +33,9 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
{
private final AtomicBoolean opened = new AtomicBoolean(false);
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, boolean dispatchIO)
{
super(endp,executor,scheduler,policy,bufferPool);
super(endp,executor,scheduler,policy,bufferPool,dispatchIO);
if (policy.getIdleTimeout() > 0)
{
endp.setIdleTimeout(policy.getIdleTimeout());

View File

@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -514,9 +513,9 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
EndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().getExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
// Setup websocket connection
WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool);
WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool, http.isDispatchIO());
extensionStack.setPolicy(driver.getPolicy());
extensionStack.configure(wsConnection.getParser());