Deprecated the AbstractConnection dispatchIO mechanism

This commit is contained in:
Greg Wilkins 2015-01-01 16:18:13 +01:00
parent 20fc880c3e
commit 3af9b145a3
35 changed files with 77 additions and 78 deletions

View File

@ -131,7 +131,6 @@ public class HttpClient extends ContainerLifeCycle
private volatile long addressResolutionTimeout = 15000;
private volatile long idleTimeout;
private volatile boolean tcpNoDelay = true;
private volatile boolean dispatchIO = true;
private volatile boolean strictEventOrdering = false;
private volatile HttpField encodingField;
private volatile boolean removeIdleDestinations = false;
@ -836,9 +835,11 @@ public class HttpClient extends ContainerLifeCycle
* @return true to dispatch I/O operations in a different thread, false to execute them in the selector thread
* @see #setDispatchIO(boolean)
*/
@Deprecated
public boolean isDispatchIO()
{
return dispatchIO;
// TODO this did default to true, so usage needs to be evaluated.
return false;
}
/**
@ -854,9 +855,9 @@ public class HttpClient extends ContainerLifeCycle
* @param dispatchIO true to dispatch I/O operations in a different thread,
* false to execute them in the selector thread
*/
@Deprecated
public void setDispatchIO(boolean dispatchIO)
{
this.dispatchIO = dispatchIO;
}
/**

View File

@ -85,7 +85,7 @@ public class Socks4Proxy extends ProxyConfiguration.Proxy
public Socks4ProxyConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor, false);
super(endPoint, executor);
this.connectionFactory = connectionFactory;
this.context = context;
}

View File

@ -44,7 +44,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination)
{
super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
super(endPoint, destination.getHttpClient().getExecutor());
this.delegate = new Delegate(destination);
this.channel = new HttpChannelOverHTTP(this);
}

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -152,9 +153,11 @@ public class HttpClientCustomProxyTest
public CAFEBABEConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor, true);
super(endPoint, executor);
this.connectionFactory = connectionFactory;
this.context = context;
throw new IllegalStateException("This was calling super dispatchIO=true. Needs to be reviewed");
}
@Override
@ -211,8 +214,9 @@ public class HttpClientCustomProxyTest
public CAFEBABEServerConnection(Connector connector, EndPoint endPoint, org.eclipse.jetty.server.ConnectionFactory connectionFactory)
{
super(endPoint, connector.getExecutor(), true);
super(endPoint, connector.getExecutor());
this.connectionFactory = connectionFactory;
throw new IllegalStateException("This was calling super dispatchIO=true. Needs to be reviewed");
}
@Override

View File

@ -111,7 +111,7 @@ public class SslBytesServerTest extends SslBytesTest
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
return configure(new HttpConnection(getHttpConfiguration(), connector, endPoint, false)
return configure(new HttpConnection(getHttpConfiguration(), connector, endPoint)
{
@Override
protected HttpParser newHttpParser()

View File

@ -65,7 +65,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
{
super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
super(endPoint, destination.getHttpClient().getExecutor());
this.destination = destination;
this.multiplexed = multiplexed;
this.flusher = new Flusher(endPoint);

View File

@ -48,7 +48,7 @@ public class ServerFCGIConnection extends AbstractConnection
public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
{
super(endPoint, connector.getExecutor(), false);
super(endPoint, connector.getExecutor());
this.connector = connector;
this.flusher = new Flusher(endPoint);
this.configuration = configuration;

View File

@ -66,7 +66,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
Generator generator = new Generator(byteBufferPool, 4096);
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, new HTTP2FlowControl(FlowControl.DEFAULT_WINDOW_SIZE));
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, false, promise, listener);
return new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, 8192, promise, listener);
}
public int getInitialSessionWindow()
@ -85,9 +85,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise<Session> promise;
private final Session.Listener listener;
public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, boolean dispatchIO, Promise<Session> promise, Session.Listener listener)
public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize, dispatchIO);
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;
this.promise = promise;
this.listener = listener;

View File

@ -44,9 +44,9 @@ public class HTTP2Connection extends AbstractConnection
private final int bufferSize;
private final ExecutionStrategy executionStrategy; // TODO: make it pluggable from outside ?
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, boolean dispatchIO)
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor, dispatchIO);
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.util.annotation.Name;
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
{
private boolean dispatchIO = true;
private int maxDynamicTableSize = 4096;
private int initialStreamWindow = FlowControl.DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = -1;
@ -47,14 +46,15 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.httpConfiguration = httpConfiguration;
}
@Deprecated
public boolean isDispatchIO()
{
return dispatchIO;
return false;
}
@Deprecated
public void setDispatchIO(boolean dispatchIO)
{
this.dispatchIO = dispatchIO;
}
public int getMaxDynamicTableSize()
@ -109,7 +109,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
Parser parser = newServerParser(connector.getByteBufferPool(), session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), isDispatchIO(), listener);
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
return configure(connection, connector, endPoint);
}

View File

@ -40,9 +40,9 @@ public class HTTP2ServerConnection extends HTTP2Connection
private final ServerSessionListener listener;
private final HttpConfiguration httpConfig;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, Parser parser, ISession session, int inputBufferSize, boolean dispatchIO, ServerSessionListener listener)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, Parser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, dispatchIO);
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
this.httpConfig = httpConfig;
}

View File

@ -45,17 +45,15 @@ public abstract class AbstractConnection implements Connection
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback _readCallback;
private final boolean _dispatchIO;
private int _inputBufferSize=2048;
protected AbstractConnection(EndPoint endp, Executor executor, boolean dispatchIO)
protected AbstractConnection(EndPoint endp, Executor executor)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endp;
_executor = executor;
_readCallback = new ReadCallback();
_dispatchIO = dispatchIO;
}
@Override
@ -79,35 +77,29 @@ public abstract class AbstractConnection implements Connection
return _executor;
}
@Deprecated
public boolean isDispatchIO()
{
return _dispatchIO;
return false;
}
protected void failedCallback(final Callback callback, final Throwable x)
{
boolean dispatchFailure = isDispatchIO();
if (dispatchFailure)
// TODO always dispatch failure ?
try
{
try
getExecutor().execute(new Runnable()
{
getExecutor().execute(new Runnable()
@Override
public void run()
{
@Override
public void run()
{
callback.failed(x);
}
});
}
catch(RejectedExecutionException e)
{
LOG.debug(e);
callback.failed(x);
}
callback.failed(x);
}
});
}
else
catch(RejectedExecutionException e)
{
LOG.debug(e);
callback.failed(x);
}
}

View File

@ -38,7 +38,7 @@ public abstract class NegotiatingClientConnection extends AbstractConnection
protected NegotiatingClientConnection(EndPoint endp, Executor executor, SSLEngine sslEngine, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endp, executor, false);
super(endp, executor);
this.engine = sslEngine;
this.connectionFactory = connectionFactory;
this.context = context;

View File

@ -110,7 +110,7 @@ public class SslConnection extends AbstractConnection
{
// This connection does not execute calls to onFillable(), so they will be called by the selector thread.
// onFillable() does not block and will only wakeup another thread to do the actual reading and handling.
super(endPoint, executor, false);
super(endPoint, executor);
this._bufferPool = byteBufferPool;
this._sslEngine = sslEngine;
this._decryptedEndPoint = newDecryptedEndPoint();

View File

@ -79,7 +79,7 @@ public class SelectChannelEndPointInterestsTest
@Override
public Connection newConnection(SocketChannel channel, final EndPoint endPoint, Object attachment)
{
return new AbstractConnection(endPoint, getExecutor(), false)
return new AbstractConnection(endPoint, getExecutor())
{
@Override
public void onOpen()

View File

@ -124,7 +124,7 @@ public class SelectChannelEndPointTest
public TestConnection(EndPoint endp)
{
super(endp, _threadPool, false);
super(endp, _threadPool);
}
@Override

View File

@ -94,7 +94,7 @@ public class SelectorManagerTest
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
((Callback)attachment).succeeded();
return new AbstractConnection(endpoint, executor, false)
return new AbstractConnection(endpoint, executor)
{
@Override
public void onFillable()

View File

@ -169,7 +169,7 @@ public class SslConnectionTest
public TestConnection(EndPoint endp)
{
super(endp, _threadPool,false);
super(endp, _threadPool);
}
@Override

View File

@ -41,7 +41,7 @@ public abstract class ProxyConnection extends AbstractConnection
protected ProxyConnection(EndPoint endp, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context)
{
super(endp, executor, false);
super(endp, executor);
this.bufferPool = bufferPool;
this.context = context;
}

View File

@ -88,9 +88,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return last;
}
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint, boolean dispatchIO)
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
super(endPoint, connector.getExecutor(), dispatchIO);
super(endPoint, connector.getExecutor());
_config = config;
_connector = connector;
_bufferPool = _connector.getByteBufferPool();

View File

@ -64,6 +64,6 @@ public class HttpConnectionFactory extends AbstractConnectionFactory implements
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
return configure(new HttpConnection(_config, connector, endPoint, false), connector, endPoint);
return configure(new HttpConnection(_config, connector, endPoint), connector, endPoint);
}
}

View File

@ -47,7 +47,7 @@ public abstract class NegotiatingServerConnection extends AbstractConnection
protected NegotiatingServerConnection(Connector connector, EndPoint endPoint, SSLEngine engine, List<String> protocols, String defaultProtocol)
{
super(endPoint, connector.getExecutor(), false);
super(endPoint, connector.getExecutor());
this.connector = connector;
this.protocols = protocols;
this.defaultProtocol = defaultProtocol;

View File

@ -98,7 +98,7 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
protected ProxyConnection(EndPoint endp, Connector connector, String next)
{
super(endp,connector.getExecutor(),false);
super(endp,connector.getExecutor());
_connector=connector;
_next=next;
}

View File

@ -94,7 +94,7 @@ public class ExtendedServerTest extends HttpServerTestBase
{
public ExtendedHttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
super(config,connector,endPoint,false);
super(config,connector,endPoint);
}
@Override

View File

@ -53,7 +53,7 @@ public class SlowClientWithPipelinedRequestTest
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
return configure(new HttpConnection(new HttpConfiguration(),connector,endPoint,false)
return configure(new HttpConnection(new HttpConfiguration(),connector,endPoint)
{
@Override
public void onFillable()

View File

@ -52,7 +52,7 @@ public class SPDYClientConnectionFactory implements ClientConnectionFactory
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(byteBufferPool, compressionFactory.newCompressor());
SPDYConnection connection = new ClientSPDYConnection(endPoint, byteBufferPool, parser, factory, client.isDispatchIO());
SPDYConnection connection = new ClientSPDYConnection(endPoint, byteBufferPool, parser, factory);
FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
@ -75,9 +75,9 @@ public class SPDYClientConnectionFactory implements ClientConnectionFactory
{
private final Factory factory;
public ClientSPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory, boolean dispatchIO)
public ClientSPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
{
super(endPoint, bufferPool, parser, factory.getExecutor(), dispatchIO);
super(endPoint, bufferPool, parser, factory.getExecutor());
this.factory = factory;
}

View File

@ -44,12 +44,12 @@ public class SPDYConnection extends AbstractConnection implements Controller, Id
private volatile ISession session;
private volatile boolean idle = false;
public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor, boolean dispatchIO)
public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
{
this(endPoint, bufferPool, parser, executor, dispatchIO, 8192);
this(endPoint, bufferPool, parser, executor, 8192);
}
public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor, boolean dispatchIO, int bufferSize)
public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor, int bufferSize)
{
// Since SPDY is multiplexed, onFillable() must never block while calling application code. In fact,
// the SPDY code always dispatches to a new thread when calling application code,
@ -57,7 +57,7 @@ public class SPDYConnection extends AbstractConnection implements Controller, Id
// The IO operation (read, parse, etc.) will not block and will be fast in almost all cases.
// Big uploads to a server, however, might occupy the Selector thread for a long time and
// therefore starve other connections, so by default dispatchIO is true.
super(endPoint, executor, dispatchIO);
super(endPoint, executor);
this.bufferPool = bufferPool;
this.parser = parser;
onIdle(true);

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -168,9 +169,10 @@ public class HttpClientCustomProxyTest
public CAFEBABEConnection(EndPoint endPoint, Executor executor, ClientConnectionFactory connectionFactory, Map<String, Object> context)
{
super(endPoint, executor, true);
super(endPoint, executor);
this.connectionFactory = connectionFactory;
this.context = context;
throw new IllegalStateException("This was calling super dispatchIO=true. Needs to be reviewed");
}
@Override
@ -227,8 +229,10 @@ public class HttpClientCustomProxyTest
public CAFEBABEServerConnection(Connector connector, EndPoint endPoint, org.eclipse.jetty.server.ConnectionFactory connectionFactory)
{
super(endPoint, connector.getExecutor(), true);
super(endPoint, connector.getExecutor());
this.connectionFactory = connectionFactory;
throw new IllegalStateException("This was calling super dispatchIO=true. Needs to be reviewed");
}
@Override

View File

@ -68,7 +68,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
public ProxyHTTPSPDYConnection(Connector connector, HttpConfiguration config, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
{
super(config, connector, endPoint, false);
super(config, connector, endPoint);
this.version = version;
this.proxyEngineSelector = proxyEngineSelector;
this.session = new HTTPSession(version, connector);

View File

@ -86,8 +86,7 @@ public class SPDYServerConnectionFactory extends AbstractConnectionFactory
Generator generator = new Generator(connector.getByteBufferPool(), compressionFactory.newCompressor());
ServerSessionFrameListener listener = provideServerSessionFrameListener(connector, endPoint);
SPDYConnection connection = new ServerSPDYConnection(connector, endPoint, parser, listener,
isDispatchIO(), getInputBufferSize());
SPDYConnection connection = new ServerSPDYConnection(connector, endPoint, parser, listener, getInputBufferSize());
FlowControlStrategy flowControlStrategy = newFlowControlStrategy(version);
@ -180,10 +179,9 @@ public class SPDYServerConnectionFactory extends AbstractConnectionFactory
private final AtomicBoolean connected = new AtomicBoolean();
private ServerSPDYConnection(Connector connector, EndPoint endPoint, Parser parser,
ServerSessionFrameListener listener, boolean dispatchIO, int bufferSize)
ServerSessionFrameListener listener, int bufferSize)
{
super(endPoint, connector.getByteBufferPool(), parser, connector.getExecutor(),
dispatchIO, bufferSize);
super(endPoint, connector.getByteBufferPool(), parser, connector.getExecutor(), bufferSize);
this.listener = listener;
}

View File

@ -103,7 +103,7 @@ public class UpgradeConnection extends AbstractConnection
public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{
super(endp,executor,connectPromise.getClient().isDispatchIO());
super(endp,executor);
this.connectPromise = connectPromise;
this.bufferPool = connectPromise.getClient().getBufferPool();
this.request = connectPromise.getRequest();

View File

@ -48,7 +48,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
public WebSocketClientConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise, WebSocketPolicy policy)
{
super(endp,executor,connectPromise.getClient().getScheduler(),policy,connectPromise.getClient().getBufferPool(),connectPromise.getClient().isDispatchIO());
super(endp,executor,connectPromise.getClient().getScheduler(),policy,connectPromise.getClient().getBufferPool());
this.connectPromise = connectPromise;
this.masker = connectPromise.getMasker();
assert (this.masker != null);

View File

@ -219,9 +219,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private IOState ioState;
private Stats stats = new Stats();
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, boolean dispatchIO)
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
super(endp,executor,dispatchIO);
super(endp,executor);
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool);

View File

@ -33,9 +33,9 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
{
private final AtomicBoolean opened = new AtomicBoolean(false);
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, boolean dispatchIO)
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
super(endp,executor,scheduler,policy,bufferPool,dispatchIO);
super(endp,executor,scheduler,policy,bufferPool);
if (policy.getIdleTimeout() > 0)
{
endp.setIdleTimeout(policy.getIdleTimeout());

View File

@ -515,7 +515,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
// Setup websocket connection
WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool, http.isDispatchIO());
WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool);
extensionStack.setPolicy(driver.getPolicy());
extensionStack.configure(wsConnection.getParser());