jetty-9 many name and javadoc cleanups
This commit is contained in:
parent
792509d459
commit
903e4cd0f2
|
@ -9,6 +9,15 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** A convenience base implementation of {@link AsyncConnection}.
|
||||
* <p>
|
||||
* This class uses the capabilities of the {@link AsyncEndPoint} API to provide a
|
||||
* more traditional style of async reading. A call to {@link #readInterested()}
|
||||
* will schedule a callback to {@link #onReadable()} or {@link #onReadFail(Throwable)}
|
||||
* as appropriate.
|
||||
*/
|
||||
public abstract class AbstractAsyncConnection implements AsyncConnection
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(AbstractAsyncConnection.class);
|
||||
|
@ -59,10 +68,34 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
|
|||
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Call to register read interest.
|
||||
* After this call, {@link #onReadable()} or {@link #onReadFail(Throwable)}
|
||||
* will be called back as appropriate.
|
||||
*/
|
||||
public void readInterested()
|
||||
{
|
||||
if (_readInterested.compareAndSet(false,true))
|
||||
getEndPoint().readable(null,_readCallback);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public abstract void onReadable();
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void onReadFail(Throwable cause)
|
||||
{
|
||||
LOG.debug("{} onReadFailed {}",this,cause);
|
||||
if (_endp.isOpen())
|
||||
{
|
||||
if (_endp.isOutputShutdown())
|
||||
_endp.close();
|
||||
else
|
||||
_endp.shutdownOutput();
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public void onOpen()
|
||||
|
@ -85,26 +118,6 @@ public abstract class AbstractAsyncConnection implements AsyncConnection
|
|||
return _endp;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void scheduleOnReadable()
|
||||
{
|
||||
if (_readInterested.compareAndSet(false,true))
|
||||
getEndPoint().readable(null,_readCallback);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void onReadFail(Throwable cause)
|
||||
{
|
||||
LOG.debug("{} onReadFailed {}",this,cause);
|
||||
if (_endp.isOpen())
|
||||
{
|
||||
if (_endp.isOutputShutdown())
|
||||
_endp.close();
|
||||
else
|
||||
_endp.shutdownOutput();
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public String toString()
|
||||
|
|
|
@ -25,7 +25,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
|
|||
private final ReadInterest _readInterest = new ReadInterest()
|
||||
{
|
||||
@Override
|
||||
protected boolean readInterested() throws IOException
|
||||
protected boolean registerReadInterest() throws IOException
|
||||
{
|
||||
if (_closed)
|
||||
throw new ClosedChannelException();
|
||||
|
@ -36,7 +36,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
|
|||
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
|
||||
{
|
||||
@Override
|
||||
protected boolean canFlush()
|
||||
protected boolean registerFlushInterest()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
|
|||
@Override
|
||||
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
|
||||
{
|
||||
_readInterest.registerInterest(context,callback);
|
||||
_readInterest.register(context,callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -11,7 +11,8 @@ import org.eclipse.jetty.util.Callback;
|
|||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* A Utility class to help implement {@link AsyncEndPoint#readable(Object, Callback)}
|
||||
* by keeping and calling the context and callback objects.
|
||||
* by keeping state and calling the context and callback objects.
|
||||
*
|
||||
*/
|
||||
public abstract class ReadInterest
|
||||
{
|
||||
|
@ -26,13 +27,13 @@ public abstract class ReadInterest
|
|||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Call to register interest in a callback when a read is possible.
|
||||
* The callback will be called either immediately if {@link #readInterested()}
|
||||
* The callback will be called either immediately if {@link #registerReadInterest()}
|
||||
* returns true or eventually once {@link #readable()} is called.
|
||||
* @param context
|
||||
* @param callback
|
||||
* @throws ReadPendingException
|
||||
*/
|
||||
public <C> void registerInterest(C context, Callback<C> callback) throws ReadPendingException
|
||||
public <C> void register(C context, Callback<C> callback) throws ReadPendingException
|
||||
{
|
||||
if (!_interested.compareAndSet(false,true))
|
||||
throw new ReadPendingException();
|
||||
|
@ -40,7 +41,7 @@ public abstract class ReadInterest
|
|||
_callback=callback;
|
||||
try
|
||||
{
|
||||
if (readInterested())
|
||||
if (registerReadInterest())
|
||||
readable();
|
||||
}
|
||||
catch(IOException e)
|
||||
|
@ -108,13 +109,14 @@ public abstract class ReadInterest
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Is a read interest satisfied now?
|
||||
/** Register the read interest
|
||||
* Abstract method to be implemented by the Specific ReadInterest to
|
||||
* enquire if a read is immediately possible
|
||||
* enquire if a read is immediately possible and if not to schedule a future
|
||||
* call to {@link #readable()} or {@link #failed(Throwable)}
|
||||
* @return true if a read is possible
|
||||
* @throws IOException
|
||||
*/
|
||||
abstract protected boolean readInterested() throws IOException;
|
||||
abstract protected boolean registerReadInterest() throws IOException;
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -27,13 +27,13 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* An Endpoint that can be scheduled by {@link SelectorManager}.
|
||||
* An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
|
||||
*/
|
||||
public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, SelectorManager.SelectableAsyncEndPoint
|
||||
public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, SelectorManager.Selectable
|
||||
{
|
||||
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
|
||||
|
||||
private final SelectorManager.ManagedSelector _selectSet;
|
||||
private final SelectorManager.ManagedSelector _selector;
|
||||
private final SelectorManager _manager;
|
||||
|
||||
private SelectionKey _key;
|
||||
|
@ -44,26 +44,28 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
/** The desired value for {@link SelectionKey#interestOps()} */
|
||||
private int _interestOps;
|
||||
|
||||
/** true if {@link ManagedSelector#destroyEndPoint(SelectorManager.SelectableAsyncEndPoint)} has not been called */
|
||||
/** true if {@link ManagedSelector#destroyEndPoint(SelectorManager.Selectable)} has not been called */
|
||||
private boolean _open;
|
||||
|
||||
private volatile AsyncConnection _connection;
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
private final ReadInterest _readInterest = new ReadInterest()
|
||||
{
|
||||
@Override
|
||||
protected boolean readInterested()
|
||||
protected boolean registerReadInterest()
|
||||
{
|
||||
_interestOps=_interestOps | SelectionKey.OP_READ;
|
||||
updateKey();
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
|
||||
{
|
||||
@Override
|
||||
protected boolean canFlush()
|
||||
protected boolean registerFlushInterest()
|
||||
{
|
||||
_interestOps = _interestOps | SelectionKey.OP_WRITE;
|
||||
updateKey();
|
||||
|
@ -76,13 +78,27 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
{
|
||||
super(channel);
|
||||
_manager = selectSet.getManager();
|
||||
_selectSet = selectSet;
|
||||
_selector = selectSet;
|
||||
_open = true;
|
||||
_key = key;
|
||||
|
||||
setMaxIdleTime(maxIdleTime);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
|
||||
{
|
||||
_readInterest.register(context,callback);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
|
||||
{
|
||||
_writeFlusher.write(context,callback,buffers);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public AsyncConnection getAsyncConnection()
|
||||
|
@ -141,7 +157,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
}
|
||||
finally
|
||||
{
|
||||
_selectSet.submit(this);
|
||||
_selector.submit(this);
|
||||
_selected = false;
|
||||
}
|
||||
}
|
||||
|
@ -181,19 +197,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
|
||||
{
|
||||
_readInterest.registerInterest(context,callback);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
|
||||
{
|
||||
_writeFlusher.write(context,callback,buffers);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
|
@ -229,7 +232,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
if (_interestOps != current_ops && !_changing)
|
||||
{
|
||||
_changing = true;
|
||||
_selectSet.submit(this);
|
||||
_selector.submit(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -266,7 +269,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
{
|
||||
try
|
||||
{
|
||||
_key = ((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
|
||||
_key = ((SelectableChannel)getChannel()).register(_selector.getSelector(),_interestOps,this);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
@ -278,7 +281,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
|
||||
if (_open)
|
||||
{
|
||||
_selectSet.destroyEndPoint(this);
|
||||
_selector.destroyEndPoint(this);
|
||||
}
|
||||
_open = false;
|
||||
_key = null;
|
||||
|
@ -306,7 +309,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
if (_open)
|
||||
{
|
||||
_open = false;
|
||||
_selectSet.destroyEndPoint(this);
|
||||
_selector.destroyEndPoint(this);
|
||||
}
|
||||
_key = null;
|
||||
}
|
||||
|
@ -368,7 +371,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
|
|||
/* ------------------------------------------------------------ */
|
||||
public ManagedSelector getSelectSet()
|
||||
{
|
||||
return _selectSet;
|
||||
return _selector;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -168,7 +168,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
* @return the new endpoint {@link SelectChannelEndPoint}
|
||||
* @throws IOException if the endPoint cannot be created
|
||||
*/
|
||||
protected abstract SelectableAsyncEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey sKey) throws IOException;
|
||||
protected abstract Selectable newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey sKey) throws IOException;
|
||||
|
||||
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
|
||||
{
|
||||
|
@ -216,7 +216,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
|
||||
{
|
||||
private final ConcurrentLinkedQueue<Runnable> _changes = new ConcurrentLinkedQueue<>();
|
||||
private ConcurrentMap<SelectableAsyncEndPoint,Object> _endPoints = new ConcurrentHashMap<>();
|
||||
private ConcurrentMap<Selectable,Object> _endPoints = new ConcurrentHashMap<>();
|
||||
private final int _id;
|
||||
private Selector _selector;
|
||||
private Thread _thread;
|
||||
|
@ -384,10 +384,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
try
|
||||
{
|
||||
Object att = key.attachment();
|
||||
if (att instanceof SelectableAsyncEndPoint)
|
||||
if (att instanceof Selectable)
|
||||
{
|
||||
if (key.isReadable() || key.isWritable())
|
||||
((SelectableAsyncEndPoint)att).onSelected();
|
||||
((Selectable)att).onSelected();
|
||||
}
|
||||
else if (key.isConnectable())
|
||||
{
|
||||
|
@ -435,7 +435,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
|
||||
{
|
||||
SelectableAsyncEndPoint endp = newEndPoint(channel, this, sKey);
|
||||
Selectable endp = newEndPoint(channel, this, sKey);
|
||||
_endPoints.put(endp, this);
|
||||
LOG.debug("Created {}", endp);
|
||||
endPointOpened(endp);
|
||||
|
@ -443,7 +443,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
}
|
||||
|
||||
|
||||
public void destroyEndPoint(SelectableAsyncEndPoint endp)
|
||||
public void destroyEndPoint(Selectable endp)
|
||||
{
|
||||
LOG.debug("Destroyed {}", endp);
|
||||
_endPoints.remove(endp);
|
||||
|
@ -521,7 +521,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
private void timeoutCheck()
|
||||
{
|
||||
long now = System.currentTimeMillis();
|
||||
for (SelectableAsyncEndPoint endPoint : _endPoints.keySet())
|
||||
for (Selectable endPoint : _endPoints.keySet())
|
||||
endPoint.checkReadWriteTimeout(now);
|
||||
}
|
||||
|
||||
|
@ -675,7 +675,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
|
||||
// TODO review this interface
|
||||
public interface SelectableAsyncEndPoint extends AsyncEndPoint
|
||||
public interface Selectable extends AsyncEndPoint
|
||||
{
|
||||
void onSelected();
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ import org.eclipse.jetty.util.Callback;
|
|||
/**
|
||||
* A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
|
||||
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
|
||||
* The abstract method {@link #canFlush()} is called when not all content has been
|
||||
* The abstract method {@link #registerFlushInterest()} is called when not all content has been
|
||||
* written after a call to flush and should organise for the {@link #completeWrite()}
|
||||
* method to be called when a subsequent call to flush should be able to make more progress.
|
||||
*
|
||||
|
@ -52,7 +52,7 @@ abstract public class WriteFlusher
|
|||
_buffers=buffers;
|
||||
_context=context;
|
||||
_callback=callback;
|
||||
if(canFlush())
|
||||
if(registerFlushInterest())
|
||||
completeWrite();
|
||||
else
|
||||
_writing.set(true); // Needed as memory barrier
|
||||
|
@ -75,7 +75,13 @@ abstract public class WriteFlusher
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
abstract protected boolean canFlush();
|
||||
/**
|
||||
* Abstract call to be implemented by specific WriteFlushers. Will return true if a
|
||||
* flush is immediately possible, otherwise it will schedule a call to {@link #completeWrite()} or
|
||||
* {@link #failed(Throwable)} when appropriate.
|
||||
* @return true if a flush can proceed.
|
||||
*/
|
||||
abstract protected boolean registerFlushInterest();
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
@ -103,7 +109,7 @@ abstract public class WriteFlusher
|
|||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Complete a write that has not completed and that called
|
||||
* {@link #canFlush()} to request a call to this
|
||||
* {@link #registerFlushInterest()} to request a call to this
|
||||
* method when a call to {@link EndPoint#flush(ByteBuffer...)}
|
||||
* is likely to be able to progress.
|
||||
* @return true if a write was in progress
|
||||
|
@ -125,7 +131,7 @@ abstract public class WriteFlusher
|
|||
{
|
||||
if (b.hasRemaining())
|
||||
{
|
||||
if (canFlush())
|
||||
if (registerFlushInterest())
|
||||
continue retry;
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -229,7 +229,7 @@ public class SslConnection extends AbstractAsyncConnection
|
|||
private final ReadInterest _readInterest = new ReadInterest()
|
||||
{
|
||||
@Override
|
||||
protected boolean readInterested() throws IOException
|
||||
protected boolean registerReadInterest() throws IOException
|
||||
{
|
||||
synchronized (SslEndPoint.this)
|
||||
{
|
||||
|
@ -262,7 +262,7 @@ public class SslConnection extends AbstractAsyncConnection
|
|||
}
|
||||
else
|
||||
// Normal readable callback
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
@ -272,7 +272,7 @@ public class SslConnection extends AbstractAsyncConnection
|
|||
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
|
||||
{
|
||||
@Override
|
||||
protected boolean canFlush()
|
||||
protected boolean registerFlushInterest()
|
||||
{
|
||||
synchronized (SslEndPoint.this)
|
||||
{
|
||||
|
@ -285,7 +285,7 @@ public class SslConnection extends AbstractAsyncConnection
|
|||
}
|
||||
else if (_sslEngine.getHandshakeStatus()==HandshakeStatus.NEED_UNWRAP )
|
||||
// we are actually read blocked in order to write
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
else
|
||||
{
|
||||
// try the flush again
|
||||
|
@ -309,7 +309,7 @@ public class SslConnection extends AbstractAsyncConnection
|
|||
@Override
|
||||
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
|
||||
{
|
||||
_readInterest.registerInterest(context,callback);
|
||||
_readInterest.register(context,callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -129,7 +129,7 @@ public class SelectChannelEndPointTest
|
|||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -210,7 +210,7 @@ public class SelectChannelEndPointTest
|
|||
finally
|
||||
{
|
||||
if (_endp.isOpen())
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -139,7 +139,7 @@ public class SslConnectionTest
|
|||
public void onOpen()
|
||||
{
|
||||
// System.err.println("onOpen");
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -201,7 +201,7 @@ public class SslConnectionTest
|
|||
finally
|
||||
{
|
||||
if (endp.isOpen())
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* A Connection that handles the HTTP protocol
|
||||
*/
|
||||
public class HttpConnection extends AbstractAsyncConnection
|
||||
{
|
||||
|
@ -181,7 +182,7 @@ public class HttpConnection extends AbstractAsyncConnection
|
|||
{
|
||||
LOG.debug("Opened HTTP Connection {}",this);
|
||||
super.onOpen();
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
@ -195,26 +196,26 @@ public class HttpConnection extends AbstractAsyncConnection
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* {@link #scheduleOnReadable()}
|
||||
* @see org.eclipse.jetty.io.AbstractAsyncConnection#onReadable()
|
||||
/** Parse and handle HTTP messages.
|
||||
* <p>
|
||||
* This method is normally called as the {@link AbstractAsyncConnection} onReadable callback.
|
||||
* However, it can also be called {@link HttpChannelOverHttp#completed()} if there is unconsumed
|
||||
* data in the _requestBuffer, as a result of resuming a suspended request when there is a pipelined
|
||||
* request already read into the buffer.
|
||||
* <p>
|
||||
* This method will fill data and parse it until either: EOF is filled; 0 bytes are filled;
|
||||
* the HttpChannel becomes !idle; or the connection has been changed
|
||||
*/
|
||||
@Override
|
||||
public synchronized void onReadable()
|
||||
{
|
||||
LOG.debug("{} onReadable {}",this,_channel.isIdle());
|
||||
|
||||
// This method is normally called as callback passed to
|
||||
// EndPoint.readable() by scheduleOnReadable. However, it can also be called
|
||||
// by HttpChannel.completed() if there is unconsumed data in the _requestBuffer, as a result of
|
||||
// resuming a suspending a request when there is a pipelined request already read into the buffer.
|
||||
//
|
||||
// This method will fill data and parse it until either: EOF is filled; 0 bytes are filled;
|
||||
// the HttpChannel becomes !idle; or the connection has been changed
|
||||
try
|
||||
{
|
||||
setCurrentConnection(this);
|
||||
|
||||
// TODO try to generalize this loop into AbstractAsyncConnection
|
||||
while (true)
|
||||
{
|
||||
// Fill the request buffer with data only if it is totally empty.
|
||||
|
@ -226,14 +227,12 @@ public class HttpConnection extends AbstractAsyncConnection
|
|||
int filled=getEndPoint().fill(_requestBuffer);
|
||||
|
||||
LOG.debug("{} filled {}",this,filled);
|
||||
|
||||
// TODO protect against large/infinite headers as denial of service
|
||||
|
||||
|
||||
// If we failed to fill
|
||||
if (filled==0)
|
||||
{
|
||||
// Somebody wanted to read, we didn't so schedule another attempt
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
releaseRequestBuffer();
|
||||
return;
|
||||
}
|
||||
|
@ -431,7 +430,7 @@ public class HttpConnection extends AbstractAsyncConnection
|
|||
{
|
||||
// it wants to eat more
|
||||
if (_requestBuffer==null)
|
||||
scheduleOnReadable();
|
||||
readInterested();
|
||||
else if (getConnector().isStarted())
|
||||
{
|
||||
LOG.debug("{} pipelined",this);
|
||||
|
|
|
@ -85,7 +85,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
|||
public QueuedThreadPool(int maxThreads, int minThreads, int maxIdleTimeMs)
|
||||
{
|
||||
_name="qtp"+super.hashCode();
|
||||
setMaxThreads(minThreads);
|
||||
setMinThreads(minThreads);
|
||||
setMaxThreads(maxThreads);
|
||||
setMaxIdleTimeMs(maxIdleTimeMs);
|
||||
}
|
||||
|
|
12
pom.xml
12
pom.xml
|
@ -351,8 +351,13 @@
|
|||
<module>jetty-servlet</module>
|
||||
<module>jetty-webapp</module>
|
||||
<module>jetty-deploy</module>
|
||||
<!--
|
||||
|
||||
<module>jetty-websocket</module>
|
||||
|
||||
<module>test-continuation</module>
|
||||
<module>test-jetty-webapp</module>
|
||||
<module>example-jetty-embedded</module>
|
||||
<!--
|
||||
<module>jetty-jaspi</module>
|
||||
<module>jetty-client</module>
|
||||
<module>jetty-servlets</module>
|
||||
|
@ -371,11 +376,8 @@
|
|||
<module>jetty-http-spi</module>
|
||||
<module>jetty-jsp</module>
|
||||
<module>jetty-distribution</module>
|
||||
<module>test-continuation</module>
|
||||
<module>test-jetty-servlet</module>
|
||||
<module>test-jetty-webapp</module>
|
||||
<module>test-jetty-nested</module>
|
||||
<module>example-jetty-embedded</module>
|
||||
<module>test-jetty-servlet</module>
|
||||
<module>example-async-rest</module>
|
||||
<module>tests</module>
|
||||
-->
|
||||
|
|
|
@ -22,16 +22,17 @@
|
|||
</plugins>
|
||||
</build>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-servlet</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.toolchain</groupId>
|
||||
<artifactId>jetty-test-helper</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -13,6 +13,12 @@
|
|||
|
||||
package org.eclipse.jetty.continuation.test;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.Socket;
|
||||
|
@ -24,15 +30,12 @@ import javax.servlet.http.HttpServlet;
|
|||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.eclipse.jetty.continuation.Continuation;
|
||||
import org.eclipse.jetty.continuation.ContinuationListener;
|
||||
import org.eclipse.jetty.continuation.ContinuationSupport;
|
||||
|
||||
|
||||
|
||||
public abstract class ContinuationBase extends TestCase
|
||||
public abstract class ContinuationBase
|
||||
{
|
||||
protected SuspendServlet _servlet=new SuspendServlet();
|
||||
protected int _port;
|
||||
|
@ -206,22 +209,14 @@ public abstract class ContinuationBase extends TestCase
|
|||
|
||||
protected void assertContains(String content,String response)
|
||||
{
|
||||
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
|
||||
if (response.indexOf(content,15)<0)
|
||||
{
|
||||
System.err.println("'"+content+"' NOT IN:\n"+response+"\n--");
|
||||
assertTrue(false);
|
||||
}
|
||||
assertThat(response,startsWith("HTTP/1.1 200 OK"));
|
||||
assertThat(response,containsString(content));
|
||||
}
|
||||
|
||||
protected void assertNotContains(String content,String response)
|
||||
{
|
||||
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
|
||||
if (response.indexOf(content,15)>=0)
|
||||
{
|
||||
System.err.println("'"+content+"' IS IN:\n"+response+"'\n--");
|
||||
assertTrue(false);
|
||||
}
|
||||
assertThat(response,startsWith("HTTP/1.1 200 OK"));
|
||||
assertThat(response,not(containsString(content)));
|
||||
}
|
||||
|
||||
public synchronized String process(String query,String content) throws Exception
|
||||
|
|
|
@ -26,6 +26,9 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
|
|||
import org.eclipse.jetty.servlet.ServletHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
|
||||
|
@ -36,8 +39,8 @@ public class ContinuationTest extends ContinuationBase
|
|||
protected SelectChannelConnector _connector;
|
||||
FilterHolder _filter;
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
_connector = new SelectChannelConnector();
|
||||
_server.setConnectors(new Connector[]{ _connector });
|
||||
|
@ -50,101 +53,117 @@ public class ContinuationTest extends ContinuationBase
|
|||
|
||||
_server.start();
|
||||
_port=_connector.getLocalPort();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception
|
||||
@After
|
||||
public void tearDown() throws Exception
|
||||
{
|
||||
_server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContinuation() throws Exception
|
||||
{
|
||||
doNormal("AsyncContinuation");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSleep() throws Exception
|
||||
{
|
||||
doSleep();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspend() throws Exception
|
||||
{
|
||||
doSuspend();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendWaitResume() throws Exception
|
||||
{
|
||||
doSuspendWaitResume();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendResume() throws Exception
|
||||
{
|
||||
doSuspendResume();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendWaitComplete() throws Exception
|
||||
{
|
||||
doSuspendWaitComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendComplete() throws Exception
|
||||
{
|
||||
doSuspendComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendWaitResumeSuspendWaitResume() throws Exception
|
||||
{
|
||||
doSuspendWaitResumeSuspendWaitResume();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSuspendWaitResumeSuspendComplete() throws Exception
|
||||
{
|
||||
doSuspendWaitResumeSuspendComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendWaitResumeSuspend() throws Exception
|
||||
{
|
||||
doSuspendWaitResumeSuspend();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendTimeoutSuspendResume() throws Exception
|
||||
{
|
||||
doSuspendTimeoutSuspendResume();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendTimeoutSuspendComplete() throws Exception
|
||||
{
|
||||
doSuspendTimeoutSuspendComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendTimeoutSuspend() throws Exception
|
||||
{
|
||||
doSuspendTimeoutSuspend();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendThrowResume() throws Exception
|
||||
{
|
||||
doSuspendThrowResume();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendResumeThrow() throws Exception
|
||||
{
|
||||
doSuspendResumeThrow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendThrowComplete() throws Exception
|
||||
{
|
||||
doSuspendThrowComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendCompleteThrow() throws Exception
|
||||
{
|
||||
doSuspendCompleteThrow();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected String toString(InputStream in) throws IOException
|
||||
{
|
||||
return IO.toString(in);
|
||||
|
|
Loading…
Reference in New Issue