jetty-9 SelectorManager refactorings.

This commit is contained in:
Simone Bordet 2012-05-11 19:30:02 +02:00
parent 835961dca8
commit 4b8f6b8413
9 changed files with 504 additions and 906 deletions

View File

@ -15,25 +15,21 @@ package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout.Task;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* An Endpoint that can be scheduled by {@link SelectorManager}. * An Endpoint that can be scheduled by {@link SelectorManager}.
*/ */
public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableAsyncEndPoint public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, SelectorManager.SelectableAsyncEndPoint
{ {
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
@ -48,7 +44,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
/** The desired value for {@link SelectionKey#interestOps()} */ /** The desired value for {@link SelectionKey#interestOps()} */
private int _interestOps; private int _interestOps;
/** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ /** true if {@link SelectSet#destroyEndPoint(SelectorManager.SelectableAsyncEndPoint)} has not been called */
private boolean _open; private boolean _open;
private volatile boolean _idlecheck; private volatile boolean _idlecheck;
@ -146,7 +142,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
} }
finally finally
{ {
doUpdateKey(); _selectSet.submit(this);
_selected = false; _selected = false;
} }
} }
@ -156,18 +152,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
_writeFlusher.completeWrite(); _writeFlusher.completeWrite();
} }
/* ------------------------------------------------------------ */
public void cancelTimeout(Task task)
{
getSelectSet().cancelTimeout(task);
}
/* ------------------------------------------------------------ */
public void scheduleTimeout(Task task, long timeoutMs)
{
getSelectSet().scheduleTimeout(task,timeoutMs);
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public void setCheckForIdle(boolean check) public void setCheckForIdle(boolean check)
@ -274,13 +258,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
if (_interestOps != current_ops && !_changing) if (_interestOps != current_ops && !_changing)
{ {
_changing = true; _changing = true;
_selectSet.addChange(this); _selectSet.submit(this);
_selectSet.wakeup();
} }
} }
} }
} }
@Override
public void run()
{
doUpdateKey();
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey

View File

@ -82,7 +82,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.register(server); _manager.accept(server);
SSLEngine engine = __sslCtxFactory.newSslEngine(); SSLEngine engine = __sslCtxFactory.newSslEngine();
engine.setUseClientMode(true); engine.setUseClientMode(true);

View File

@ -36,14 +36,8 @@ public class SelectChannelEndPointTest
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected QueuedThreadPool _threadPool = new QueuedThreadPool();
private int maxIdleTimeout = 600000; // TODO: use smaller value private int maxIdleTimeout = 600000; // TODO: use smaller value
protected SelectorManager _manager = new SelectorManager() protected SelectorManager _manager = new SelectorManager(_threadPool)
{ {
@Override
public boolean dispatch(Runnable task)
{
return _threadPool.dispatch(task);
}
@Override @Override
protected void endPointClosed(AsyncEndPoint endpoint) protected void endPointClosed(AsyncEndPoint endpoint)
{ {
@ -244,7 +238,7 @@ public class SelectChannelEndPointTest
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.register(server); _manager.accept(server);
// Write client to server // Write client to server
client.getOutputStream().write("HelloWorld".getBytes("UTF-8")); client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
@ -290,10 +284,8 @@ public class SelectChannelEndPointTest
if (++i == 10) if (++i == 10)
Assert.fail(); Assert.fail();
} }
} }
@Test @Test
public void testShutdown() throws Exception public void testShutdown() throws Exception
{ {
@ -304,7 +296,7 @@ public class SelectChannelEndPointTest
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.register(server); _manager.accept(server);
// Write client to server // Write client to server
client.getOutputStream().write("HelloWorld".getBytes("UTF-8")); client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
@ -357,7 +349,7 @@ public class SelectChannelEndPointTest
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.register(server); _manager.accept(server);
OutputStream clientOutputStream = client.getOutputStream(); OutputStream clientOutputStream = client.getOutputStream();
InputStream clientInputStream = client.getInputStream(); InputStream clientInputStream = client.getInputStream();
@ -411,7 +403,7 @@ public class SelectChannelEndPointTest
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.register(server); _manager.accept(server);
// Write client to server // Write client to server
client.getOutputStream().write("HelloWorld".getBytes("UTF-8")); client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
@ -459,7 +451,7 @@ public class SelectChannelEndPointTest
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.register(server); _manager.accept(server);
// Write client to server // Write client to server
clientOutputStream.write("HelloWorld".getBytes("UTF-8")); clientOutputStream.write("HelloWorld".getBytes("UTF-8"));
@ -515,7 +507,7 @@ public class SelectChannelEndPointTest
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.register(server); _manager.accept(server);
int writes = 100000; int writes = 100000;
final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET);
@ -602,7 +594,7 @@ public class SelectChannelEndPointTest
SocketChannel server = _connector.accept(); SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.register(server); _manager.accept(server);
// Write client to server // Write client to server
_writeCount=10000; _writeCount=10000;

View File

@ -14,27 +14,16 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletRequest;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.StandardByteBufferPool; import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.server.Connector.Statistics;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool;
/** /**
* Abstract Connector implementation. This abstract implementation of the Connector interface provides: * Abstract Connector implementation. This abstract implementation of the Connector interface provides:
@ -406,19 +395,10 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
{ {
accept(_acceptor); accept(_acceptor);
} }
catch (EofException e) catch (IOException | InterruptedException e)
{ {
LOG.ignore(e); LOG.ignore(e);
} }
catch (IOException e)
{
LOG.ignore(e);
}
catch (InterruptedException x)
{
// Connector has been stopped
LOG.ignore(x);
}
catch (Throwable e) catch (Throwable e)
{ {
LOG.warn(e); LOG.warn(e);
@ -457,6 +437,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void connectionOpened(AsyncConnection connection) protected void connectionOpened(AsyncConnection connection)
{ {
// TODO: should we dispatch the call to onOpen() to another thread ?
connection.onOpen(); connection.onOpen();
_stats.connectionOpened(); _stats.connectionOpened();
} }
@ -472,34 +453,13 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void connectionClosed(AsyncConnection connection) protected void connectionClosed(AsyncConnection connection)
{ {
// TODO: should we dispatch the call to onClose() to another thread ?
connection.onClose();
long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp(); long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
// TODO: remove casts to HttpConnection
int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0; int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0;
_stats.connectionClosed(duration,requests,requests); _stats.connectionClosed(duration,requests,requests);
}
/* ------------------------------------------------------------ */
/**
* @return the acceptorPriority
*/
public int getAcceptorPriorityOffset()
{
return _acceptorPriorityOffset;
}
/* ------------------------------------------------------------ */
/**
* Set the priority offset of the acceptor threads. The priority is adjusted by this amount (default 0) to either favour the acceptance of new threads and
* newly active connections or to favour the handling of already dispatched connections.
*
* @param offset
* the amount to alter the priority of the acceptor threads.
*/
public void setAcceptorPriorityOffset(int offset)
{
_acceptorPriorityOffset = offset;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -520,17 +480,4 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
{ {
_reuseAddress = reuseAddress; _reuseAddress = reuseAddress;
} }
/* ------------------------------------------------------------ */
void updateNotEqual(AtomicLong valueHolder, long compare, long value)
{
long oldValue = valueHolder.get();
while (compare != oldValue)
{
if (valueHolder.compareAndSet(oldValue,value))
break;
oldValue = valueHolder.get();
}
}
} }

View File

@ -30,7 +30,6 @@ import org.eclipse.jetty.io.SelectorManager.SelectSet;
import org.eclipse.jetty.server.AbstractHttpConnector; import org.eclipse.jetty.server.AbstractHttpConnector;
import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.thread.ThreadPool;
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
/** /**
@ -60,11 +59,10 @@ import org.eclipse.jetty.util.thread.ThreadPool;
*/ */
public class SelectChannelConnector extends AbstractHttpConnector public class SelectChannelConnector extends AbstractHttpConnector
{ {
private SelectorManager _manager;
protected ServerSocketChannel _acceptChannel; protected ServerSocketChannel _acceptChannel;
private int _localPort=-1; private int _localPort=-1;
private final SelectorManager _manager = new ConnectorSelectorManager();
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
/** /**
* Constructor. * Constructor.
@ -93,7 +91,7 @@ public class SelectChannelConnector extends AbstractHttpConnector
channel.configureBlocking(false); channel.configureBlocking(false);
Socket socket = channel.socket(); Socket socket = channel.socket();
configure(socket); configure(socket);
_manager.register(channel); _manager.accept(channel);
} }
} }
@ -187,10 +185,10 @@ public class SelectChannelConnector extends AbstractHttpConnector
@Override @Override
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
_manager.setSelectSets(getAcceptors());
_manager.setMaxIdleTime(getMaxIdleTime());
super.doStart(); super.doStart();
_manager = new ConnectorSelectorManager(findExecutor(), getAcceptors());
_manager.setMaxIdleTime(getMaxIdleTime());
_manager.start();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -204,6 +202,7 @@ public class SelectChannelConnector extends AbstractHttpConnector
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
protected void endPointClosed(AsyncEndPoint endpoint) protected void endPointClosed(AsyncEndPoint endpoint)
{ {
endpoint.onClose();
connectionClosed(endpoint.getAsyncConnection()); connectionClosed(endpoint.getAsyncConnection());
} }
@ -219,12 +218,9 @@ public class SelectChannelConnector extends AbstractHttpConnector
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private final class ConnectorSelectorManager extends SelectorManager private final class ConnectorSelectorManager extends SelectorManager
{ {
@Override private ConnectorSelectorManager(Executor executor, int selectSets)
public boolean dispatch(Runnable task)
{ {
Executor executor = findExecutor(); super(executor, selectSets);
executor.execute(task);
return true;
} }
@Override @Override
@ -257,7 +253,5 @@ public class SelectChannelConnector extends AbstractHttpConnector
{ {
return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey); return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
} }
} }
} }

View File

@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
@ -104,7 +103,7 @@ public class SPDYClient
SessionPromise result = new SessionPromise(this, listener); SessionPromise result = new SessionPromise(this, listener);
channel.connect(address); channel.connect(address);
factory.selector.register(channel, result); factory.selector.connect(channel, result);
return result; return result;
} }
@ -209,7 +208,7 @@ public class SPDYClient
if (sslContextFactory != null) if (sslContextFactory != null)
addBean(sslContextFactory); addBean(sslContextFactory);
selector = new ClientSelectorManager(); selector = new ClientSelectorManager(threadPool);
addBean(selector); addBean(selector);
factories.put("spdy/2", new ClientSPDYAsyncConnectionFactory()); factories.put("spdy/2", new ClientSPDYAsyncConnectionFactory());
@ -267,18 +266,9 @@ public class SPDYClient
private class ClientSelectorManager extends SelectorManager private class ClientSelectorManager extends SelectorManager
{ {
@Override private ClientSelectorManager(Executor executor)
public boolean dispatch(Runnable task)
{ {
try super(executor);
{
threadPool.execute(task);
return true;
}
catch (RejectedExecutionException x)
{
return false;
}
} }
@Override @Override
@ -326,7 +316,7 @@ public class SPDYClient
final AtomicReference<AsyncEndPoint> sslEndPointRef = new AtomicReference<>(); final AtomicReference<AsyncEndPoint> sslEndPointRef = new AtomicReference<>();
final AtomicReference<Object> attachmentRef = new AtomicReference<>(attachment); final AtomicReference<Object> attachmentRef = new AtomicReference<>(attachment);
SSLEngine engine = client.newSSLEngine(sslContextFactory, channel); SSLEngine engine = client.newSSLEngine(sslContextFactory, channel);
SslConnection sslConnection = new SslConnection(engine, endPoint) SslConnection sslConnection = new SslConnection(bufferPool, threadPool, endPoint, engine)
{ {
@Override @Override
public void onClose() public void onClose()

View File

@ -0,0 +1,13 @@
package org.eclipse.jetty.util;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface Name
{
String value();
}

View File

@ -33,11 +33,11 @@ public abstract class AbstractLifeCycle implements LifeCycle
public static final String STOPPING="STOPPING"; public static final String STOPPING="STOPPING";
public static final String RUNNING="RUNNING"; public static final String RUNNING="RUNNING";
private final CopyOnWriteArrayList<LifeCycle.Listener> _listeners=new CopyOnWriteArrayList<LifeCycle.Listener>();
private final Object _lock = new Object(); private final Object _lock = new Object();
private final int __FAILED = -1, __STOPPED = 0, __STARTING = 1, __STARTED = 2, __STOPPING = 3; private final int __FAILED = -1, __STOPPED = 0, __STARTING = 1, __STARTED = 2, __STOPPING = 3;
private volatile int _state = __STOPPED; private volatile int _state = __STOPPED;
private long _stopTimeout = 10000;
protected final CopyOnWriteArrayList<LifeCycle.Listener> _listeners=new CopyOnWriteArrayList<LifeCycle.Listener>();
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
@ -59,12 +59,7 @@ public abstract class AbstractLifeCycle implements LifeCycle
doStart(); doStart();
setStarted(); setStarted();
} }
catch (Exception e) catch (Throwable e)
{
setFailed(e);
throw e;
}
catch (Error e)
{ {
setFailed(e); setFailed(e);
throw e; throw e;
@ -84,12 +79,7 @@ public abstract class AbstractLifeCycle implements LifeCycle
doStop(); doStop();
setStopped(); setStopped();
} }
catch (Exception e) catch (Throwable e)
{
setFailed(e);
throw e;
}
catch (Error e)
{ {
setFailed(e); setFailed(e);
throw e; throw e;
@ -201,6 +191,16 @@ public abstract class AbstractLifeCycle implements LifeCycle
listener.lifeCycleFailure(this,th); listener.lifeCycleFailure(this,th);
} }
public long getStopTimeout()
{
return _stopTimeout;
}
public void setStopTimeout(long stopTimeout)
{
this._stopTimeout = stopTimeout;
}
public static abstract class AbstractLifeCycleListener implements LifeCycle.Listener public static abstract class AbstractLifeCycleListener implements LifeCycle.Listener
{ {
public void lifeCycleFailure(LifeCycle event, Throwable cause) {} public void lifeCycleFailure(LifeCycle event, Throwable cause) {}