Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9

This commit is contained in:
Joakim Erdfelt 2012-07-25 09:16:10 -07:00
commit e46cc961ac
35 changed files with 883 additions and 886 deletions

View File

@ -25,13 +25,13 @@
<Item>org.eclipse.jetty.servlet.DefaultServlet</Item> <Item>org.eclipse.jetty.servlet.DefaultServlet</Item>
</Array> </Array>
<Call name="addLifeCycle"> <Call name="addBean">
<Arg> <Arg>
<New id="DeploymentManager" class="org.eclipse.jetty.deploy.DeploymentManager"> <New id="DeploymentManager" class="org.eclipse.jetty.deploy.DeploymentManager">
<Set name="contexts"> <Set name="contexts">
<Ref id="Contexts" /> <Ref id="Contexts" />
</Set> </Set>
<Ref id="DeploymentManager"> <Ref id="DeploymentManager">
<Call name="addLifeCycleBinding"> <Call name="addLifeCycleBinding">
<Arg> <Arg>
@ -41,7 +41,7 @@
</Arg> </Arg>
</Call> </Call>
</Ref> </Ref>
<!-- Providers of Apps --> <!-- Providers of Apps -->
<Set name="appProviders"> <Set name="appProviders">
<Array type="org.eclipse.jetty.deploy.AppProvider"> <Array type="org.eclipse.jetty.deploy.AppProvider">

View File

@ -3,13 +3,13 @@
<Configure id="Server" class="org.eclipse.jetty.server.Server"> <Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="addLifeCycle"> <Call name="addBean">
<Arg> <Arg>
<New id="DeploymentManager" class="org.eclipse.jetty.deploy.DeploymentManager"> <New id="DeploymentManager" class="org.eclipse.jetty.deploy.DeploymentManager">
<Set name="contexts"> <Set name="contexts">
<Ref id="Contexts" /> <Ref id="Contexts" />
</Set> </Set>
<!-- Providers of Apps --> <!-- Providers of Apps -->
<Set name="appProviders"> <Set name="appProviders">
<Array type="org.eclipse.jetty.deploy.AppProvider"> <Array type="org.eclipse.jetty.deploy.AppProvider">

View File

@ -7,7 +7,7 @@ public abstract class AbstractEndPoint implements EndPoint
private final long _created=System.currentTimeMillis(); private final long _created=System.currentTimeMillis();
private final InetSocketAddress _local; private final InetSocketAddress _local;
private final InetSocketAddress _remote; private final InetSocketAddress _remote;
private volatile long _maxIdleTime; private volatile long _idleTimeout;
private volatile long _idleTimestamp=System.currentTimeMillis(); private volatile long _idleTimestamp=System.currentTimeMillis();
@ -27,42 +27,37 @@ public abstract class AbstractEndPoint implements EndPoint
@Override @Override
public long getIdleTimeout() public long getIdleTimeout()
{ {
return _maxIdleTime; return _idleTimeout;
} }
@Override @Override
public void setIdleTimeout(long timeMs) public void setIdleTimeout(long idleTimeout)
{ {
_maxIdleTime=timeMs; _idleTimeout = idleTimeout;
} }
/* ------------------------------------------------------------ */
@Override @Override
public InetSocketAddress getLocalAddress() public InetSocketAddress getLocalAddress()
{ {
return _local; return _local;
} }
/* ------------------------------------------------------------ */
@Override @Override
public InetSocketAddress getRemoteAddress() public InetSocketAddress getRemoteAddress()
{ {
return _remote; return _remote;
} }
/* ------------------------------------------------------------ */
public long getIdleTimestamp() public long getIdleTimestamp()
{ {
return _idleTimestamp; return _idleTimestamp;
} }
/* ------------------------------------------------------------ */
protected void notIdle() protected void notIdle()
{ {
_idleTimestamp=System.currentTimeMillis(); _idleTimestamp=System.currentTimeMillis();
} }
/* ------------------------------------------------------------ */
@Override @Override
public String toString() public String toString()
{ {
@ -74,5 +69,4 @@ public abstract class AbstractEndPoint implements EndPoint
isOpen(), isOpen(),
isOutputShutdown()); isOutputShutdown());
} }
} }

View File

@ -3,35 +3,20 @@ package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint, Runnable
{ {
private static final int TICK=Integer.getInteger("org.eclipse.jetty.io.AsyncByteArrayEndPoint.TICK",100); private static final Logger LOG = Log.getLogger(AsyncByteArrayEndPoint.class);
public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class);
private final ScheduledExecutorService _timer;
private AsyncConnection _connection;
private final Runnable _checkTimeout=new Runnable()
{
@Override
public void run()
{
if (isOpen())
{
checkTimeout(System.currentTimeMillis());
if (isOpen())
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
}
}
};
private final ReadInterest _readInterest = new ReadInterest() private final ReadInterest _readInterest = new ReadInterest()
{ {
@ -40,10 +25,9 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
{ {
if (_closed) if (_closed)
throw new ClosedChannelException(); throw new ClosedChannelException();
return _in==null || BufferUtil.hasContent(_in); return _in == null || BufferUtil.hasContent(_in);
} }
}; };
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
@ -52,33 +36,80 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
// Don't need to do anything here as takeOutput does the signalling. // Don't need to do anything here as takeOutput does the signalling.
} }
}; };
private final AtomicReference<Future<?>> _timeout = new AtomicReference<>();
private final ScheduledExecutorService _scheduler;
private volatile AsyncConnection _connection;
public AsyncByteArrayEndPoint(ScheduledExecutorService timer) public AsyncByteArrayEndPoint(ScheduledExecutorService scheduler, long idleTimeout)
{ {
super(); _scheduler = scheduler;
_timer=timer; setIdleTimeout(idleTimeout);
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
} }
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, byte[] input, int outputSize) public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, byte[] input, int outputSize)
{ {
super(input,outputSize); super(input, outputSize);
_timer=timer; _scheduler = timer;
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); setIdleTimeout(idleTimeout);
} }
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, String input, int outputSize) public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, String input, int outputSize)
{ {
super(input,outputSize); super(input, outputSize);
_timer=timer; _scheduler = timer;
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); setIdleTimeout(idleTimeout);
}
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
scheduleIdleTimeout(idleTimeout);
}
private void scheduleIdleTimeout(long delay)
{
Future<?> newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(this, delay, TimeUnit.MILLISECONDS) : null;
Future<?> oldTimeout = _timeout.getAndSet(newTimeout);
if (oldTimeout != null)
oldTimeout.cancel(false);
}
@Override
public void run()
{
if (isOpen())
{
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
if (idleLeft < 0)
{
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout);
}
} }
@Override @Override
public void setInput(ByteBuffer in) public void setInput(ByteBuffer in)
{ {
super.setInput(in); super.setInput(in);
if (in==null || BufferUtil.hasContent(in)) if (in == null || BufferUtil.hasContent(in))
_readInterest.readable(); _readInterest.readable();
} }
@ -126,36 +157,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
@Override @Override
public void setAsyncConnection(AsyncConnection connection) public void setAsyncConnection(AsyncConnection connection)
{ {
_connection=connection; _connection = connection;
}
@Override
public void checkTimeout(long now)
{
synchronized (this)
{
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
{
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
if (idleTimestamp != 0 && idleTimeout > 0)
{
long idleForMs = now - idleTimestamp;
if (idleForMs > idleTimeout)
{
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("idle "+idleForMs+"ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
}
} }
@Override @Override

View File

@ -21,30 +21,30 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExecutorCallback; import org.eclipse.jetty.util.ExecutorCallback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
/* ------------------------------------------------------------ */ /**
/**Asynchronous End Point * <p>{@link AsyncEndPoint} add asynchronous scheduling methods to {@link EndPoint}.</p>
* <p> * <p>The design of these has been influenced by NIO.2 Futures and Completion
* This extension of EndPoint provides asynchronous scheduling methods. * handlers, but does not use those actual interfaces because they have
* The design of these has been influenced by NIO.2 Futures and Completion * some inefficiencies.</p>
* handlers, but does not use those actual interfaces because: they have * <p>This class will frequently be used in conjunction with some of the utility
* some inefficiencies.
* <p>
* This class will frequently be used in conjunction with some of the utility
* implementations of {@link Callback}, such as {@link FutureCallback} and * implementations of {@link Callback}, such as {@link FutureCallback} and
* {@link ExecutorCallback}. Examples are: * {@link ExecutorCallback}. Examples are:</p>
*
* <h3>Blocking Read</h3> * <h3>Blocking Read</h3>
* A FutureCallback can be used to block until an endpoint is ready to be filled * <p>A FutureCallback can be used to block until an endpoint is ready to be filled
* from: * from:
* <blockquote><pre> * <blockquote><pre>
* FutureCallback<String> future = new FutureCallback<>(); * FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.fillInterested("ContextObj",future); * endpoint.fillInterested("ContextObj",future);
* ... * ...
* String context = future.get(); // This blocks * String context = future.get(); // This blocks
* int filled=endpoint.fill(mybuffer);</pre></blockquote> * int filled=endpoint.fill(mybuffer);
* </pre></blockquote></p>
*
* <h3>Dispatched Read</h3> * <h3>Dispatched Read</h3>
* By using a different callback, the read can be done asynchronously in its own dispatched thread: * <p>By using a different callback, the read can be done asynchronously in its own dispatched thread:
* <blockquote><pre> * <blockquote><pre>
* endpoint.fillInterested("ContextObj",new ExecutorCallback<String>(executor) * endpoint.fillInterested("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* { * {
* public void onCompleted(String context) * public void onCompleted(String context)
* { * {
@ -52,25 +52,25 @@ import org.eclipse.jetty.util.FutureCallback;
* ... * ...
* } * }
* public void onFailed(String context,Throwable cause) {...} * public void onFailed(String context,Throwable cause) {...}
* });</pre></blockquote> * });
* The executor callback can also be customized to not dispatch in some circumstances when * </pre></blockquote></p>
* it knows it can use the callback thread and does not need to dispatch. * <p>The executor callback can also be customized to not dispatch in some circumstances when
* it knows it can use the callback thread and does not need to dispatch.</p>
* *
* <h3>Blocking Write</h3> * <h3>Blocking Write</h3>
* The write contract is that the callback complete is not called until all data has been * <p>The write contract is that the callback complete is not called until all data has been
* written or there is a failure. For blocking this looks like: * written or there is a failure. For blocking this looks like:
*
* <blockquote><pre> * <blockquote><pre>
* FutureCallback<String> future = new FutureCallback<>(); * FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.write("ContextObj",future,headerBuffer,contentBuffer); * endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
* String context = future.get(); // This blocks * String context = future.get(); // This blocks
* </pre></blockquote> * </pre></blockquote></p>
* *
* <h3>Dispatched Write</h3> * <h3>Dispatched Write</h3>
* Note also that multiple buffers may be passed in write so that gather writes * <p>Note also that multiple buffers may be passed in write so that gather writes
* can be done: * can be done:
* <blockquote><pre> * <blockquote><pre>
* endpoint.write("ContextObj",new ExecutorCallback<String>(executor) * endpoint.write("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* { * {
* public void onCompleted(String context) * public void onCompleted(String context)
* { * {
@ -78,45 +78,52 @@ import org.eclipse.jetty.util.FutureCallback;
* ... * ...
* } * }
* public void onFailed(String context,Throwable cause) {...} * public void onFailed(String context,Throwable cause) {...}
* },headerBuffer,contentBuffer);</pre></blockquote> * },headerBuffer,contentBuffer);
* * </pre></blockquote></p>
*/ */
public interface AsyncEndPoint extends EndPoint public interface AsyncEndPoint extends EndPoint
{ {
/** Asynchronous a fillable notification. /**
* <p> * <p>Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.</p>
* This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF. *
* @param context Context to return via the callback * @param context the context to return via the callback
* @param callback The callback to call when an error occurs or we are readable. * @param callback the callback to call when an error occurs or we are readable.
* @throws ReadPendingException if another read operation is concurrent. * @throws ReadPendingException if another read operation is concurrent.
*/ */
<C> void fillInterested(C context, Callback<C> callback) throws ReadPendingException; <C> void fillInterested(C context, Callback<C> callback) throws ReadPendingException;
/** Asynchronous write operation. /**
* <p> * <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either
* This method performs {@link #flush(ByteBuffer...)} operation(s) and do a callback when all the data * all the data has been flushed or an error occurs.</p>
* has been flushed or an error occurs. *
* @param context Context to return via the callback * @param context the context to return via the callback
* @param callback The callback to call when an error occurs or we are readable. * @param callback the callback to call when an error occurs or the write completed.
* @param buffers One or more {@link ByteBuffer}s that will be flushed. * @param buffers one or more {@link ByteBuffer}s that will be flushed.
* @throws WritePendingException if another write operation is concurrent. * @throws WritePendingException if another write operation is concurrent.
*/ */
<C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException; <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException;
/** /**
* @return Timestamp in ms since epoch of when the last data was * @return the {@link AsyncConnection} associated with this {@link AsyncEndPoint}
* filled or flushed from this endpoint. * @see #setAsyncConnection(AsyncConnection)
*/ */
long getIdleTimestamp();
AsyncConnection getAsyncConnection(); AsyncConnection getAsyncConnection();
/**
* @param connection the {@link AsyncConnection} associated with this {@link AsyncEndPoint}
* @see #getAsyncConnection()
*/
void setAsyncConnection(AsyncConnection connection); void setAsyncConnection(AsyncConnection connection);
/**
* <p>Callback method invoked when this {@link AsyncEndPoint} is opened.</p>
* @see #onClose()
*/
void onOpen(); void onOpen();
/**
* <p>Callback method invoked when this {@link AsyncEndPoint} is close.</p>
* @see #onOpen()
*/
void onClose(); void onClose();
void checkTimeout(long now);
} }

View File

@ -119,10 +119,10 @@ public interface EndPoint
long getIdleTimeout(); long getIdleTimeout();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Set the max idle time. /** Set the idle timeout.
* @param timeMs the max idle time in MS. Timeout <= 0 implies an infinite timeout * @param idleTimeout the idle timeout in MS. Timeout <= 0 implies an infinite timeout
*/ */
void setIdleTimeout(long timeMs); void setIdleTimeout(long idleTimeout);

View File

@ -18,6 +18,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -28,9 +29,9 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
private final List<NetworkTrafficListener> listeners; private final List<NetworkTrafficListener> listeners;
public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key, long idleTimeout, List<NetworkTrafficListener> listeners) throws IOException public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key, ScheduledExecutorService scheduler, long idleTimeout, List<NetworkTrafficListener> listeners) throws IOException
{ {
super(channel, selectSet, key, idleTimeout); super(channel, selectSet, key, scheduler, idleTimeout);
this.listeners = listeners; this.listeners = listeners;
} }

View File

@ -17,8 +17,12 @@ import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.SelectorManager.ManagedSelector; import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -32,21 +36,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
{ {
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
private final SelectorManager.ManagedSelector _selector; private final AtomicReference<Future<?>> _timeout = new AtomicReference<>();
private final SelectionKey _key; private final Runnable _idleTask = new Runnable()
{
/** @Override
* The desired value for {@link SelectionKey#interestOps()} public void run()
*/ {
private volatile int _interestOps; checkIdleTimeout();
}
};
/** /**
* true if {@link ManagedSelector#destroyEndPoint(AsyncEndPoint)} has not been called * true if {@link ManagedSelector#destroyEndPoint(AsyncEndPoint)} has not been called
*/ */
private final AtomicBoolean _open = new AtomicBoolean(); private final AtomicBoolean _open = new AtomicBoolean();
private volatile AsyncConnection _connection;
private final ReadInterest _readInterest = new ReadInterest() private final ReadInterest _readInterest = new ReadInterest()
{ {
@Override @Override
@ -56,7 +58,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
return false; return false;
} }
}; };
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
@ -65,15 +66,39 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
updateKey(SelectionKey.OP_WRITE, true); updateKey(SelectionKey.OP_WRITE, true);
} }
}; };
private final SelectorManager.ManagedSelector _selector;
private final SelectionKey _key;
private final ScheduledExecutorService _scheduler;
private volatile AsyncConnection _connection;
/**
* The desired value for {@link SelectionKey#interestOps()}
*/
private volatile int _interestOps;
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, long idleTimeout) throws IOException public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, ScheduledExecutorService scheduler, long idleTimeout) throws IOException
{ {
super(channel); super(channel);
_selector = selector; _selector = selector;
_key = key; _key = key;
_scheduler = scheduler;
setIdleTimeout(idleTimeout); setIdleTimeout(idleTimeout);
} }
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
scheduleIdleTimeout(idleTimeout);
}
private void scheduleIdleTimeout(long delay)
{
Future<?> newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(_idleTask, delay, TimeUnit.MILLISECONDS) : null;
Future<?> oldTimeout = _timeout.getAndSet(newTimeout);
if (oldTimeout != null)
oldTimeout.cancel(false);
}
@Override @Override
public <C> void fillInterested(C context, Callback<C> callback) throws IllegalStateException public <C> void fillInterested(C context, Callback<C> callback) throws IllegalStateException
{ {
@ -111,32 +136,32 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
_writeFlusher.completeWrite(); _writeFlusher.completeWrite();
} }
@Override private void checkIdleTimeout()
public void checkTimeout(long now)
{ {
synchronized (this) if (isOpen())
{ {
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
{ {
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
if (idleTimestamp != 0 && idleTimeout > 0) if (idleTimestamp != 0 && idleTimeout > 0)
{ {
long idleForMs = now - idleTimestamp; if (idleLeft < 0)
if (idleForMs > idleTimeout)
{ {
if (isOutputShutdown()) if (isOutputShutdown())
close(); close();
notIdle(); notIdle();
TimeoutException timeout = new TimeoutException("idle " + idleForMs + "ms"); TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout); _readInterest.failed(timeout);
_writeFlusher.failed(timeout); _writeFlusher.failed(timeout);
} }
} }
} }
scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout);
} }
} }

View File

@ -25,17 +25,15 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
@ -54,7 +52,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private final ManagedSelector[] _selectors; private final ManagedSelector[] _selectors;
private volatile long _selectorIndex; private volatile long _selectorIndex;
private volatile long _idleCheckPeriod;
protected SelectorManager() protected SelectorManager()
{ {
@ -64,23 +61,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
protected SelectorManager(@Name(value="selectors") int selectors) protected SelectorManager(@Name(value="selectors") int selectors)
{ {
_selectors = new ManagedSelector[selectors]; _selectors = new ManagedSelector[selectors];
setIdleCheckPeriod(1000);
}
/**
* @return the period, in milliseconds, a background thread checks for idle expiration
*/
public long getIdleCheckPeriod()
{
return _idleCheckPeriod;
}
/**
* @param idleCheckPeriod the period, in milliseconds, a background thread checks for idle expiration
*/
public void setIdleCheckPeriod(long idleCheckPeriod)
{
_idleCheckPeriod = idleCheckPeriod;
} }
/** /**
@ -145,7 +125,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
_selectors[i] = selectSet; _selectors[i] = selectSet;
selectSet.start(); selectSet.start();
execute(selectSet); execute(selectSet);
execute(new Expirer());
} }
} }
@ -251,37 +230,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
AggregateLifeCycle.dump(out, indent, TypeUtil.asList(_selectors)); AggregateLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
} }
/**
* <p>The task that performs a periodic idle check of the connections managed by the selectors.</p>
* @see #getIdleCheckPeriod()
*/
private class Expirer implements Runnable
{
@Override
public void run()
{
while (isRunning())
{
for (ManagedSelector selector : _selectors)
if (selector != null)
selector.timeoutCheck();
sleep(getIdleCheckPeriod());
}
}
private void sleep(long delay)
{
try
{
Thread.sleep(delay);
}
catch (InterruptedException x)
{
LOG.ignore(x);
}
}
}
/** /**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p> * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
* <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
@ -291,7 +239,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
{ {
private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>(); private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>();
private final Set<AsyncEndPoint> _endPoints = Collections.newSetFromMap(new ConcurrentHashMap<AsyncEndPoint, Boolean>());
private final int _id; private final int _id;
private Selector _selector; private Selector _selector;
private Thread _thread; private Thread _thread;
@ -517,7 +464,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
{ {
AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey); AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey);
_endPoints.add(endPoint);
endPointOpened(endPoint); endPointOpened(endPoint);
AsyncConnection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment()); AsyncConnection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setAsyncConnection(asyncConnection); endPoint.setAsyncConnection(asyncConnection);
@ -529,11 +475,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
public void destroyEndPoint(AsyncEndPoint endPoint) public void destroyEndPoint(AsyncEndPoint endPoint)
{ {
LOG.debug("Destroyed {}", endPoint); LOG.debug("Destroyed {}", endPoint);
_endPoints.remove(endPoint);
endPoint.getAsyncConnection().onClose(); endPoint.getAsyncConnection().onClose();
endPointClosed(endPoint); endPointClosed(endPoint);
} }
@Override @Override
public String dump() public String dump()
{ {
@ -597,15 +542,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
} }
private void timeoutCheck()
{
// We cannot use the _selector.keys() because the returned Set is not thread
// safe so it may be modified by the selector thread while we iterate here.
long now = System.currentTimeMillis();
for (AsyncEndPoint endPoint : _endPoints)
endPoint.checkTimeout(now);
}
private class DumpKeys implements Runnable private class DumpKeys implements Runnable
{ {
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);

View File

@ -179,11 +179,6 @@ public class SslConnection extends AbstractAsyncConnection
{ {
} }
@Override
public void checkTimeout(long now)
{
}
private final Callback<Void> _writeCallback = new Callback<Void>() private final Callback<Void> _writeCallback = new Callback<Void>()
{ {

View File

@ -1,16 +1,9 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -20,85 +13,91 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class AsyncByteArrayEndPointTest public class AsyncByteArrayEndPointTest
{ {
ScheduledExecutorService _timer; private ScheduledExecutorService _scheduler;
@Before @Before
public void before() public void before()
{ {
_timer = new ScheduledThreadPoolExecutor(1); _scheduler = Executors.newSingleThreadScheduledExecutor();
} }
@After @After
public void after() public void after()
{ {
_timer.shutdownNow(); _scheduler.shutdownNow();
} }
@Test @Test
public void testReadable() throws Exception public void testReadable() throws Exception
{ {
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000);
endp.setInput("test input"); endp.setInput("test input");
ByteBuffer buffer = BufferUtil.allocate(1024); ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<String> fcb = new FutureCallback<>(); FutureCallback<String> fcb = new FutureCallback<>();
endp.fillInterested("CTX",fcb); endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get()); assertEquals("CTX", fcb.get());
assertEquals(10,endp.fill(buffer)); assertEquals(10, endp.fill(buffer));
assertEquals("test input",BufferUtil.toString(buffer)); assertEquals("test input", BufferUtil.toString(buffer));
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.fillInterested("CTX",fcb); endp.fillInterested("CTX", fcb);
assertFalse(fcb.isDone()); assertFalse(fcb.isDone());
assertEquals(0,endp.fill(buffer)); assertEquals(0, endp.fill(buffer));
endp.setInput(" more"); endp.setInput(" more");
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get()); assertEquals("CTX", fcb.get());
assertEquals(5,endp.fill(buffer)); assertEquals(5, endp.fill(buffer));
assertEquals("test input more",BufferUtil.toString(buffer)); assertEquals("test input more", BufferUtil.toString(buffer));
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.fillInterested("CTX",fcb); endp.fillInterested("CTX", fcb);
assertFalse(fcb.isDone()); assertFalse(fcb.isDone());
assertEquals(0,endp.fill(buffer)); assertEquals(0, endp.fill(buffer));
endp.setInput((ByteBuffer)null); endp.setInput((ByteBuffer)null);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get()); assertEquals("CTX", fcb.get());
assertEquals(-1,endp.fill(buffer)); assertEquals(-1, endp.fill(buffer));
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.fillInterested("CTX",fcb); endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get()); assertEquals("CTX", fcb.get());
assertEquals(-1,endp.fill(buffer)); assertEquals(-1, endp.fill(buffer));
endp.close(); endp.close();
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.fillInterested("CTX",fcb); endp.fillInterested("CTX", fcb);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
try try
{ {
fcb.get(); fcb.get();
fail(); fail();
} }
catch(ExecutionException e) catch (ExecutionException e)
{ {
assertThat(e.toString(),containsString("Closed")); assertThat(e.toString(), containsString("Closed"));
} }
} }
@Test @Test
public void testWrite() throws Exception public void testWrite() throws Exception
{ {
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,(byte[])null,15); AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000, (byte[])null, 15);
endp.setGrowOutput(false); endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(10)); endp.setOutput(BufferUtil.allocate(10));
@ -106,28 +105,27 @@ public class AsyncByteArrayEndPointTest
ByteBuffer more = BufferUtil.toBuffer(" Some more."); ByteBuffer more = BufferUtil.toBuffer(" Some more.");
FutureCallback<String> fcb = new FutureCallback<>(); FutureCallback<String> fcb = new FutureCallback<>();
endp.write("CTX",fcb,data); endp.write("CTX", fcb, data);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get()); assertEquals("CTX", fcb.get());
assertEquals("Data.",endp.getOutputString()); assertEquals("Data.", endp.getOutputString());
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.write("CTX",fcb,more); endp.write("CTX", fcb, more);
assertFalse(fcb.isDone()); assertFalse(fcb.isDone());
assertEquals("Data. Some",endp.getOutputString()); assertEquals("Data. Some", endp.getOutputString());
assertEquals("Data. Some",endp.takeOutputString()); assertEquals("Data. Some", endp.takeOutputString());
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals("CTX",fcb.get()); assertEquals("CTX", fcb.get());
assertEquals(" more.",endp.getOutputString()); assertEquals(" more.", endp.getOutputString());
} }
@Test @Test
public void testIdle() throws Exception public void testIdle() throws Exception
{ {
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 500);
endp.setIdleTimeout(500);
endp.setInput("test"); endp.setInput("test");
endp.setGrowOutput(false); endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(5)); endp.setOutput(BufferUtil.allocate(5));
@ -141,43 +139,43 @@ public class AsyncByteArrayEndPointTest
ByteBuffer buffer = BufferUtil.allocate(1024); ByteBuffer buffer = BufferUtil.allocate(1024);
FutureCallback<Void> fcb = new FutureCallback<>(); FutureCallback<Void> fcb = new FutureCallback<>();
endp.fillInterested(null,fcb); endp.fillInterested(null, fcb);
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals(null,fcb.get()); assertEquals(null, fcb.get());
assertEquals(4,endp.fill(buffer)); assertEquals(4, endp.fill(buffer));
assertEquals("test",BufferUtil.toString(buffer)); assertEquals("test", BufferUtil.toString(buffer));
// read timeout // read timeout
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
endp.fillInterested(null,fcb); endp.fillInterested(null, fcb);
long start=System.currentTimeMillis(); long start = System.currentTimeMillis();
try try
{ {
fcb.get(); fcb.get();
fail(); fail();
} }
catch(ExecutionException t) catch (ExecutionException t)
{ {
assertThat(t.getCause(),Matchers.instanceOf(TimeoutException.class)); assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class));
} }
assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(100L)); assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L));
assertTrue(endp.isOpen()); assertTrue(endp.isOpen());
// write timeout // write timeout
fcb = new FutureCallback<>(); fcb = new FutureCallback<>();
start=System.currentTimeMillis(); start = System.currentTimeMillis();
endp.write(null,fcb,BufferUtil.toBuffer("This is too long")); endp.write(null, fcb, BufferUtil.toBuffer("This is too long"));
try try
{ {
fcb.get(); fcb.get();
fail(); fail();
} }
catch(ExecutionException t) catch (ExecutionException t)
{ {
assertThat(t.getCause(),Matchers.instanceOf(TimeoutException.class)); assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class));
} }
assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(100L)); assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L));
assertTrue(endp.isOpen()); assertTrue(endp.isOpen());
// Still no idle close // Still no idle close
@ -190,8 +188,5 @@ public class AsyncByteArrayEndPointTest
// idle close // idle close
Thread.sleep(1000); Thread.sleep(1000);
assertFalse(endp.isOpen()); assertFalse(endp.isOpen());
} }
} }

View File

@ -1,3 +1,19 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@ -13,6 +29,8 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -35,6 +53,7 @@ public class SelectChannelEndPointTest
protected volatile AsyncEndPoint _lastEndp; protected volatile AsyncEndPoint _lastEndp;
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();
protected SelectorManager _manager = new SelectorManager() protected SelectorManager _manager = new SelectorManager()
{ {
@Override @Override
@ -46,27 +65,27 @@ public class SelectChannelEndPointTest
@Override @Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{ {
return SelectChannelEndPointTest.this.newConnection(channel,endpoint); return SelectChannelEndPointTest.this.newConnection(channel, endpoint);
} }
@Override @Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{ {
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, 60000); SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, _scheduler, 60000);
_lastEndp=endp; _lastEndp = endp;
return endp; return endp;
} }
}; };
// Must be volatile or the test may fail spuriously // Must be volatile or the test may fail spuriously
protected volatile int _blockAt=0; protected volatile int _blockAt = 0;
private volatile int _writeCount=1; private volatile int _writeCount = 1;
@Before @Before
public void startManager() throws Exception public void startManager() throws Exception
{ {
_writeCount=1; _writeCount = 1;
_lastEndp=null; _lastEndp = null;
_connector = ServerSocketChannel.open(); _connector = ServerSocketChannel.open();
_connector.socket().bind(null); _connector.socket().bind(null);
_threadPool.start(); _threadPool.start();
@ -83,7 +102,7 @@ public class SelectChannelEndPointTest
protected Socket newClient() throws IOException protected Socket newClient() throws IOException
{ {
return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort()); return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort());
} }
protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint) protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
@ -93,13 +112,13 @@ public class SelectChannelEndPointTest
public class TestConnection extends AbstractAsyncConnection public class TestConnection extends AbstractAsyncConnection
{ {
ByteBuffer _in = BufferUtil.allocate(32*1024); ByteBuffer _in = BufferUtil.allocate(32 * 1024);
ByteBuffer _out = BufferUtil.allocate(32*1024); ByteBuffer _out = BufferUtil.allocate(32 * 1024);
long _last=-1; long _last = -1;
public TestConnection(AsyncEndPoint endp) public TestConnection(AsyncEndPoint endp)
{ {
super(endp,_threadPool); super(endp, _threadPool);
} }
@Override @Override
@ -108,45 +127,45 @@ public class SelectChannelEndPointTest
AsyncEndPoint _endp = getEndPoint(); AsyncEndPoint _endp = getEndPoint();
try try
{ {
_last=System.currentTimeMillis(); _last = System.currentTimeMillis();
boolean progress=true; boolean progress = true;
while(progress) while (progress)
{ {
progress=false; progress = false;
// Fill the input buffer with everything available // Fill the input buffer with everything available
if (BufferUtil.isFull(_in)) if (BufferUtil.isFull(_in))
throw new IllegalStateException("FULL "+BufferUtil.toDetailString(_in)); throw new IllegalStateException("FULL " + BufferUtil.toDetailString(_in));
int filled=_endp.fill(_in); int filled = _endp.fill(_in);
if (filled>0) if (filled > 0)
progress=true; progress = true;
// If the tests wants to block, then block // If the tests wants to block, then block
while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt) while (_blockAt > 0 && _endp.isOpen() && _in.remaining() < _blockAt)
{ {
FutureCallback<Void> blockingRead= new FutureCallback<>(); FutureCallback<Void> blockingRead = new FutureCallback<>();
_endp.fillInterested(null,blockingRead); _endp.fillInterested(null, blockingRead);
blockingRead.get(); blockingRead.get();
filled=_endp.fill(_in); filled = _endp.fill(_in);
progress|=filled>0; progress |= filled > 0;
} }
// Copy to the out buffer // Copy to the out buffer
if (BufferUtil.hasContent(_in) && BufferUtil.append(_in,_out)>0) if (BufferUtil.hasContent(_in) && BufferUtil.append(_in, _out) > 0)
progress=true; progress = true;
// Blocking writes // Blocking writes
if (BufferUtil.hasContent(_out)) if (BufferUtil.hasContent(_out))
{ {
ByteBuffer out=_out.duplicate(); ByteBuffer out = _out.duplicate();
BufferUtil.clear(_out); BufferUtil.clear(_out);
for (int i=0;i<_writeCount;i++) for (int i = 0; i < _writeCount; i++)
{ {
FutureCallback<Void> blockingWrite= new FutureCallback<>(); FutureCallback<Void> blockingWrite = new FutureCallback<>();
_endp.write(null,blockingWrite,out.asReadOnlyBuffer()); _endp.write(null, blockingWrite, out.asReadOnlyBuffer());
blockingWrite.get(); blockingWrite.get();
} }
progress=true; progress = true;
} }
// are we done? // are we done?
@ -154,26 +173,26 @@ public class SelectChannelEndPointTest
_endp.shutdownOutput(); _endp.shutdownOutput();
} }
} }
catch(ExecutionException e) catch (ExecutionException e)
{ {
// Timeout does not close, so echo exception then shutdown // Timeout does not close, so echo exception then shutdown
try try
{ {
FutureCallback<Void> blockingWrite= new FutureCallback<>(); FutureCallback<Void> blockingWrite = new FutureCallback<>();
_endp.write(null,blockingWrite,BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in))); _endp.write(null, blockingWrite, BufferUtil.toBuffer("EE: " + BufferUtil.toString(_in)));
blockingWrite.get(); blockingWrite.get();
_endp.shutdownOutput(); _endp.shutdownOutput();
} }
catch(Exception e2) catch (Exception e2)
{ {
// e2.printStackTrace(); // e2.printStackTrace();
} }
} }
catch(InterruptedException|EofException e) catch (InterruptedException | EofException e)
{ {
SelectChannelEndPoint.LOG.ignore(e); SelectChannelEndPoint.LOG.ignore(e);
} }
catch(Exception e) catch (Exception e)
{ {
SelectChannelEndPoint.LOG.warn(e); SelectChannelEndPoint.LOG.warn(e);
} }
@ -204,21 +223,21 @@ public class SelectChannelEndPointTest
for (char c : "HelloWorld".toCharArray()) for (char c : "HelloWorld".toCharArray())
{ {
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertTrue(b>0); assertTrue(b > 0);
assertEquals(c,(char)b); assertEquals(c, (char)b);
} }
// wait for read timeout // wait for read timeout
client.setSoTimeout(500); client.setSoTimeout(500);
long start=System.currentTimeMillis(); long start = System.currentTimeMillis();
try try
{ {
client.getInputStream().read(); client.getInputStream().read();
Assert.fail(); Assert.fail();
} }
catch(SocketTimeoutException e) catch (SocketTimeoutException e)
{ {
long duration = System.currentTimeMillis()-start; long duration = System.currentTimeMillis() - start;
Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L)); Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L));
} }
@ -230,11 +249,11 @@ public class SelectChannelEndPointTest
{ {
int b = client.getInputStream().read(); int b = client.getInputStream().read();
Assert.assertThat("expect valid char integer", b, greaterThan(0)); Assert.assertThat("expect valid char integer", b, greaterThan(0));
assertEquals("expect characters to be same", c,(char)b); assertEquals("expect characters to be same", c, (char)b);
} }
client.close(); client.close();
int i=0; int i = 0;
while (server.isOpen()) while (server.isOpen())
{ {
Thread.sleep(10); Thread.sleep(10);
@ -262,20 +281,20 @@ public class SelectChannelEndPointTest
for (char c : "HelloWorld".toCharArray()) for (char c : "HelloWorld".toCharArray())
{ {
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertTrue(b>0); assertTrue(b > 0);
assertEquals(c,(char)b); assertEquals(c, (char)b);
} }
// wait for read timeout // wait for read timeout
long start=System.currentTimeMillis(); long start = System.currentTimeMillis();
try try
{ {
client.getInputStream().read(); client.getInputStream().read();
Assert.fail(); Assert.fail();
} }
catch(SocketTimeoutException e) catch (SocketTimeoutException e)
{ {
assertTrue(System.currentTimeMillis()-start>=400); assertTrue(System.currentTimeMillis() - start >= 400);
} }
// write then shutdown // write then shutdown
@ -287,17 +306,14 @@ public class SelectChannelEndPointTest
for (char c : "Goodbye Cruel TLS".toCharArray()) for (char c : "Goodbye Cruel TLS".toCharArray())
{ {
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertTrue(b>0); assertTrue(b > 0);
assertEquals(c,(char)b); assertEquals(c, (char)b);
} }
// Read close // Read close
assertEquals(-1,client.getInputStream().read()); assertEquals(-1, client.getInputStream().read());
} }
@Test @Test
public void testBlockRead() throws Exception public void testBlockRead() throws Exception
{ {
@ -315,27 +331,27 @@ public class SelectChannelEndPointTest
client.setSoTimeout(specifiedTimeout); client.setSoTimeout(specifiedTimeout);
// Write 8 and cause block waiting for 10 // Write 8 and cause block waiting for 10
_blockAt=10; _blockAt = 10;
clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush(); clientOutputStream.flush();
long wait=System.currentTimeMillis()+1000; long wait = System.currentTimeMillis() + 1000;
while(_lastEndp==null && System.currentTimeMillis()<wait) while (_lastEndp == null && System.currentTimeMillis() < wait)
Thread.yield(); Thread.yield();
_lastEndp.setIdleTimeout(10 * specifiedTimeout); _lastEndp.setIdleTimeout(10 * specifiedTimeout);
Thread.sleep((11*specifiedTimeout)/10); Thread.sleep((11 * specifiedTimeout) / 10);
long start=System.currentTimeMillis(); long start = System.currentTimeMillis();
try try
{ {
int b = clientInputStream.read(); int b = clientInputStream.read();
Assert.fail("Should have timed out waiting for a response, but read "+b); Assert.fail("Should have timed out waiting for a response, but read " + b);
} }
catch(SocketTimeoutException e) catch (SocketTimeoutException e)
{ {
int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue(); int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue();
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3*specifiedTimeout/4)); Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3 * specifiedTimeout / 4));
} }
// write remaining characters // write remaining characters
@ -346,8 +362,8 @@ public class SelectChannelEndPointTest
for (char c : "1234567890ABCDEF".toCharArray()) for (char c : "1234567890ABCDEF".toCharArray())
{ {
int b = clientInputStream.read(); int b = clientInputStream.read();
assertTrue(b>0); assertTrue(b > 0);
assertEquals(c,(char)b); assertEquals(c, (char)b);
} }
} }
@ -370,20 +386,20 @@ public class SelectChannelEndPointTest
for (char c : "HelloWorld".toCharArray()) for (char c : "HelloWorld".toCharArray())
{ {
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertTrue(b>0); assertTrue(b > 0);
assertEquals(c,(char)b); assertEquals(c, (char)b);
} }
// Set Max idle // Set Max idle
_lastEndp.setIdleTimeout(500); _lastEndp.setIdleTimeout(500);
// read until idle shutdown received // read until idle shutdown received
long start=System.currentTimeMillis(); long start = System.currentTimeMillis();
int b=client.getInputStream().read(); int b = client.getInputStream().read();
assertEquals(-1,b); assertEquals(-1, b);
long idle=System.currentTimeMillis()-start; long idle = System.currentTimeMillis() - start;
assertTrue(idle>400); assertTrue(idle > 400);
assertTrue(idle<2000); assertTrue(idle < 2000);
// But endpoint may still be open for a little bit. // But endpoint may still be open for a little bit.
if (_lastEndp.isOpen()) if (_lastEndp.isOpen())
@ -414,31 +430,31 @@ public class SelectChannelEndPointTest
for (char c : "HelloWorld".toCharArray()) for (char c : "HelloWorld".toCharArray())
{ {
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertTrue(b>0); assertTrue(b > 0);
assertEquals(c,(char)b); assertEquals(c, (char)b);
} }
// Set Max idle // Set Max idle
_lastEndp.setIdleTimeout(500); _lastEndp.setIdleTimeout(500);
// Write 8 and cause block waiting for 10 // Write 8 and cause block waiting for 10
_blockAt=10; _blockAt = 10;
clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.write("12345678".getBytes("UTF-8"));
clientOutputStream.flush(); clientOutputStream.flush();
// read until idle shutdown received // read until idle shutdown received
long start=System.currentTimeMillis(); long start = System.currentTimeMillis();
int b=client.getInputStream().read(); int b = client.getInputStream().read();
assertEquals('E',b); assertEquals('E', b);
long idle=System.currentTimeMillis()-start; long idle = System.currentTimeMillis() - start;
assertTrue(idle>400); assertTrue(idle > 400);
assertTrue(idle<2000); assertTrue(idle < 2000);
for (char c : "E: 12345678".toCharArray()) for (char c : "E: 12345678".toCharArray())
{ {
b = client.getInputStream().read(); b = client.getInputStream().read();
assertTrue(b>0); assertTrue(b > 0);
assertEquals(c,(char)b); assertEquals(c, (char)b);
} }
// But endpoint is still open. // But endpoint is still open.
@ -451,7 +467,6 @@ public class SelectChannelEndPointTest
assertFalse(_lastEndp.isOpen()); assertFalse(_lastEndp.isOpen());
} }
@Test @Test
public void testStress() throws Exception public void testStress() throws Exception
{ {
@ -464,8 +479,8 @@ public class SelectChannelEndPointTest
_manager.accept(server); _manager.accept(server);
final int writes = 200000; final int writes = 200000;
final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); final byte[] bytes = "HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET);
byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET); byte[] count = "0\n".getBytes(StringUtil.__UTF8_CHARSET);
BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream()); BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream());
final CountDownLatch latch = new CountDownLatch(writes); final CountDownLatch latch = new CountDownLatch(writes);
final InputStream in = new BufferedInputStream(client.getInputStream()); final InputStream in = new BufferedInputStream(client.getInputStream());
@ -474,7 +489,7 @@ public class SelectChannelEndPointTest
out.write(count); out.write(count);
out.flush(); out.flush();
while (_lastEndp==null) while (_lastEndp == null)
Thread.sleep(10); Thread.sleep(10);
_lastEndp.setIdleTimeout(5000); _lastEndp.setIdleTimeout(5000);
@ -484,28 +499,28 @@ public class SelectChannelEndPointTest
public void run() public void run()
{ {
Thread.currentThread().setPriority(MAX_PRIORITY); Thread.currentThread().setPriority(MAX_PRIORITY);
long last=-1; long last = -1;
int count=-1; int count = -1;
try try
{ {
while (latch.getCount()>0) while (latch.getCount() > 0)
{ {
// Verify echo server to client // Verify echo server to client
for (byte b0 : bytes) for (byte b0 : bytes)
{ {
int b = in.read(); int b = in.read();
Assert.assertThat(b,greaterThan(0)); Assert.assertThat(b, greaterThan(0));
assertEquals(0xff&b0,b); assertEquals(0xff & b0, b);
} }
count=0; count = 0;
int b=in.read(); int b = in.read();
while(b>0 && b!='\n') while (b > 0 && b != '\n')
{ {
count=count*10+(b-'0'); count = count * 10 + (b - '0');
b=in.read(); b = in.read();
} }
last=System.currentTimeMillis(); last = System.currentTimeMillis();
//if (latch.getCount()%1000==0) //if (latch.getCount()%1000==0)
// System.out.println(writes-latch.getCount()); // System.out.println(writes-latch.getCount());
@ -513,16 +528,16 @@ public class SelectChannelEndPointTest
latch.countDown(); latch.countDown();
} }
} }
catch(Throwable e) catch (Throwable e)
{ {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
System.err.println("count="+count); System.err.println("count=" + count);
System.err.println("latch="+latch.getCount()); System.err.println("latch=" + latch.getCount());
System.err.println("time="+(now-start)); System.err.println("time=" + (now - start));
System.err.println("last="+(now-last)); System.err.println("last=" + (now - last));
System.err.println("endp="+_lastEndp); System.err.println("endp=" + _lastEndp);
System.err.println("conn="+_lastEndp.getAsyncConnection()); System.err.println("conn=" + _lastEndp.getAsyncConnection());
e.printStackTrace(); e.printStackTrace();
} }
@ -530,12 +545,12 @@ public class SelectChannelEndPointTest
}.start(); }.start();
// Write client to server // Write client to server
for (int i=1;i<writes;i++) for (int i = 1; i < writes; i++)
{ {
out.write(bytes); out.write(bytes);
out.write(Integer.toString(i).getBytes(StringUtil.__ISO_8859_1_CHARSET)); out.write(Integer.toString(i).getBytes(StringUtil.__ISO_8859_1_CHARSET));
out.write('\n'); out.write('\n');
if (i%1000==0) if (i % 1000 == 0)
{ {
//System.err.println(i+"/"+writes); //System.err.println(i+"/"+writes);
out.flush(); out.flush();
@ -544,19 +559,18 @@ public class SelectChannelEndPointTest
} }
out.flush(); out.flush();
long last=latch.getCount(); long last = latch.getCount();
while(!latch.await(5,TimeUnit.SECONDS)) while (!latch.await(5, TimeUnit.SECONDS))
{ {
//System.err.println(latch.getCount()); //System.err.println(latch.getCount());
if (latch.getCount()==last) if (latch.getCount() == last)
Assert.fail(); Assert.fail();
last=latch.getCount(); last = latch.getCount();
} }
assertEquals(0,latch.getCount()); assertEquals(0, latch.getCount());
} }
@Test @Test
public void testWriteBlock() throws Exception public void testWriteBlock() throws Exception
{ {
@ -570,40 +584,38 @@ public class SelectChannelEndPointTest
_manager.accept(server); _manager.accept(server);
// Write client to server // Write client to server
_writeCount=10000; _writeCount = 10000;
String data="Now is the time for all good men to come to the aid of the party"; String data = "Now is the time for all good men to come to the aid of the party";
client.getOutputStream().write(data.getBytes("UTF-8")); client.getOutputStream().write(data.getBytes("UTF-8"));
BufferedInputStream in = new BufferedInputStream(client.getInputStream()); BufferedInputStream in = new BufferedInputStream(client.getInputStream());
for (int i=0;i<_writeCount;i++) for (int i = 0; i < _writeCount; i++)
{ {
if (i%1000==0) if (i % 1000 == 0)
{ {
//System.out.println(i); //System.out.println(i);
TimeUnit.MILLISECONDS.sleep(200); TimeUnit.MILLISECONDS.sleep(200);
} }
// Verify echo server to client // Verify echo server to client
for (int j=0;j<data.length();j++) for (int j = 0; j < data.length(); j++)
{ {
char c=data.charAt(j); char c = data.charAt(j);
int b = in.read(); int b = in.read();
assertTrue(b>0); assertTrue(b > 0);
assertEquals("test-"+i+"/"+j,c,(char)b); assertEquals("test-" + i + "/" + j, c, (char)b);
} }
if (i==0) if (i == 0)
_lastEndp.setIdleTimeout(60000); _lastEndp.setIdleTimeout(60000);
} }
client.close(); client.close();
int i=0; int i = 0;
while (server.isOpen()) while (server.isOpen())
{ {
assert(i++<10); assert (i++ < 10);
Thread.sleep(10); Thread.sleep(10);
} }
} }
} }

View File

@ -10,6 +10,8 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocket;
@ -36,6 +38,7 @@ public class SslConnectionTest
protected volatile AsyncEndPoint _lastEndp; protected volatile AsyncEndPoint _lastEndp;
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool(); protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();
protected SelectorManager _manager = new SelectorManager() protected SelectorManager _manager = new SelectorManager()
{ {
@Override @Override
@ -61,7 +64,7 @@ public class SslConnectionTest
@Override @Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{ {
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, 60000); SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, _scheduler, 60000);
_lastEndp=endp; _lastEndp=endp;
return endp; return endp;
} }

View File

@ -512,6 +512,23 @@ public class ObjectMBean implements DynamicMBean
// This class is an influence // This class is an influence
influences=LazyList.add(influences,aClass); influences=LazyList.add(influences,aClass);
/* enabled mbean influence
String pack = aClass.getPackage().getName();
String clazz = aClass.getSimpleName();
try
{
Class mbean = Class.forName(pack + ".jmx." + clazz + "MBean");
LOG.debug("MBean Influence found for " + aClass.getSimpleName() );
influences = LazyList.add(influences, mbean);
}
catch ( ClassNotFoundException cnfe )
{
LOG.debug("No MBean Influence for " + aClass.getSimpleName() );
}
*/
// So are the super classes // So are the super classes
influences=findInfluences(influences,aClass.getSuperclass()); influences=findInfluences(influences,aClass.getSuperclass());

View File

@ -44,8 +44,8 @@ public class Derived extends Base implements Signature
System.err.println("doodle "+doodle); System.err.println("doodle "+doodle);
} }
public void somethingElse() public String bad()
{ {
return "bad";
} }
} }

View File

@ -0,0 +1,27 @@
package com.acme.jmx;
import org.eclipse.jetty.util.annotation.Managed;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import com.acme.Derived;
@Managed("Derived MBean")
public class DerivedMBean
{
private static final Logger LOG = Log.getLogger(DerivedMBean.class);
Derived managedObject;
public DerivedMBean(Object managedObject)
{
this.managedObject = (Derived)managedObject;
}
@Managed(value="test of proxy", attribute=true, managed=true, getter="good" )
public String good()
{
return "not " + managedObject.bad();
}
}

View File

@ -44,7 +44,7 @@ public class ObjectMBeanTest
{ {
Derived derived = new Derived(); Derived derived = new Derived();
ObjectMBean mbean = new ObjectMBean(derived); ObjectMBean mbean = new ObjectMBean(derived);
assertTrue(mbean.getMBeanInfo()!=null); // TODO do more than just run it assertTrue(mbean.getMBeanInfo()!=null);
MBeanInfo info = mbean.getMBeanInfo(); MBeanInfo info = mbean.getMBeanInfo();
@ -68,6 +68,7 @@ public class ObjectMBeanTest
MBeanOperationInfo[] opinfos = info.getOperations(); MBeanOperationInfo[] opinfos = info.getOperations();
boolean publish = false; boolean publish = false;
boolean doodle = false; boolean doodle = false;
boolean good = false;
for ( int i = 0 ; i < opinfos.length; ++i ) for ( int i = 0 ; i < opinfos.length; ++i )
{ {
MBeanOperationInfo opinfo = opinfos[i]; MBeanOperationInfo opinfo = opinfos[i];
@ -90,10 +91,21 @@ public class ObjectMBeanTest
Assert.assertEquals("parameter description doesn't match", "A description of the argument", pinfos[0].getDescription()); Assert.assertEquals("parameter description doesn't match", "A description of the argument", pinfos[0].getDescription());
Assert.assertEquals("parameter name doesn't match", "doodle", pinfos[0].getName()); Assert.assertEquals("parameter name doesn't match", "doodle", pinfos[0].getName());
} }
if ("good".equals(opinfo.getName()))
{
doodle = true;
Assert.assertEquals("description does not match", "test of proxy", opinfo.getDescription());
Assert.assertEquals("execution contexts wrong", "not bad", mbean.invoke("good", new Object[] {}, new String[] {}));
}
} }
Assert.assertTrue("publish operation was not not found", publish); Assert.assertTrue("publish operation was not not found", publish);
Assert.assertTrue("doodle operation was not not found", doodle); Assert.assertTrue("doodle operation was not not found", doodle);
// Assert.assertTrue("good operation was not not found", good); not wired up yet
} }
} }

View File

@ -194,7 +194,7 @@
<!-- in the $JETTY_HOME/contexts directory --> <!-- in the $JETTY_HOME/contexts directory -->
<!-- --> <!-- -->
<!-- =========================================================== --> <!-- =========================================================== -->
<Call name="addLifeCycle"> <Call name="addBean">
<Arg> <Arg>
<New class="org.eclipse.jetty.server.deployer.ContextDeployer"> <New class="org.eclipse.jetty.server.deployer.ContextDeployer">
<Set name="contexts"><Ref id="Contexts"/></Set> <Set name="contexts"><Ref id="Contexts"/></Set>
@ -217,7 +217,7 @@
<!-- Normally only one type of deployer need be used. --> <!-- Normally only one type of deployer need be used. -->
<!-- --> <!-- -->
<!-- =========================================================== --> <!-- =========================================================== -->
<Call name="addLifeCycle"> <Call name="addBean">
<Arg> <Arg>
<New class="org.eclipse.jetty.server.deployer.WebAppDeployer"> <New class="org.eclipse.jetty.server.deployer.WebAppDeployer">
<Set name="contexts"><Ref id="Contexts"/></Set> <Set name="contexts"><Ref id="Contexts"/></Set>

View File

@ -36,103 +36,77 @@ import org.eclipse.jetty.util.log.Logger;
* <li>Base acceptor thread</li> * <li>Base acceptor thread</li>
* <li>Optional reverse proxy headers checking</li> * <li>Optional reverse proxy headers checking</li>
* </ul> * </ul>
*
*
*/ */
public abstract class AbstractConnector extends AggregateLifeCycle implements Connector, Dumpable public abstract class AbstractConnector extends AggregateLifeCycle implements Connector, Dumpable
{ {
static final Logger LOG = Log.getLogger(AbstractConnector.class); protected final Logger LOG = Log.getLogger(getClass());
private final Thread[] _acceptors;
private String _name;
private Server _server;
private Executor _executor;
private String _host;
private int _port = 0;
private int _acceptQueueSize = 0;
private int _acceptorPriorityOffset = 0;
private boolean _reuseAddress = true;
private ByteBufferPool _byteBufferPool=new StandardByteBufferPool(); // TODO should this be server wide? or a thread local one?
private final Statistics _stats = new ConnectionStatistics(); private final Statistics _stats = new ConnectionStatistics();
private final Thread[] _acceptors;
private volatile String _name;
private volatile Server _server;
private volatile Executor _executor;
private volatile int _acceptQueueSize = 128;
private volatile boolean _reuseAddress = true;
private volatile ByteBufferPool _byteBufferPool;
private volatile long _idleTimeout = 200000;
private volatile int _soLingerTime = -1;
protected long _idleTimeout = 200000;
protected int _soLingerTime = -1;
/* ------------------------------------------------------------ */
/**
*/
public AbstractConnector() public AbstractConnector()
{ {
this(Math.max(1,(Runtime.getRuntime().availableProcessors())/4)); this(Math.max(1, (Runtime.getRuntime().availableProcessors()) / 4));
} }
/* ------------------------------------------------------------ */ public AbstractConnector(@Name("acceptors") int acceptors)
/**
*/
public AbstractConnector(@Name(value="acceptors") int acceptors)
{ {
if (acceptors > 2 * Runtime.getRuntime().availableProcessors()) if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
LOG.warn("Acceptors should be <=2*availableProcessors: " + this); LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
_acceptors=new Thread[acceptors]; _acceptors = new Thread[acceptors];
} }
/* ------------------------------------------------------------ */
@Override @Override
public Statistics getStatistics() public Statistics getStatistics()
{ {
return _stats; return _stats;
} }
/* ------------------------------------------------------------ */
/*
*/
@Override @Override
public Server getServer() public Server getServer()
{ {
return _server; return _server;
} }
/* ------------------------------------------------------------ */
public void setServer(Server server) public void setServer(Server server)
{ {
_server = server; _server = server;
} }
/* ------------------------------------------------------------ */
public Executor findExecutor() public Executor findExecutor()
{ {
if (_executor==null && getServer()!=null) if (_executor == null && getServer() != null)
return getServer().getThreadPool(); return getServer().getThreadPool();
return _executor; return _executor;
} }
/* ------------------------------------------------------------ */
@Override @Override
public Executor getExecutor() public Executor getExecutor()
{ {
return _executor; return _executor;
} }
/* ------------------------------------------------------------ */
public void setExecutor(Executor executor) public void setExecutor(Executor executor)
{ {
removeBean(_executor); removeBean(_executor);
_executor=executor; _executor = executor;
addBean(_executor); addBean(_executor);
} }
/* ------------------------------------------------------------ */
@Override @Override
public ByteBufferPool getByteBufferPool() public ByteBufferPool getByteBufferPool()
{ {
return _byteBufferPool; return _byteBufferPool;
} }
/* ------------------------------------------------------------ */
public void setByteBufferPool(ByteBufferPool byteBufferPool) public void setByteBufferPool(ByteBufferPool byteBufferPool)
{ {
removeBean(byteBufferPool); removeBean(byteBufferPool);
@ -140,53 +114,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
addBean(_byteBufferPool); addBean(_byteBufferPool);
} }
/* ------------------------------------------------------------ */
public void setHost(String host)
{
if (this instanceof NetConnector)
_host = host;
else
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public String getHost()
{
return _host;
}
/* ------------------------------------------------------------ */
public void setPort(int port)
{
if (this instanceof NetConnector)
_port = port;
else
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public int getPort()
{
return _port;
}
/* ------------------------------------------------------------ */
public void open() throws IOException
{
}
/* ------------------------------------------------------------ */
public void close() throws IOException
{
}
/* ------------------------------------------------------------ */
public int getLocalPort()
{
return -1;
}
/* ------------------------------------------------------------ */
/** /**
* @return Returns the maxIdleTime. * @return Returns the maxIdleTime.
*/ */
@ -196,7 +123,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _idleTimeout; return _idleTimeout;
} }
/* ------------------------------------------------------------ */
/** /**
* Set the maximum Idle time for a connection, which roughly translates to the {@link Socket#setSoTimeout(int)} call, although with NIO implementations * Set the maximum Idle time for a connection, which roughly translates to the {@link Socket#setSoTimeout(int)} call, although with NIO implementations
* other mechanisms may be used to implement the timeout. The max idle time is applied: * other mechanisms may be used to implement the timeout. The max idle time is applied:
@ -209,19 +135,17 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
* timeout (if implemented by jetty) is reset. However, in many instances, the reading/writing is delegated to the JVM, and the semantic is more strictly * timeout (if implemented by jetty) is reset. However, in many instances, the reading/writing is delegated to the JVM, and the semantic is more strictly
* enforced as the maximum time a single read/write operation can take. Note, that as Jetty supports writes of memory mapped file buffers, then a write may * enforced as the maximum time a single read/write operation can take. Note, that as Jetty supports writes of memory mapped file buffers, then a write may
* take many 10s of seconds for large content written to a slow device. * take many 10s of seconds for large content written to a slow device.
* <p> * <p/>
* Previously, Jetty supported separate idle timeouts and IO operation timeouts, however the expense of changing the value of soTimeout was significant, so * Previously, Jetty supported separate idle timeouts and IO operation timeouts, however the expense of changing the value of soTimeout was significant, so
* these timeouts were merged. With the advent of NIO, it may be possible to again differentiate these values (if there is demand). * these timeouts were merged. With the advent of NIO, it may be possible to again differentiate these values (if there is demand).
* *
* @param idleTimeout * @param idleTimeout The idleTimeout to set.
* The idleTimeout to set.
*/ */
public void setIdleTimeout(long idleTimeout) public void setIdleTimeout(long idleTimeout)
{ {
_idleTimeout = idleTimeout; _idleTimeout = idleTimeout;
} }
/* ------------------------------------------------------------ */
/** /**
* @return Returns the soLingerTime. * @return Returns the soLingerTime.
*/ */
@ -230,7 +154,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _soLingerTime; return _soLingerTime;
} }
/* ------------------------------------------------------------ */
/** /**
* @return Returns the acceptQueueSize. * @return Returns the acceptQueueSize.
*/ */
@ -239,17 +162,14 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _acceptQueueSize; return _acceptQueueSize;
} }
/* ------------------------------------------------------------ */
/** /**
* @param acceptQueueSize * @param acceptQueueSize The acceptQueueSize to set.
* The acceptQueueSize to set.
*/ */
public void setAcceptQueueSize(int acceptQueueSize) public void setAcceptQueueSize(int acceptQueueSize)
{ {
_acceptQueueSize = acceptQueueSize; _acceptQueueSize = acceptQueueSize;
} }
/* ------------------------------------------------------------ */
/** /**
* @return Returns the number of acceptor threads. * @return Returns the number of acceptor threads.
*/ */
@ -259,30 +179,21 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
} }
/* ------------------------------------------------------------ */
/** /**
* @param soLingerTime * @param soLingerTime The soLingerTime to set or -1 to disable.
* The soLingerTime to set or -1 to disable.
*/ */
public void setSoLingerTime(int soLingerTime) public void setSoLingerTime(int soLingerTime)
{ {
_soLingerTime = soLingerTime; _soLingerTime = soLingerTime;
} }
/* ------------------------------------------------------------ */
@Override @Override
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
if (_server == null) if (_server == null)
throw new IllegalStateException("No server"); throw new IllegalStateException("No server");
if (_name==null) _byteBufferPool = new StandardByteBufferPool();
_name = (getHost() == null?"0.0.0.0":getHost()) + ":" + getPort();
// open listener port
open();
_name=_name+"/"+getLocalPort();
super.doStart(); super.doStart();
@ -293,22 +204,12 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
findExecutor().execute(new Acceptor(i)); findExecutor().execute(new Acceptor(i));
} }
LOG.info("Started {}",this); LOG.info("Started {}", this);
} }
/* ------------------------------------------------------------ */
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
try
{
close();
}
catch (IOException e)
{
LOG.warn(e);
}
super.doStop(); super.doStop();
for (Thread thread : _acceptors) for (Thread thread : _acceptors)
@ -317,12 +218,11 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
thread.interrupt(); thread.interrupt();
} }
int i=_name.lastIndexOf("/"); int i = _name.lastIndexOf("/");
if (i>0) if (i > 0)
_name=_name.substring(0,i); _name = _name.substring(0, i);
} }
/* ------------------------------------------------------------ */
public void join() throws InterruptedException public void join() throws InterruptedException
{ {
for (Thread thread : _acceptors) for (Thread thread : _acceptors)
@ -330,16 +230,15 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
thread.join(); thread.join();
} }
/* ------------------------------------------------------------ */
protected void configure(Socket socket) protected void configure(Socket socket)
{ {
try try
{ {
socket.setTcpNoDelay(true); socket.setTcpNoDelay(true);
if (_soLingerTime >= 0) if (_soLingerTime >= 0)
socket.setSoLinger(true,_soLingerTime / 1000); socket.setSoLinger(true, _soLingerTime / 1000);
else else
socket.setSoLinger(false,0); socket.setSoLinger(false, 0);
} }
catch (Exception e) catch (Exception e)
{ {
@ -347,21 +246,8 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
} }
} }
/* ------------------------------------------------------------ */
protected abstract void accept(int acceptorID) throws IOException, InterruptedException; protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s@%s:%d",
getClass().getSimpleName(),
getHost()==null?"0.0.0.0":getHost(),
getLocalPort()<=0?getPort():getLocalPort());
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private class Acceptor implements Runnable private class Acceptor implements Runnable
{ {
@ -372,7 +258,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
_acceptor = id; _acceptor = id;
} }
/* ------------------------------------------------------------ */
@Override @Override
public void run() public void run()
{ {
@ -391,7 +276,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
try try
{ {
current.setPriority(old_priority - _acceptorPriorityOffset); current.setPriority(old_priority);
while (isRunning() && getTransport() != null) while (isRunning() && getTransport() != null)
{ {
try try
@ -421,43 +306,37 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
} }
} }
/* ------------------------------------------------------------ */
@Override @Override
public String getName() public String getName()
{ {
return _name; return _name;
} }
/* ------------------------------------------------------------ */
public void setName(String name) public void setName(String name)
{ {
_name = name; _name = name;
} }
/* ------------------------------------------------------------ */
protected void connectionOpened(AsyncConnection connection) protected void connectionOpened(AsyncConnection connection)
{ {
_stats.connectionOpened(); _stats.connectionOpened();
} }
/* ------------------------------------------------------------ */
protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection) protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection)
{ {
long duration = System.currentTimeMillis() - oldConnection.getEndPoint().getCreatedTimeStamp(); long duration = System.currentTimeMillis() - oldConnection.getEndPoint().getCreatedTimeStamp();
int requests = (oldConnection instanceof HttpConnection)?((HttpConnection)oldConnection).getHttpChannel().getRequests():0; int requests = (oldConnection instanceof HttpConnection) ? ((HttpConnection)oldConnection).getHttpChannel().getRequests() : 0;
_stats.connectionUpgraded(duration,requests,requests); _stats.connectionUpgraded(duration, requests, requests);
} }
/* ------------------------------------------------------------ */
protected void connectionClosed(AsyncConnection connection) protected void connectionClosed(AsyncConnection connection)
{ {
long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp(); long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
// TODO: remove casts to HttpConnection // TODO: remove casts to HttpConnection
int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0; int requests = (connection instanceof HttpConnection) ? ((HttpConnection)connection).getHttpChannel().getRequests() : 0;
_stats.connectionClosed(duration,requests,requests); _stats.connectionClosed(duration, requests, requests);
} }
/* ------------------------------------------------------------ */
/** /**
* @return True if the the server socket will be opened in SO_REUSEADDR mode. * @return True if the the server socket will be opened in SO_REUSEADDR mode.
*/ */
@ -466,10 +345,8 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _reuseAddress; return _reuseAddress;
} }
/* ------------------------------------------------------------ */
/** /**
* @param reuseAddress * @param reuseAddress True if the the server socket will be opened in SO_REUSEADDR mode.
* True if the the server socket will be opened in SO_REUSEADDR mode.
*/ */
public void setReuseAddress(boolean reuseAddress) public void setReuseAddress(boolean reuseAddress)
{ {

View File

@ -0,0 +1,100 @@
// ========================================================================
// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import java.io.IOException;
public abstract class AbstractNetConnector extends AbstractConnector implements Connector.NetConnector
{
private volatile String _host;
private volatile int _port = 0;
protected AbstractNetConnector()
{
}
protected AbstractNetConnector(int acceptors)
{
super(acceptors);
}
public void setHost(String host)
{
_host = host;
}
public String getHost()
{
return _host;
}
public void setPort(int port)
{
_port = port;
}
public int getPort()
{
return _port;
}
public int getLocalPort()
{
return -1;
}
@Override
protected void doStart() throws Exception
{
if (getName() == null)
setName(getHost() == null ? "0.0.0.0" : getHost() + ":" + getPort());
open();
setName(getName() + "/" + getLocalPort());
super.doStart();
}
@Override
protected void doStop() throws Exception
{
try
{
close();
}
catch (IOException e)
{
LOG.warn(e);
}
super.doStop();
}
public void open() throws IOException
{
}
public void close() throws IOException
{
}
@Override
public String toString()
{
return String.format("%s@%s:%d",
getClass().getSimpleName(),
getHost() == null ? "0.0.0.0" : getHost(),
getLocalPort() <= 0 ? getPort() : getLocalPort());
}
}

View File

@ -65,7 +65,7 @@ public interface Connector extends LifeCycle
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
Statistics getStatistics(); Statistics getStatistics();
interface NetConnector extends Connector interface NetConnector extends Connector, AutoCloseable
{ {
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -75,7 +75,7 @@ public interface Connector extends LifeCycle
void open() throws IOException; void open() throws IOException;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
void close(); void close() throws IOException;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**

View File

@ -18,8 +18,7 @@ import java.io.InputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Timer; import java.util.concurrent.ScheduledExecutorService;
import javax.servlet.DispatcherType; import javax.servlet.DispatcherType;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
@ -55,7 +54,7 @@ public abstract class HttpChannel
{ {
static final Logger LOG = Log.getLogger(HttpChannel.class); static final Logger LOG = Log.getLogger(HttpChannel.class);
private static final ThreadLocal<HttpChannel> __currentChannel = new ThreadLocal<HttpChannel>(); private static final ThreadLocal<HttpChannel> __currentChannel = new ThreadLocal<>();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public static HttpChannel getCurrentHttpChannel() public static HttpChannel getCurrentHttpChannel()
@ -68,8 +67,8 @@ public abstract class HttpChannel
{ {
__currentChannel.set(channel); __currentChannel.set(channel);
} }
private final Server _server; private final Server _server;
private final AsyncConnection _connection; private final AsyncConnection _connection;
@ -77,7 +76,7 @@ public abstract class HttpChannel
private final ChannelEventHandler _handler = new ChannelEventHandler(); private final ChannelEventHandler _handler = new ChannelEventHandler();
private final HttpChannelState _state; private final HttpChannelState _state;
private final HttpFields _requestFields; private final HttpFields _requestFields;
private final Request _request; private final Request _request;
private final HttpInput _in; private final HttpInput _in;
@ -90,19 +89,16 @@ public abstract class HttpChannel
private int _requests; private int _requests;
private int _include; private int _include;
private HttpVersion _version = HttpVersion.HTTP_1_1; private HttpVersion _version = HttpVersion.HTTP_1_1;
private boolean _expect = false; private boolean _expect = false;
private boolean _expect100Continue = false; private boolean _expect100Continue = false;
private boolean _expect102Processing = false; private boolean _expect102Processing = false;
private boolean _host = false; private boolean _host = false;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Constructor
*
*/
public HttpChannel(Server server,AsyncConnection connection,HttpInput input) public HttpChannel(Server server,AsyncConnection connection,HttpInput input)
{ {
_server = server; _server = server;
@ -116,31 +112,31 @@ public abstract class HttpChannel
_in=input; _in=input;
_out=new Output(); _out=new Output();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public HttpChannelState getState() public HttpChannelState getState()
{ {
return _state; return _state;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public EventHandler getEventHandler() public EventHandler getEventHandler()
{ {
return _handler; return _handler;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public AsyncEndPoint getEndPoint() public AsyncEndPoint getEndPoint()
{ {
return getConnection().getEndPoint(); return getConnection().getEndPoint();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public boolean isIdle() public boolean isIdle()
{ {
return _state.isIdle(); return _state.isIdle();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return the number of requests handled by this connection * @return the number of requests handled by this connection
@ -197,7 +193,7 @@ public abstract class HttpChannel
{ {
return _connection; return _connection;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public InetSocketAddress getLocalAddress() public InetSocketAddress getLocalAddress()
{ {
@ -209,7 +205,7 @@ public abstract class HttpChannel
{ {
return _connection.getEndPoint().getRemoteAddress(); return _connection.getEndPoint().getRemoteAddress();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Get the inputStream from the connection. * Get the inputStream from the connection.
@ -220,6 +216,7 @@ public abstract class HttpChannel
* *
* @return The input stream for this connection. * @return The input stream for this connection.
* The stream will be created if it does not already exist. * The stream will be created if it does not already exist.
* @throws IOException if the InputStream cannot be created
*/ */
public ServletInputStream getInputStream() throws IOException public ServletInputStream getInputStream() throws IOException
{ {
@ -251,6 +248,7 @@ public abstract class HttpChannel
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @param charset the character set for the PrintWriter
* @return A {@link PrintWriter} wrapping the {@link #getOutputStream output stream}. The writer is created if it * @return A {@link PrintWriter} wrapping the {@link #getOutputStream output stream}. The writer is created if it
* does not already exist. * does not already exist.
*/ */
@ -319,7 +317,7 @@ public abstract class HttpChannel
// Loop here to handle async request redispatches. // Loop here to handle async request redispatches.
// The loop is controlled by the call to async.unhandle in the // The loop is controlled by the call to async.unhandle in the
// finally block below. Unhandle will return false only if an async dispatch has // finally block below. Unhandle will return false only if an async dispatch has
// already happened when unhandle is called. // already happened when unhandle is called.
boolean handling=_state.handling(); boolean handling=_state.handling();
while(handling && getServer().isRunning()) while(handling && getServer().isRunning())
@ -328,7 +326,7 @@ public abstract class HttpChannel
{ {
_request.setHandled(false); _request.setHandled(false);
_out.reopen(); _out.reopen();
if (_state.isInitial()) if (_state.isInitial())
{ {
_request.setDispatcherType(DispatcherType.REQUEST); _request.setDispatcherType(DispatcherType.REQUEST);
@ -340,7 +338,7 @@ public abstract class HttpChannel
_request.setDispatcherType(DispatcherType.ASYNC); _request.setDispatcherType(DispatcherType.ASYNC);
getServer().handleAsync(this); getServer().handleAsync(this);
} }
} }
catch (ContinuationThrowable e) catch (ContinuationThrowable e)
{ {
@ -377,7 +375,7 @@ public abstract class HttpChannel
__currentChannel.set(null); __currentChannel.set(null);
if (threadName!=null) if (threadName!=null)
Thread.currentThread().setName(threadName); Thread.currentThread().setName(threadName);
if (_state.isCompleting()) if (_state.isCompleting())
{ {
try try
@ -401,7 +399,7 @@ public abstract class HttpChannel
// Complete generating the response // Complete generating the response
_response.complete(); _response.complete();
// Complete reading the request // Complete reading the request
_in.consumeAll(); _in.consumeAll();
} }
@ -420,7 +418,7 @@ public abstract class HttpChannel
completed(); completed();
} }
} }
LOG.debug("{} !process",this); LOG.debug("{} !process",this);
} }
} }
@ -429,10 +427,10 @@ public abstract class HttpChannel
protected boolean commitError(final int status, final String reason, String content) protected boolean commitError(final int status, final String reason, String content)
{ {
LOG.debug("{} sendError {} {}",this,status,reason); LOG.debug("{} sendError {} {}",this,status,reason);
if (_response.isCommitted()) if (_response.isCommitted())
return false; return false;
try try
{ {
_response.setStatus(status,reason); _response.setStatus(status,reason);
@ -447,7 +445,7 @@ public abstract class HttpChannel
HttpGenerator.ResponseInfo info = _handler.commit(); HttpGenerator.ResponseInfo info = _handler.commit();
commit(info,buffer); commit(info,buffer);
return true; return true;
} }
catch(Exception e) catch(Exception e)
@ -481,11 +479,8 @@ public abstract class HttpChannel
_include--; _include--;
_out.reopen(); _out.reopen();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.AsyncConnection#isSuspended()
*/
public boolean isSuspended() public boolean isSuspended()
{ {
return _request.getAsyncContinuation().isSuspended(); return _request.getAsyncContinuation().isSuspended();
@ -508,7 +503,7 @@ public abstract class HttpChannel
{ {
return _expect102Processing; return _expect102Processing;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public String toString() public String toString()
@ -536,7 +531,7 @@ public abstract class HttpChannel
if(_request.getTimeStamp()==0) if(_request.getTimeStamp()==0)
_request.setTimeStamp(System.currentTimeMillis()); _request.setTimeStamp(System.currentTimeMillis());
_request.setMethod(httpMethod,method); _request.setMethod(httpMethod,method);
if (httpMethod==HttpMethod.CONNECT) if (httpMethod==HttpMethod.CONNECT)
_uri.parseConnect(uri); _uri.parseConnect(uri);
else else
@ -545,7 +540,7 @@ public abstract class HttpChannel
_request.setPathInfo(_uri.getDecodedPath()); _request.setPathInfo(_uri.getDecodedPath());
_version=version==null?HttpVersion.HTTP_0_9:version; _version=version==null?HttpVersion.HTTP_0_9:version;
_request.setHttpVersion(_version); _request.setHttpVersion(_version);
return false; return false;
} }
@ -576,7 +571,7 @@ public abstract class HttpChannel
break; break;
default: default:
String[] values = value.toString().split(","); String[] values = value.split(",");
for (int i=0;values!=null && i<values.length;i++) for (int i=0;values!=null && i<values.length;i++)
{ {
expect=HttpHeaderValue.CACHE.get(values[i].trim()); expect=HttpHeaderValue.CACHE.get(values[i].trim());
@ -633,7 +628,7 @@ public abstract class HttpChannel
if (!persistent) if (!persistent)
_responseFields.add(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE); _responseFields.add(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE);
if (_server.getSendDateHeader()) if (_server.getSendDateHeader())
_responseFields.putDateField(HttpHeader.DATE.toString(),_request.getTimeStamp()); _responseFields.putDateField(HttpHeader.DATE.toString(),_request.getTimeStamp());
@ -656,10 +651,10 @@ public abstract class HttpChannel
// Either handle now or wait for first content/message complete // Either handle now or wait for first content/message complete
if (_expect100Continue) if (_expect100Continue)
return true; return true;
return false; return false;
} }
@Override @Override
public boolean content(ByteBuffer ref) public boolean content(ByteBuffer ref)
{ {
@ -704,7 +699,7 @@ public abstract class HttpChannel
} }
return _response.commit(); return _response.commit();
} }
@Override @Override
public String toString() public String toString()
{ {
@ -754,10 +749,10 @@ public abstract class HttpChannel
String contentType = httpContent.getContentType(); String contentType = httpContent.getContentType();
if (contentType != null) if (contentType != null)
_responseFields.put(HttpHeader.CONTENT_TYPE, contentType); _responseFields.put(HttpHeader.CONTENT_TYPE, contentType);
if (httpContent.getContentLength() > 0) if (httpContent.getContentLength() > 0)
_responseFields.putLongField(HttpHeader.CONTENT_LENGTH, httpContent.getContentLength()); _responseFields.putLongField(HttpHeader.CONTENT_LENGTH, httpContent.getContentLength());
String lm = httpContent.getLastModified(); String lm = httpContent.getLastModified();
if (lm != null) if (lm != null)
_responseFields.put(HttpHeader.LAST_MODIFIED, lm); _responseFields.put(HttpHeader.LAST_MODIFIED, lm);
@ -794,40 +789,34 @@ public abstract class HttpChannel
throw new IllegalArgumentException("unknown content type?"); throw new IllegalArgumentException("unknown content type?");
} }
} }
public abstract HttpConnector getHttpConnector(); public abstract HttpConnector getHttpConnector();
protected abstract int write(ByteBuffer content) throws IOException; protected abstract int write(ByteBuffer content) throws IOException;
/* Called by the channel or application to commit a specific response info */ /* Called by the channel or application to commit a specific response info */
protected abstract void commit(ResponseInfo info, ByteBuffer content) throws IOException; protected abstract void commit(ResponseInfo info, ByteBuffer content) throws IOException;
protected abstract int getContentBufferSize(); protected abstract int getContentBufferSize();
protected abstract void increaseContentBufferSize(int size); protected abstract void increaseContentBufferSize(int size);
protected abstract void resetBuffer(); protected abstract void resetBuffer();
protected abstract void flushResponse() throws IOException; protected abstract void flushResponse() throws IOException;
protected abstract void completeResponse() throws IOException; protected abstract void completeResponse() throws IOException;
protected abstract void completed(); protected abstract void completed();
protected abstract void execute(Runnable task);
// TODO replace with ScheduledExecutorService?
// TODO constructor inject
public abstract Timer getTimer();
protected abstract void execute(Runnable task);
// TODO use constructor injection ?
public abstract ScheduledExecutorService getScheduler();
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public interface EventHandler extends HttpParser.RequestHandler public interface EventHandler extends HttpParser.RequestHandler
{ {
ResponseInfo commit(); ResponseInfo commit();
} }
} }

View File

@ -4,20 +4,20 @@
// All rights reserved. This program and the accompanying materials // All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0 // are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution. // and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at // The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html // http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at // The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php // http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses. // You may elect to redistribute this code under either of these licenses.
// ======================================================================== // ========================================================================
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Timer; import java.util.concurrent.Future;
import java.util.TimerTask; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent; import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener; import javax.servlet.AsyncListener;
@ -39,32 +39,32 @@ import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Implementation of Continuation and AsyncContext interfaces /** Implementation of Continuation and AsyncContext interfaces
* *
*/ */
public class HttpChannelState implements AsyncContext, Continuation public class HttpChannelState implements AsyncContext, Continuation
{ {
private static final Logger LOG = Log.getLogger(HttpChannelState.class); private static final Logger LOG = Log.getLogger(HttpChannelState.class);
private final static long DEFAULT_TIMEOUT=30000L; private final static long DEFAULT_TIMEOUT=30000L;
private final static ContinuationThrowable __exception = new ContinuationThrowable(); private final static ContinuationThrowable __exception = new ContinuationThrowable();
// STATES: // STATES:
// handling() suspend() unhandle() resume() complete() completed() // handling() suspend() unhandle() resume() complete() completed()
// startAsync() dispatch() // startAsync() dispatch()
// IDLE DISPATCHED COMPLETECALLED // IDLE DISPATCHED COMPLETECALLED
// DISPATCHED ASYNCSTARTED COMPLETING // DISPATCHED ASYNCSTARTED COMPLETING
// ASYNCSTARTED ASYNCWAIT REDISPATCHING COMPLETECALLED // ASYNCSTARTED ASYNCWAIT REDISPATCHING COMPLETECALLED
// REDISPATCHING REDISPATCHED // REDISPATCHING REDISPATCHED
// ASYNCWAIT REDISPATCH COMPLETECALLED // ASYNCWAIT REDISPATCH COMPLETECALLED
// REDISPATCH REDISPATCHED // REDISPATCH REDISPATCHED
// REDISPATCHED ASYNCSTARTED COMPLETING // REDISPATCHED ASYNCSTARTED COMPLETING
// COMPLETECALLED COMPLETING COMPLETING // COMPLETECALLED COMPLETING COMPLETING
// COMPLETING COMPLETING COMPLETED // COMPLETING COMPLETING COMPLETED
// COMPLETED // COMPLETED
public enum State public enum State
{ {
IDLE, // Idle request IDLE, // Idle request
DISPATCHED, // Request dispatched to filter/servlet DISPATCHED, // Request dispatched to filter/servlet
ASYNCSTARTED, // Suspend called, but not yet returned to container ASYNCSTARTED, // Suspend called, but not yet returned to container
@ -75,8 +75,8 @@ public class HttpChannelState implements AsyncContext, Continuation
COMPLETECALLED,// complete called COMPLETECALLED,// complete called
COMPLETING, // Request is completable COMPLETING, // Request is completable
COMPLETED // Request is complete COMPLETED // Request is complete
}; }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private final HttpChannel _channel; private final HttpChannel _channel;
private List<AsyncListener> _lastAsyncListeners; private List<AsyncListener> _lastAsyncListeners;
@ -90,9 +90,9 @@ public class HttpChannelState implements AsyncContext, Continuation
private boolean _expired; private boolean _expired;
private volatile boolean _responseWrapped; private volatile boolean _responseWrapped;
private long _timeoutMs=DEFAULT_TIMEOUT; private long _timeoutMs=DEFAULT_TIMEOUT;
private AsyncEventState _event; private AsyncEventState _event;
private volatile boolean _continuation; private volatile boolean _continuation;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected HttpChannelState(HttpChannel channel) protected HttpChannelState(HttpChannel channel)
{ {
@ -103,13 +103,13 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public State getState() public State getState()
{ {
synchronized(this) synchronized(this)
{ {
return _state; return _state;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public void addListener(AsyncListener listener) public void addListener(AsyncListener listener)
@ -117,7 +117,7 @@ public class HttpChannelState implements AsyncContext, Continuation
synchronized(this) synchronized(this)
{ {
if (_asyncListeners==null) if (_asyncListeners==null)
_asyncListeners=new ArrayList<AsyncListener>(); _asyncListeners=new ArrayList<>();
_asyncListeners.add(listener); _asyncListeners.add(listener);
} }
} }
@ -130,7 +130,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
// TODO handle the request/response ??? // TODO handle the request/response ???
if (_asyncListeners==null) if (_asyncListeners==null)
_asyncListeners=new ArrayList<AsyncListener>(); _asyncListeners=new ArrayList<>();
_asyncListeners.add(listener); _asyncListeners.add(listener);
} }
} }
@ -142,7 +142,7 @@ public class HttpChannelState implements AsyncContext, Continuation
synchronized(this) synchronized(this)
{ {
if (_continuationListeners==null) if (_continuationListeners==null)
_continuationListeners=new ArrayList<ContinuationListener>(); _continuationListeners=new ArrayList<>();
_continuationListeners.add(listener); _continuationListeners.add(listener);
} }
} }
@ -155,7 +155,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
_timeoutMs=ms; _timeoutMs=ms;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
@ -165,7 +165,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
return _timeoutMs; return _timeoutMs;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public AsyncEventState getAsyncEventState() public AsyncEventState getAsyncEventState()
@ -174,8 +174,8 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
return _event; return _event;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped() * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped()
@ -198,7 +198,7 @@ public class HttpChannelState implements AsyncContext, Continuation
return _initial; return _initial;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* (non-Javadoc) /* (non-Javadoc)
* @see javax.servlet.ServletRequest#isSuspended() * @see javax.servlet.ServletRequest#isSuspended()
@ -215,9 +215,9 @@ public class HttpChannelState implements AsyncContext, Continuation
case COMPLETECALLED: case COMPLETECALLED:
case ASYNCWAIT: case ASYNCWAIT:
return true; return true;
default: default:
return false; return false;
} }
} }
} }
@ -230,7 +230,7 @@ public class HttpChannelState implements AsyncContext, Continuation
return _state==State.IDLE; return _state==State.IDLE;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public boolean isSuspending() public boolean isSuspending()
{ {
@ -241,13 +241,13 @@ public class HttpChannelState implements AsyncContext, Continuation
case ASYNCSTARTED: case ASYNCSTARTED:
case ASYNCWAIT: case ASYNCWAIT:
return true; return true;
default: default:
return false; return false;
} }
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public boolean isDispatchable() public boolean isDispatchable()
{ {
@ -260,9 +260,9 @@ public class HttpChannelState implements AsyncContext, Continuation
case REDISPATCHING: case REDISPATCHING:
case COMPLETECALLED: case COMPLETECALLED:
return true; return true;
default: default:
return false; return false;
} }
} }
} }
@ -299,7 +299,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
_continuation=false; _continuation=false;
_responseWrapped=false; _responseWrapped=false;
switch(_state) switch(_state)
{ {
case IDLE: case IDLE:
@ -315,7 +315,7 @@ public class HttpChannelState implements AsyncContext, Continuation
_lastAsyncListeners=null; _lastAsyncListeners=null;
} }
return true; return true;
case COMPLETECALLED: case COMPLETECALLED:
_state=State.COMPLETING; _state=State.COMPLETING;
return false; return false;
@ -323,7 +323,7 @@ public class HttpChannelState implements AsyncContext, Continuation
case COMPLETING: case COMPLETING:
case ASYNCWAIT: case ASYNCWAIT:
return false; return false;
case REDISPATCH: case REDISPATCH:
_state=State.REDISPATCHED; _state=State.REDISPATCHED;
return true; return true;
@ -370,7 +370,7 @@ public class HttpChannelState implements AsyncContext, Continuation
throw new IllegalStateException(this.getStatusString()); throw new IllegalStateException(this.getStatusString());
} }
} }
if (_lastAsyncListeners!=null) if (_lastAsyncListeners!=null)
{ {
for (AsyncListener listener : _lastAsyncListeners) for (AsyncListener listener : _lastAsyncListeners)
@ -397,13 +397,13 @@ public class HttpChannelState implements AsyncContext, Continuation
_event._cause=th; _event._cause=th;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Signal that the HttpConnection has finished handling the request. * Signal that the HttpConnection has finished handling the request.
* For blocking connectors, this call may block if the request has * For blocking connectors, this call may block if the request has
* been suspended (startAsync called). * been suspended (startAsync called).
* @return true if handling is complete, false if the request should * @return true if handling is complete, false if the request should
* be handled again (eg because of a resume that happened before unhandle was called) * be handled again (eg because of a resume that happened before unhandle was called)
*/ */
protected boolean unhandle() protected boolean unhandle()
@ -423,22 +423,22 @@ public class HttpChannelState implements AsyncContext, Continuation
case ASYNCSTARTED: case ASYNCSTARTED:
_initial=false; _initial=false;
_state=State.ASYNCWAIT; _state=State.ASYNCWAIT;
scheduleTimeout(); scheduleTimeout();
if (_state==State.ASYNCWAIT) if (_state==State.ASYNCWAIT)
return true; return true;
else if (_state==State.COMPLETECALLED) else if (_state==State.COMPLETECALLED)
{ {
_state=State.COMPLETING; _state=State.COMPLETING;
return true; return true;
} }
_initial=false; _initial=false;
_state=State.REDISPATCHED; _state=State.REDISPATCHED;
return false; return false;
case REDISPATCHING: case REDISPATCHING:
_initial=false; _initial=false;
_state=State.REDISPATCHED; _state=State.REDISPATCHED;
return false; return false;
case COMPLETECALLED: case COMPLETECALLED:
_initial=false; _initial=false;
@ -470,15 +470,15 @@ public class HttpChannelState implements AsyncContext, Continuation
_state=State.REDISPATCH; _state=State.REDISPATCH;
_resumed=true; _resumed=true;
break; break;
case REDISPATCH: case REDISPATCH:
return; return;
default: default:
throw new IllegalStateException(this.getStatusString()); throw new IllegalStateException(this.getStatusString());
} }
} }
if (dispatch) if (dispatch)
{ {
cancelTimeout(); cancelTimeout();
@ -507,7 +507,7 @@ public class HttpChannelState implements AsyncContext, Continuation
} }
_expired=true; _expired=true;
} }
if (aListeners!=null) if (aListeners!=null)
{ {
for (AsyncListener listener : aListeners) for (AsyncListener listener : aListeners)
@ -536,16 +536,16 @@ public class HttpChannelState implements AsyncContext, Continuation
} }
} }
} }
synchronized (this) synchronized (this)
{ {
switch(_state) switch(_state)
{ {
case ASYNCSTARTED: case ASYNCSTARTED:
case ASYNCWAIT: case ASYNCWAIT:
if (_continuation) if (_continuation)
dispatch(); dispatch();
else else
// TODO maybe error dispatch? // TODO maybe error dispatch?
@ -555,7 +555,7 @@ public class HttpChannelState implements AsyncContext, Continuation
scheduleDispatch(); scheduleDispatch();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* (non-Javadoc) /* (non-Javadoc)
* @see javax.servlet.ServletRequest#complete() * @see javax.servlet.ServletRequest#complete()
@ -577,17 +577,17 @@ public class HttpChannelState implements AsyncContext, Continuation
case ASYNCSTARTED: case ASYNCSTARTED:
_state=State.COMPLETECALLED; _state=State.COMPLETECALLED;
return; return;
case ASYNCWAIT: case ASYNCWAIT:
_state=State.COMPLETECALLED; _state=State.COMPLETECALLED;
dispatch=!_expired; dispatch=!_expired;
break; break;
default: default:
throw new IllegalStateException(this.getStatusString()); throw new IllegalStateException(this.getStatusString());
} }
} }
if (dispatch) if (dispatch)
{ {
cancelTimeout(); cancelTimeout();
@ -597,7 +597,7 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException
{ {
try try
{ {
@ -628,14 +628,14 @@ public class HttpChannelState implements AsyncContext, Continuation
cListeners=_continuationListeners; cListeners=_continuationListeners;
aListeners=_asyncListeners; aListeners=_asyncListeners;
break; break;
default: default:
cListeners=null; cListeners=null;
aListeners=null; aListeners=null;
throw new IllegalStateException(this.getStatusString()); throw new IllegalStateException(this.getStatusString());
} }
} }
if (aListeners!=null) if (aListeners!=null)
{ {
for (AsyncListener listener : aListeners) for (AsyncListener listener : aListeners)
@ -696,8 +696,8 @@ public class HttpChannelState implements AsyncContext, Continuation
if (_event!=null) if (_event!=null)
_event._cause=null; _event._cause=null;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void cancel() public void cancel()
{ {
@ -717,12 +717,9 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void scheduleTimeout() protected void scheduleTimeout()
{ {
Timer timer = _channel.getTimer(); ScheduledExecutorService scheduler = _channel.getScheduler();
if (timer!=null) if (scheduler!=null)
{ _event._timeout=scheduler.schedule(new AsyncTimeout(),_timeoutMs,TimeUnit.MILLISECONDS);
_event._timeout= new AsyncTimeout();
timer.schedule(_event._timeout,_timeoutMs);
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -731,9 +728,9 @@ public class HttpChannelState implements AsyncContext, Continuation
AsyncEventState event=_event; AsyncEventState event=_event;
if (event!=null) if (event!=null)
{ {
TimerTask task=event._timeout; Future<?> task=event._timeout;
if (task!=null) if (task!=null)
task.cancel(); task.cancel(false);
} }
} }
@ -745,7 +742,7 @@ public class HttpChannelState implements AsyncContext, Continuation
return _state==State.COMPLETECALLED; return _state==State.COMPLETECALLED;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
boolean isCompleting() boolean isCompleting()
{ {
@ -753,8 +750,8 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
return _state==State.COMPLETING; return _state==State.COMPLETING;
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public boolean isCompleted() public boolean isCompleted()
{ {
@ -826,7 +823,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
return _channel.getRequest(); return _channel.getRequest();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public ServletRequest getRequest() public ServletRequest getRequest()
@ -917,7 +914,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
dispatch(); dispatch();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -936,7 +933,7 @@ public class HttpChannelState implements AsyncContext, Continuation
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @see Continuation#suspend() * @see Continuation#suspend()
@ -946,7 +943,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
_continuation=true; _continuation=true;
_responseWrapped=!(response instanceof Response); _responseWrapped=!(response instanceof Response);
doSuspend(_channel.getRequest().getServletContext(),_channel.getRequest(),response); doSuspend(_channel.getRequest().getServletContext(),_channel.getRequest(),response);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -958,7 +955,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{ {
_responseWrapped=false; _responseWrapped=false;
_continuation=true; _continuation=true;
doSuspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse()); doSuspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse());
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -1022,7 +1019,7 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public class AsyncTimeout extends TimerTask public class AsyncTimeout implements Runnable
{ {
@Override @Override
public void run() public void run()
@ -1035,25 +1032,25 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public class AsyncEventState extends AsyncEvent public class AsyncEventState extends AsyncEvent
{ {
private TimerTask _timeout; private Future<?> _timeout;
private final ServletContext _suspendedContext; private final ServletContext _suspendedContext;
private ServletContext _dispatchContext; private ServletContext _dispatchContext;
private String _pathInContext; private String _pathInContext;
private Throwable _cause; private Throwable _cause;
public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response) public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
{ {
super(HttpChannelState.this, request,response); super(HttpChannelState.this, request,response);
_suspendedContext=context; _suspendedContext=context;
// Get the base request So we can remember the initial paths // Get the base request So we can remember the initial paths
Request r=_channel.getRequest(); Request r=_channel.getRequest();
// If we haven't been async dispatched before // If we haven't been async dispatched before
if (r.getAttribute(AsyncContext.ASYNC_REQUEST_URI)==null) if (r.getAttribute(AsyncContext.ASYNC_REQUEST_URI)==null)
{ {
// We are setting these attributes during startAsync, when the spec implies that // We are setting these attributes during startAsync, when the spec implies that
// they are only available after a call to AsyncContext.dispatch(...); // they are only available after a call to AsyncContext.dispatch(...);
// have we been forwarded before? // have we been forwarded before?
String uri=(String)r.getAttribute(Dispatcher.FORWARD_REQUEST_URI); String uri=(String)r.getAttribute(Dispatcher.FORWARD_REQUEST_URI);
if (uri!=null) if (uri!=null)
@ -1074,22 +1071,22 @@ public class HttpChannelState implements AsyncContext, Continuation
} }
} }
} }
public ServletContext getSuspendedContext() public ServletContext getSuspendedContext()
{ {
return _suspendedContext; return _suspendedContext;
} }
public ServletContext getDispatchContext() public ServletContext getDispatchContext()
{ {
return _dispatchContext; return _dispatchContext;
} }
public ServletContext getServletContext() public ServletContext getServletContext()
{ {
return _dispatchContext==null?_suspendedContext:_dispatchContext; return _dispatchContext==null?_suspendedContext:_dispatchContext;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return The path in the context * @return The path in the context
@ -1099,11 +1096,11 @@ public class HttpChannelState implements AsyncContext, Continuation
return _pathInContext; return _pathInContext;
} }
} }
private final Runnable _handleRequest = new Runnable() private final Runnable _handleRequest = new Runnable()
{ {
@Override @Override
public void run() public void run()
{ {
_channel.handle(); _channel.handle();
} }

View File

@ -15,9 +15,9 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.http.HttpGenerator; import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpGenerator.Action; import org.eclipse.jetty.http.HttpGenerator.Action;
@ -41,7 +41,7 @@ public class HttpConnection extends AbstractAsyncConnection
{ {
public static final Logger LOG = Log.getLogger(HttpConnection.class); public static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<HttpConnection>(); private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
public static final String UPGRADE_CONNECTION_ATTR = "org.eclispe.jetty.server.HttpConnection.UPGRADE"; public static final String UPGRADE_CONNECTION_ATTR = "org.eclispe.jetty.server.HttpConnection.UPGRADE";
@ -76,17 +76,12 @@ public class HttpConnection extends AbstractAsyncConnection
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Constructor
*
*/
public HttpConnection(HttpConnector connector, AsyncEndPoint endpoint, Server server) public HttpConnection(HttpConnector connector, AsyncEndPoint endpoint, Server server)
{ {
super(endpoint,connector.findExecutor()); super(endpoint,connector.findExecutor());
_connector = connector; _connector = connector;
_bufferPool=_connector.getByteBufferPool(); _bufferPool=_connector.getByteBufferPool();
if (_bufferPool==null)
new Throwable().printStackTrace();
_server = server; _server = server;
@ -696,9 +691,9 @@ public class HttpConnection extends AbstractAsyncConnection
} }
@Override @Override
public Timer getTimer() public ScheduledExecutorService getScheduler()
{ {
return _connector.getTimer(); return _connector.getScheduler();
} }
@Override @Override
@ -747,7 +742,7 @@ public class HttpConnection extends AbstractAsyncConnection
return fcb; return fcb;
} }
}; }
private class HttpHttpInput extends HttpInput private class HttpHttpInput extends HttpInput
{ {

View File

@ -2,8 +2,9 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Timer; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import javax.servlet.ServletRequest; import javax.servlet.ServletRequest;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -11,7 +12,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
public abstract class HttpConnector extends AbstractConnector public abstract class HttpConnector extends AbstractNetConnector
{ {
private String _integralScheme = HttpScheme.HTTPS.asString(); private String _integralScheme = HttpScheme.HTTPS.asString();
private int _integralPort = 0; private int _integralPort = 0;
@ -19,52 +20,65 @@ public abstract class HttpConnector extends AbstractConnector
private int _confidentialPort = 0; private int _confidentialPort = 0;
private boolean _forwarded; private boolean _forwarded;
private String _hostHeader; private String _hostHeader;
private Timer _timer = new Timer(true); private ScheduledExecutorService _scheduler;
private boolean _shutdownScheduler;
private String _forwardedHostHeader = HttpHeader.X_FORWARDED_HOST.toString(); private String _forwardedHostHeader = HttpHeader.X_FORWARDED_HOST.toString();
private String _forwardedServerHeader = HttpHeader.X_FORWARDED_SERVER.toString(); private String _forwardedServerHeader = HttpHeader.X_FORWARDED_SERVER.toString();
private String _forwardedForHeader = HttpHeader.X_FORWARDED_FOR.toString(); private String _forwardedForHeader = HttpHeader.X_FORWARDED_FOR.toString();
private String _forwardedProtoHeader = HttpHeader.X_FORWARDED_PROTO.toString(); private String _forwardedProtoHeader = HttpHeader.X_FORWARDED_PROTO.toString();
private String _forwardedCipherSuiteHeader; private String _forwardedCipherSuiteHeader;
private String _forwardedSslSessionIdHeader; private String _forwardedSslSessionIdHeader;
private int _requestHeaderSize=6*1024;
private int _requestHeaderSize=6*1024;;
private int _requestBufferSize=16*1024; private int _requestBufferSize=16*1024;
private int _responseHeaderSize=6*1024; private int _responseHeaderSize=6*1024;
private int _responseBufferSize=16*1024; private int _responseBufferSize=16*1024;
public HttpConnector() public HttpConnector()
{ {
super();
} }
public HttpConnector(int acceptors) public HttpConnector(int acceptors)
{ {
super(acceptors); super(acceptors);
} }
public ScheduledExecutorService getScheduler()
{
return _scheduler;
}
public void setScheduler(ScheduledExecutorService scheduler)
{
_scheduler = scheduler;
}
@Override @Override
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
super.doStart(); super.doStart();
_timer=new Timer("Timer-"+getName(),true); if (_scheduler == null)
{
_scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
{
@Override
public Thread newThread(Runnable r)
{
return new Thread(r, "Timer-" + HttpConnector.this.getName());
}
});
_shutdownScheduler = true;
}
} }
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
if (_timer!=null) if (_shutdownScheduler)
_timer.cancel(); _scheduler.shutdownNow();
_timer=null; _scheduler = null;
super.doStop(); super.doStop();
} }
public Timer getTimer()
{
return _timer;
}
public int getRequestHeaderSize() public int getRequestHeaderSize()
{ {
return _requestHeaderSize; return _requestHeaderSize;
@ -307,12 +321,11 @@ public abstract class HttpConnector extends AbstractConnector
* Set reverse proxy handling. If set to true, then the X-Forwarded headers (or the headers set in their place) are looked for to set the request protocol, * Set reverse proxy handling. If set to true, then the X-Forwarded headers (or the headers set in their place) are looked for to set the request protocol,
* host, server and client ip. * host, server and client ip.
* *
* @param check * @param check true if this connector is checking the x-forwarded-for/host/server headers
* true if this connector is checking the x-forwarded-for/host/server headers * @see #setForwardedForHeader(String)
* @set {@link #setForwardedForHeader(String)} * @see #setForwardedHostHeader(String)
* @set {@link #setForwardedHostHeader(String)} * @see #setForwardedProtoHeader(String)
* @set {@link #setForwardedProtoHeader(String)} * @see #setForwardedServerHeader(String)
* @set {@link #setForwardedServerHeader(String)}
*/ */
public void setForwarded(boolean check) public void setForwarded(boolean check)
{ {
@ -384,6 +397,7 @@ public abstract class HttpConnector extends AbstractConnector
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return the forwarded for header
* @see #setForwarded(boolean) * @see #setForwarded(boolean)
*/ */
public String getForwardedForHeader() public String getForwardedForHeader()
@ -464,6 +478,4 @@ public abstract class HttpConnector extends AbstractConnector
{ {
_forwardedSslSessionIdHeader = forwardedSslSessionId; _forwardedSslSessionIdHeader = forwardedSslSessionId;
} }
} }

View File

@ -15,14 +15,11 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AsyncByteArrayEndPoint; import org.eclipse.jetty.io.AsyncByteArrayEndPoint;
@ -34,9 +31,9 @@ import org.eclipse.jetty.util.log.Logger;
public class LocalHttpConnector extends HttpConnector public class LocalHttpConnector extends HttpConnector
{ {
private static final Logger LOG = Log.getLogger(LocalHttpConnector.class); private static final Logger LOG = Log.getLogger(LocalHttpConnector.class);
private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<LocalEndPoint>();
private ScheduledExecutorService _timer; private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
private LocalExecutor _executor; private volatile LocalExecutor _executor;
public LocalHttpConnector() public LocalHttpConnector()
{ {
@ -49,12 +46,12 @@ public class LocalHttpConnector extends HttpConnector
return this; return this;
} }
/** Sends requests and get's responses based on thread activity. /** Sends requests and get responses based on thread activity.
* Returns all the responses received once the thread activity has * Returns all the responses received once the thread activity has
* returned to the level it was before the requests. * returned to the level it was before the requests.
* @param requests * @param requests the requests
* @return * @return the responses
* @throws Exception * @throws Exception if the requests fail
*/ */
public String getResponses(String requests) throws Exception public String getResponses(String requests) throws Exception
{ {
@ -65,6 +62,9 @@ public class LocalHttpConnector extends HttpConnector
/** Sends requests and get's responses based on thread activity. /** Sends requests and get's responses based on thread activity.
* Returns all the responses received once the thread activity has * Returns all the responses received once the thread activity has
* returned to the level it was before the requests. * returned to the level it was before the requests.
* @param requestsBuffer the requests
* @return the responses
* @throws Exception if the requests fail
*/ */
public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
{ {
@ -81,8 +81,8 @@ public class LocalHttpConnector extends HttpConnector
/** /**
* Execute a request and return the EndPoint through which * Execute a request and return the EndPoint through which
* responses can be received. * responses can be received.
* @param rawRequest * @param rawRequest the request
* @return * @return the local endpoint
*/ */
public LocalEndPoint executeRequest(String rawRequest) public LocalEndPoint executeRequest(String rawRequest)
{ {
@ -111,7 +111,6 @@ public class LocalHttpConnector extends HttpConnector
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
super.doStart(); super.doStart();
_timer=new ScheduledThreadPoolExecutor(1);
_executor=new LocalExecutor(findExecutor()); _executor=new LocalExecutor(findExecutor());
} }
@ -119,7 +118,6 @@ public class LocalHttpConnector extends HttpConnector
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
super.doStop(); super.doStop();
_timer.shutdownNow();
_executor=null; _executor=null;
} }
@ -175,9 +173,8 @@ public class LocalHttpConnector extends HttpConnector
public LocalEndPoint() public LocalEndPoint()
{ {
super(_timer); super(getScheduler(), LocalHttpConnector.this.getIdleTimeout());
setGrowOutput(true); setGrowOutput(true);
setIdleTimeout(LocalHttpConnector.this.getIdleTimeout());
} }
public void addInput(String s) public void addInput(String s)

View File

@ -50,8 +50,6 @@ import org.eclipse.jetty.server.Connector.NetConnector;
* thus if possible it should be read after the continuation or saved as a request attribute or as the * thus if possible it should be read after the continuation or saved as a request attribute or as the
* associated object of the Continuation instance. * associated object of the Continuation instance.
* </p> * </p>
*
* @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
*/ */
public class SelectChannelConnector extends HttpConnector implements NetConnector public class SelectChannelConnector extends HttpConnector implements NetConnector
{ {
@ -96,22 +94,15 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto
} }
@Override @Override
public void close() public void close() throws IOException
{ {
synchronized(this) synchronized(this)
{ {
if (_acceptChannel != null) if (_acceptChannel != null)
{ {
removeBean(_acceptChannel); removeBean(_acceptChannel);
try if (_acceptChannel.isOpen())
{ _acceptChannel.close();
if (_acceptChannel.isOpen())
_acceptChannel.close();
}
catch(IOException e)
{
LOG.warn(e);
}
} }
_acceptChannel = null; _acceptChannel = null;
_localPort=-2; _localPort=-2;
@ -182,7 +173,7 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{ {
return new SelectChannelEndPoint(channel,selectSet,key, this._idleTimeout); return new SelectChannelEndPoint(channel,selectSet,key, getScheduler(), getIdleTimeout());
} }
protected void endPointClosed(AsyncEndPoint endpoint) protected void endPointClosed(AsyncEndPoint endpoint)

View File

@ -55,7 +55,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector
@Override @Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key) throws IOException
{ {
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getIdleTimeout(), listeners); NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners);
endPoint.notifyOpened(); endPoint.notifyOpened();
return endPoint; return endPoint;
} }

View File

@ -12,11 +12,9 @@ package org.eclipse.jetty.server;
//You may elect to redistribute this code under either of these licenses. //You may elect to redistribute this code under either of these licenses.
//======================================================================== //========================================================================
import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Timer; import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo; import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -26,6 +24,8 @@ import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class HttpWriterTest public class HttpWriterTest
{ {
private HttpWriter _writer; private HttpWriter _writer;
@ -94,13 +94,13 @@ public class HttpWriterTest
} }
@Override @Override
public Timer getTimer() public ScheduledExecutorService getScheduler()
{ {
return null; return null;
} }
}; };
HttpOutput httpOut = new HttpOutput(channel); HttpOutput httpOut = new HttpOutput(channel);
_writer = new HttpWriter(httpOut); _writer = new HttpWriter(httpOut);
} }
@ -120,7 +120,7 @@ public class HttpWriterTest
_writer.write("How now \uFF22rown cow"); _writer.write("How now \uFF22rown cow");
assertArrayEquals("How now \uFF22rown cow".getBytes(StringUtil.__UTF8),BufferUtil.toArray(_bytes)); assertArrayEquals("How now \uFF22rown cow".getBytes(StringUtil.__UTF8),BufferUtil.toArray(_bytes));
} }
@Test @Test
public void testNotCESU8() throws Exception public void testNotCESU8() throws Exception
{ {
@ -130,11 +130,11 @@ public class HttpWriterTest
assertEquals("787878F0909080787878",TypeUtil.toHexString(BufferUtil.toArray(_bytes))); assertEquals("787878F0909080787878",TypeUtil.toHexString(BufferUtil.toArray(_bytes)));
assertArrayEquals(data.getBytes(StringUtil.__UTF8),BufferUtil.toArray(_bytes)); assertArrayEquals(data.getBytes(StringUtil.__UTF8),BufferUtil.toArray(_bytes));
assertEquals(3+4+3,_bytes.remaining()); assertEquals(3+4+3,_bytes.remaining());
Utf8StringBuilder buf = new Utf8StringBuilder(); Utf8StringBuilder buf = new Utf8StringBuilder();
buf.append(BufferUtil.toArray(_bytes),0,_bytes.remaining()); buf.append(BufferUtil.toArray(_bytes),0,_bytes.remaining());
assertEquals(data,buf.toString()); assertEquals(data,buf.toString());
} }
@Test @Test
@ -203,7 +203,7 @@ public class HttpWriterTest
final String singleByteStr = "a"; final String singleByteStr = "a";
int remainSize = 1; int remainSize = 1;
final String multiByteDuplicateStr = "\uD842\uDF9F"; final String multiByteDuplicateStr = "\uD842\uDF9F";
int adjustSize = -1; int adjustSize = -1;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -233,7 +233,7 @@ public class HttpWriterTest
assertArrayEquals(bytes,BufferUtil.toArray(_bytes)); assertArrayEquals(bytes,BufferUtil.toArray(_bytes));
assertArrayEquals(baos.toByteArray(),BufferUtil.toArray(_bytes)); assertArrayEquals(baos.toByteArray(),BufferUtil.toArray(_bytes));
} }
@Test @Test
public void testMultiByteOverflowUTF16x2_2() throws Exception public void testMultiByteOverflowUTF16x2_2() throws Exception
{ {
@ -241,8 +241,8 @@ public class HttpWriterTest
final String singleByteStr = "a"; final String singleByteStr = "a";
int remainSize = 1; int remainSize = 1;
final String multiByteDuplicateStr = "\uD842\uDF9F"; final String multiByteDuplicateStr = "\uD842\uDF9F";
int adjustSize = -2; int adjustSize = -2;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (int i = 0; i < HttpWriter.MAX_OUTPUT_CHARS + adjustSize; i++) for (int i = 0; i < HttpWriter.MAX_OUTPUT_CHARS + adjustSize; i++)

View File

@ -13,11 +13,6 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.LineNumberReader; import java.io.LineNumberReader;
@ -26,11 +21,9 @@ import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.Locale; import java.util.Locale;
import java.util.Timer;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.Cookie; import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -52,6 +45,11 @@ import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** /**
* *
*/ */
@ -71,8 +69,8 @@ public class ResponseTest
_server.setHandler(new DumpHandler()); _server.setHandler(new DumpHandler());
_server.start(); _server.start();
_timer=new ScheduledThreadPoolExecutor(1); _timer=new ScheduledThreadPoolExecutor(1);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,5000);
HttpInput input = new HttpInput(); HttpInput input = new HttpInput();
AsyncConnection connection = new AbstractAsyncConnection(endp,new Executor() AsyncConnection connection = new AbstractAsyncConnection(endp,new Executor()
{ {
@ -114,7 +112,7 @@ public class ResponseTest
} }
@Override @Override
public Timer getTimer() public ScheduledExecutorService getScheduler()
{ {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;

View File

@ -13,9 +13,6 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -23,7 +20,6 @@ import java.io.PrintWriter;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -40,12 +36,15 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SelectChannelStatisticsTest public class SelectChannelStatisticsTest
{ {
private static final Logger LOG = Log.getLogger(SelectChannelStatisticsTest.class); private static final Logger LOG = Log.getLogger(SelectChannelStatisticsTest.class);
private static Server _server; private static Server _server;
private static AbstractConnector _connector; private static AbstractNetConnector _connector;
private static CyclicBarrier _connect; private static CyclicBarrier _connect;
private static CountDownLatch _closed; private static CountDownLatch _closed;
@ -132,7 +131,7 @@ public class SelectChannelStatisticsTest
{ {
_connector.getStatistics().start(); _connector.getStatistics().start();
} }
@After @After
public void tini() throws Exception public void tini() throws Exception
{ {

View File

@ -1,4 +1,16 @@
package org.eclipse.jetty.util.annotation; package org.eclipse.jetty.util.annotation;
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
import java.lang.annotation.Documented; import java.lang.annotation.Documented;
import java.lang.annotation.ElementType; import java.lang.annotation.ElementType;

View File

@ -41,53 +41,55 @@ import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
public class WebSocketClientFactory extends AggregateLifeCycle public class WebSocketClientFactory extends AggregateLifeCycle
{ {
private static final Logger LOG = Log.getLogger(WebSocketClientFactory.class); private static final Logger LOG = Log.getLogger(WebSocketClientFactory.class);
/**
* Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler.
*/
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<>(); private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new StandardByteBufferPool(); private final ByteBufferPool bufferPool = new StandardByteBufferPool();
private final Executor executor; private final Executor executor;
private final ScheduledExecutorService scheduler;
private final WebSocketClientSelectorManager selector; private final WebSocketClientSelectorManager selector;
private final EventMethodsCache methodsCache; private final EventMethodsCache methodsCache;
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
public WebSocketClientFactory() public WebSocketClientFactory()
{ {
this(new QueuedThreadPool(),null); this(new QueuedThreadPool());
} }
public WebSocketClientFactory(Executor threadPool) public WebSocketClientFactory(Executor threadPool)
{ {
this(threadPool,null); this(threadPool, Executors.newSingleThreadScheduledExecutor());
}
public WebSocketClientFactory(Executor executor, SslContextFactory sslContextFactory)
{
if (executor == null)
{
throw new IllegalArgumentException("Executor is required");
}
this.executor = executor;
addBean(executor);
if (sslContextFactory != null)
{
addBean(sslContextFactory);
}
this.policy = WebSocketPolicy.newClientPolicy();
selector = new WebSocketClientSelectorManager(bufferPool,executor,policy);
selector.setSslContextFactory(sslContextFactory);
addBean(selector);
this.methodsCache = new EventMethodsCache();
} }
public WebSocketClientFactory(SslContextFactory sslContextFactory) public WebSocketClientFactory(SslContextFactory sslContextFactory)
{ {
this(null,sslContextFactory); this(new QueuedThreadPool(), Executors.newSingleThreadScheduledExecutor(), sslContextFactory);
}
public WebSocketClientFactory(Executor threadPool, ScheduledExecutorService scheduler)
{
this(threadPool, scheduler, null);
}
public WebSocketClientFactory(Executor executor, ScheduledExecutorService scheduler, SslContextFactory sslContextFactory)
{
if (executor == null)
throw new IllegalArgumentException("Executor is required");
this.executor = executor;
addBean(executor);
if (scheduler == null)
throw new IllegalArgumentException("Scheduler is required");
this.scheduler = scheduler;
if (sslContextFactory != null)
addBean(sslContextFactory);
this.policy = WebSocketPolicy.newClientPolicy();
selector = new WebSocketClientSelectorManager(bufferPool, executor, scheduler, policy);
selector.setSslContextFactory(sslContextFactory);
addBean(selector);
this.methodsCache = new EventMethodsCache();
} }
private void closeConnections() private void closeConnections()
@ -150,6 +152,6 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public WebSocketEventDriver newWebSocketDriver(Object websocketPojo) public WebSocketEventDriver newWebSocketDriver(Object websocketPojo)
{ {
return new WebSocketEventDriver(websocketPojo,methodsCache,policy,getBufferPool()); return new WebSocketEventDriver(websocketPojo, methodsCache, policy, getBufferPool());
} }
} }

View File

@ -39,16 +39,18 @@ import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
public class WebSocketClientSelectorManager extends SelectorManager public class WebSocketClientSelectorManager extends SelectorManager
{ {
private SslContextFactory sslContextFactory;
private final Executor executor; private final Executor executor;
private final ScheduledExecutorService scheduler;
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private SslContextFactory sslContextFactory;
public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor, WebSocketPolicy policy) public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy)
{ {
super(); super();
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.executor = executor; this.executor = executor;
this.scheduler = scheduler;
this.policy = policy; this.policy = policy;
} }
@ -135,7 +137,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
@Override @Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{ {
return new SelectChannelEndPoint(channel,selectSet, selectionKey, policy.getIdleTimeout()); return new SelectChannelEndPoint(channel,selectSet, selectionKey, scheduler, policy.getIdleTimeout());
} }
public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel) public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)

View File

@ -57,30 +57,30 @@
</Item> </Item>
<Item> <Item>
<New id="vcontexts" class="org.eclipse.jetty.server.handler.ContextHandler"> <New id="vcontexts" class="org.eclipse.jetty.server.handler.ContextHandler">
<Set name="contextPath">/tests</Set> <Set name="contextPath">/tests</Set>
<Set name="VirtualHosts"> <Set name="VirtualHosts">
<Array type="java.lang.String"> <Array type="java.lang.String">
<Item>VirtualHost</Item> <Item>VirtualHost</Item>
</Array> </Array>
</Set> </Set>
<Set name="ResourceBase"><Property name="test.docroot.base"/>/virtualhost</Set> <Set name="ResourceBase"><Property name="test.docroot.base"/>/virtualhost</Set>
<Set name="Handler"><New id="reshandler" class="org.eclipse.jetty.server.handler.ResourceHandler"/></Set> <Set name="Handler"><New id="reshandler" class="org.eclipse.jetty.server.handler.ResourceHandler"/></Set>
<Set name="DisplayName">virtual</Set> <Set name="DisplayName">virtual</Set>
</New> </New>
</Item> </Item>
<Item> <Item>
<New id="defcontext" class="org.eclipse.jetty.server.handler.ContextHandler"> <New id="defcontext" class="org.eclipse.jetty.server.handler.ContextHandler">
<Set name="contextPath">/tests</Set> <Set name="contextPath">/tests</Set>
<Set name="ResourceBase"><Property name="test.docroot.base"/>/default</Set> <Set name="ResourceBase"><Property name="test.docroot.base"/>/default</Set>
<Set name="Handler"><New id="reshandler" class="org.eclipse.jetty.server.handler.ResourceHandler"/></Set> <Set name="Handler"><New id="reshandler" class="org.eclipse.jetty.server.handler.ResourceHandler"/></Set>
<Set name="DisplayName">default</Set> <Set name="DisplayName">default</Set>
</New> </New>
</Item> </Item>
<Item> <Item>
<New id="echocontext" class="org.eclipse.jetty.server.handler.ContextHandler"> <New id="echocontext" class="org.eclipse.jetty.server.handler.ContextHandler">
<Set name="contextPath">/echo</Set> <Set name="contextPath">/echo</Set>
<Set name="Handler"><New id="echohandler" class="org.eclipse.jetty.test.support.EchoHandler"/></Set> <Set name="Handler"><New id="echohandler" class="org.eclipse.jetty.test.support.EchoHandler"/></Set>
<Set name="DisplayName">echo</Set> <Set name="DisplayName">echo</Set>
</New> </New>
</Item> </Item>
</Array> </Array>
@ -88,20 +88,20 @@
</New> </New>
</Set> </Set>
<Call name="addLifeCycle"> <Call name="addBean">
<Arg> <Arg>
<New class="org.eclipse.jetty.deploy.ContextDeployer"> <New class="org.eclipse.jetty.deploy.ContextDeployer">
<Set name="contexts"><Ref id="WebappContexts"/></Set> <Set name="contexts"><Ref id="WebappContexts"/></Set>
<Set name="configurationDir"><Property name="test.resourcesdir" default="src/test/resources"/>/webapp-contexts/RFC2616</Set> <Set name="configurationDir"><Property name="test.resourcesdir" default="src/test/resources"/>/webapp-contexts/RFC2616</Set>
<Set name="scanInterval">0</Set> <Set name="scanInterval">0</Set>
<Set name="configurationManager"> <Set name="configurationManager">
<New class="org.eclipse.jetty.deploy.FileConfigurationManager"> <New class="org.eclipse.jetty.deploy.FileConfigurationManager">
<Set name="file"><Property name="test.targetdir" default="target"/>/testable-jetty-server-config.properties</Set> <Set name="file"><Property name="test.targetdir" default="target"/>/testable-jetty-server-config.properties</Set>
</New> </New>
</Set> </Set>
</New> </New>
</Arg> </Arg>
</Call> </Call>
<!-- =========================================================== --> <!-- =========================================================== -->
<!-- Configure the webapp deployer. --> <!-- Configure the webapp deployer. -->
@ -122,9 +122,9 @@
<New class="org.eclipse.jetty.deploy.WebAppDeployer"> <New class="org.eclipse.jetty.deploy.WebAppDeployer">
<Set name="contexts"><Ref id="WebappContexts"/></Set> <Set name="contexts"><Ref id="WebappContexts"/></Set>
<Set name="webAppDir"><Property name="test.targetdir" default="target"/>/webapps</Set> <Set name="webAppDir"><Property name="test.targetdir" default="target"/>/webapps</Set>
<Set name="parentLoaderPriority">false</Set> <Set name="parentLoaderPriority">false</Set>
<Set name="extract">true</Set> <Set name="extract">true</Set>
<Set name="allowDuplicates">false</Set> <Set name="allowDuplicates">false</Set>
<Set name="defaultsDescriptor"><Property name="test.resourcesdir" default="src/test/resources"/>/webdefault.xml</Set> <Set name="defaultsDescriptor"><Property name="test.resourcesdir" default="src/test/resources"/>/webdefault.xml</Set>
<Call name="setAttribute"> <Call name="setAttribute">
<Arg>org.eclipse.jetty.server.webapp.ContainerIncludeJarPattern</Arg> <Arg>org.eclipse.jetty.server.webapp.ContainerIncludeJarPattern</Arg>