Refactored NIO to better handle half closes. Applied the following policy:

Call shutdownOutput to signal the other end that you have written all the data that your are going to write (eg and the end of a non persistent HTTP response).   This can be done either by generator (when it is complete) or coordinator or handle - but we need to decide which and have only 1 doing it.
    Call shutdownInput to signal your own end that you have read -1 and to allow other local code to check that with an isInputShutdown. Ideally we could get by without any calls at all to shutdownInput, so long as we well handle reading -1 (Currently we don't).  This should be done by whatever does the IO read.
    Calling close should always be a real TCP close, even with SSL. SSL shutdown
    The default handling of an idle callback should be close.  But some connections (NOT endpoints) may implement idle as initiating a shutdown exchange (eg websocket close).   If they do, this is state that should be held in the connection or parser - ie do-this-exhange-and-then-shutdown
    Call close when you want to shutdown Output and you have already read -1, so input is already shutdown.   We need to double verify that this is correct and that if a FIN has been received from the other end, that a close will not result in a reset.  I'll do that today.
    Call close when you want to shutdown Input and output has already been shutdown.  This means you have read -1 after having sent a FIN.
    Call close on any errors.

The current state is that server HttpConnection appears to be working well.  Other connection types have not been updated and/or tested
This commit is contained in:
Greg Wilkins 2011-10-18 14:38:02 +11:00
parent e9c398e86b
commit e43b718fb1
58 changed files with 1530 additions and 1055 deletions

View File

@ -133,9 +133,9 @@ public class Ajp13Generator extends AbstractGenerator
/* ------------------------------------------------------------ */
@Override
public void reset(boolean returnBuffers)
public void reset()
{
super.reset(returnBuffers);
super.reset();
_needEOC = false;
_needMore = false;
@ -819,7 +819,7 @@ public class Ajp13Generator extends AbstractGenerator
while (buff.length() > 0);
_buffers.returnBuffer(buff);
reset(true);
reset();
}

View File

@ -148,21 +148,16 @@ public class Ajp13Parser implements Parser
}
/* ------------------------------------------------------------------------------- */
public int parseAvailable() throws IOException
public boolean parseAvailable() throws IOException
{
int len = parseNext();
int total = len > 0 ? len : 0;
boolean progress=parseNext()>0;
// continue parsing
while (!isComplete() && _buffer != null && _buffer.length() > 0)
while (!isComplete() && _buffer!=null && _buffer.length()>0)
{
len = parseNext();
if (len > 0)
total += len;
else
break;
progress |= parseNext()>0;
}
return total;
return progress;
}
/* ------------------------------------------------------------------------------- */
@ -876,6 +871,15 @@ public class Ajp13Parser implements Parser
return _content.length() > 0;
}
}
public boolean isPersistent()
{
return true;
}
public void setPersistent(boolean persistent)
{
LOG.warn("AJP13.setPersistent is not IMPLEMENTED!");
}
}

View File

@ -49,28 +49,26 @@ import org.eclipse.jetty.util.thread.Timeout;
*
* @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
*/
public class HttpConnection extends AbstractConnection implements Dumpable
public abstract class AbstractHttpConnection extends AbstractConnection implements Dumpable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final Logger LOG = Log.getLogger(AbstractHttpConnection.class);
private HttpDestination _destination;
private HttpGenerator _generator;
private HttpParser _parser;
private boolean _http11 = true;
private int _status;
private Buffer _connectionHeader;
private Buffer _requestContentChunk;
private boolean _requestComplete;
private boolean _reserved;
protected HttpDestination _destination;
protected HttpGenerator _generator;
protected HttpParser _parser;
protected boolean _http11 = true;
protected int _status;
protected Buffer _connectionHeader;
protected boolean _reserved;
// The current exchange waiting for a response
private volatile HttpExchange _exchange;
private HttpExchange _pipeline;
protected volatile HttpExchange _exchange;
protected HttpExchange _pipeline;
private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
private AtomicBoolean _idle = new AtomicBoolean(false);
HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
{
super(endp);
@ -161,308 +159,8 @@ public class HttpConnection extends AbstractConnection implements Dumpable
}
}
public Connection handle() throws IOException
{
try
{
int no_progress = 0;
boolean failed = false;
while (_endp.isBufferingInput() || _endp.isOpen())
{
synchronized (this)
{
while (_exchange == null)
{
if (_endp.isBlocking())
{
try
{
this.wait();
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
}
else
{
long filled = _parser.fill();
if (filled < 0)
{
close();
}
else
{
// Hopefully just space?
_parser.skipCRLF();
if (_parser.isMoreInBuffer())
{
LOG.warn("Unexpected data received but no request sent");
close();
}
}
return this;
}
}
}
try
{
if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
{
no_progress = 0;
commitRequest();
}
long io = 0;
_endp.flush();
if (_generator.isComplete())
{
if (!_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
}
else
{
// Write as much of the request as possible
synchronized (this)
{
if (_exchange == null)
continue;
}
long flushed = _generator.flushBuffer();
io += flushed;
if (!_generator.isComplete())
{
if (_exchange!=null)
{
InputStream in = _exchange.getRequestContentSource();
if (in != null)
{
if (_requestContentChunk == null || _requestContentChunk.length() == 0)
{
_requestContentChunk = _exchange.getRequestContentChunk();
if (_requestContentChunk != null)
_generator.addContent(_requestContentChunk,false);
else
_generator.complete();
flushed = _generator.flushBuffer();
io += flushed;
}
}
else
_generator.complete();
}
else
_generator.complete();
}
}
if (_generator.isComplete() && !_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
// If we are not ended then parse available
if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
{
long filled = _parser.parseAvailable();
io += filled;
if (_parser.isIdle() && (_endp.isInputShutdown() || !_endp.isOpen()))
throw new EOFException();
}
if (io > 0)
no_progress = 0;
else if (no_progress++ >= 1 && !_endp.isBlocking())
{
// SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
{
long flushed = _generator.flushBuffer();
if (flushed>0)
continue;
}
return this;
}
}
catch (Throwable e)
{
LOG.debug("Failure on " + _exchange, e);
if (e instanceof ThreadDeath)
throw (ThreadDeath)e;
failed = true;
synchronized (this)
{
if (_exchange != null)
{
// Cancelling the exchange causes an exception as we close the connection,
// but we don't report it as it is normal cancelling operation
if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
_exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(e);
}
}
else
{
if (e instanceof IOException)
throw (IOException)e;
if (e instanceof Error)
throw (Error)e;
if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw new RuntimeException(e);
}
}
}
finally
{
boolean complete = false;
boolean close = failed; // always close the connection on error
if (!failed)
{
// are we complete?
if (_generator.isComplete())
{
if (!_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
// we need to return the HttpConnection to a state that
// it can be reused or closed out
if (_parser.isComplete())
{
_exchange.cancelTimeout(_destination.getHttpClient());
complete = true;
}
}
// if the endpoint is closed, but the parser incomplete
if (!_endp.isOpen() && !(_parser.isComplete()||_parser.isIdle()))
{
// we wont be called again so let the parser see the close
complete=true;
_parser.parseAvailable();
// TODO should not need this
if (!(_parser.isComplete()||_parser.isIdle()))
{
LOG.warn("Incomplete {} {}",_parser,_endp);
if (_exchange!=null && !_exchange.isDone())
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(new EOFException("Incomplete"));
}
}
}
}
if (_endp.isInputShutdown() && !_parser.isComplete() && !_parser.isIdle())
{
if (_exchange!=null && !_exchange.isDone())
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(new EOFException("Incomplete"));
}
_endp.close();
}
if (complete || failed)
{
synchronized (this)
{
if (!close)
close = shouldClose();
reset(true);
no_progress = 0;
if (_exchange != null)
{
HttpExchange exchange=_exchange;
_exchange = null;
// Reset the maxIdleTime because it may have been changed
if (!close)
_endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection switched=exchange.onSwitchProtocol(_endp);
if (switched!=null)
{
// switched protocol!
exchange = _pipeline;
_pipeline = null;
if (exchange!=null)
_destination.send(exchange);
return switched;
}
}
if (_pipeline == null)
{
if (!isReserved())
_destination.returnConnection(this, close);
}
else
{
if (close)
{
if (!isReserved())
_destination.returnConnection(this,close);
exchange = _pipeline;
_pipeline = null;
_destination.send(exchange);
}
else
{
exchange = _pipeline;
_pipeline = null;
send(exchange);
}
}
}
}
}
}
}
}
finally
{
_parser.returnBuffers();
// Do we have more stuff to write?
if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp.isOpen() && _endp instanceof AsyncEndPoint)
{
// Assume we are write blocked!
((AsyncEndPoint)_endp).scheduleWrite();
}
}
return this;
}
public abstract Connection handle() throws IOException;
public boolean isIdle()
{
@ -481,7 +179,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
{
}
private void commitRequest() throws IOException
protected void commitRequest() throws IOException
{
synchronized (this)
{
@ -559,16 +257,15 @@ public class HttpConnection extends AbstractConnection implements Dumpable
protected void reset(boolean returnBuffers) throws IOException
{
_requestComplete = false;
_connectionHeader = null;
_parser.reset();
if (returnBuffers)
_parser.returnBuffers();
_generator.reset(returnBuffers);
_generator.reset();
_http11 = true;
}
private boolean shouldClose()
protected boolean shouldClose()
{
if (_endp.isInputShutdown())
return true;
@ -783,7 +480,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
// Connection idle, close it
if (_idle.compareAndSet(true, false))
{
_destination.returnIdleConnection(HttpConnection.this);
_destination.returnIdleConnection(AbstractHttpConnection.this);
}
}
}

View File

@ -0,0 +1,346 @@
package org.eclipse.jetty.client;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
{
private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
private Buffer _requestContentChunk;
private boolean _requestComplete;
private int _status;
AsyncHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
{
super(requestBuffers,responseBuffers,endp);
}
protected void reset(boolean returnBuffers) throws IOException
{
_requestComplete = false;
super.reset(returnBuffers);
}
public Connection handle() throws IOException
{
try
{
int no_progress = 0;
boolean failed = false;
while (_endp.isBufferingInput() || _endp.isOpen())
{
synchronized (this)
{
while (_exchange == null)
{
if (_endp.isBlocking())
{
try
{
this.wait();
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
}
else
{
long filled = _parser.fill();
if (filled < 0)
{
close();
}
else
{
// Hopefully just space?
_parser.skipCRLF();
if (_parser.isMoreInBuffer())
{
LOG.warn("Unexpected data received but no request sent");
close();
}
}
return this;
}
}
}
try
{
if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
{
no_progress = 0;
commitRequest();
}
long io = 0;
_endp.flush();
if (_generator.isComplete())
{
if (!_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
}
else
{
// Write as much of the request as possible
synchronized (this)
{
if (_exchange == null)
continue;
}
long flushed = _generator.flushBuffer();
io += flushed;
if (!_generator.isComplete())
{
if (_exchange!=null)
{
InputStream in = _exchange.getRequestContentSource();
if (in != null)
{
if (_requestContentChunk == null || _requestContentChunk.length() == 0)
{
_requestContentChunk = _exchange.getRequestContentChunk();
if (_requestContentChunk != null)
_generator.addContent(_requestContentChunk,false);
else
_generator.complete();
flushed = _generator.flushBuffer();
io += flushed;
}
}
else
_generator.complete();
}
else
_generator.complete();
}
}
if (_generator.isComplete() && !_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
// If we are not ended then parse available
if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
{
if (_parser.parseAvailable())
io++;
if (_parser.isIdle() && (_endp.isInputShutdown() || !_endp.isOpen()))
throw new EOFException();
}
if (io > 0)
no_progress = 0;
else if (no_progress++ >= 1 && !_endp.isBlocking())
{
// SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
{
long flushed = _generator.flushBuffer();
if (flushed>0)
continue;
}
return this;
}
}
catch (Throwable e)
{
LOG.debug("Failure on " + _exchange, e);
if (e instanceof ThreadDeath)
throw (ThreadDeath)e;
failed = true;
synchronized (this)
{
if (_exchange != null)
{
// Cancelling the exchange causes an exception as we close the connection,
// but we don't report it as it is normal cancelling operation
if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
_exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(e);
}
}
else
{
if (e instanceof IOException)
throw (IOException)e;
if (e instanceof Error)
throw (Error)e;
if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw new RuntimeException(e);
}
}
}
finally
{
boolean complete = false;
boolean close = failed; // always close the connection on error
if (!failed)
{
// are we complete?
if (_generator.isComplete())
{
if (!_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
}
// we need to return the HttpConnection to a state that
// it can be reused or closed out
if (_parser.isComplete())
{
_exchange.cancelTimeout(_destination.getHttpClient());
complete = true;
}
}
// if the endpoint is closed, but the parser incomplete
if (!_endp.isOpen() && !(_parser.isComplete()||_parser.isIdle()))
{
// we wont be called again so let the parser see the close
complete=true;
_parser.parseAvailable();
// TODO should not need this
if (!(_parser.isComplete()||_parser.isIdle()))
{
LOG.warn("Incomplete {} {}",_parser,_endp);
if (_exchange!=null && !_exchange.isDone())
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(new EOFException("Incomplete"));
}
}
}
}
if (_endp.isInputShutdown() && !_parser.isComplete() && !_parser.isIdle())
{
if (_exchange!=null && !_exchange.isDone())
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(new EOFException("Incomplete"));
}
_endp.close();
}
if (complete || failed)
{
synchronized (this)
{
if (!close)
close = shouldClose();
reset(true);
no_progress = 0;
if (_exchange != null)
{
HttpExchange exchange=_exchange;
_exchange = null;
// Reset the maxIdleTime because it may have been changed
if (!close)
_endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection switched=exchange.onSwitchProtocol(_endp);
if (switched!=null)
{
// switched protocol!
exchange = _pipeline;
_pipeline = null;
if (exchange!=null)
_destination.send(exchange);
return switched;
}
}
if (_pipeline == null)
{
if (!isReserved())
_destination.returnConnection(this, close);
}
else
{
if (close)
{
if (!isReserved())
_destination.returnConnection(this,close);
exchange = _pipeline;
_pipeline = null;
_destination.send(exchange);
}
else
{
exchange = _pipeline;
_pipeline = null;
send(exchange);
}
}
}
}
}
}
}
}
finally
{
_parser.returnBuffers();
// Do we have more stuff to write?
if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp.isOpen() && _endp instanceof AsyncEndPoint)
{
// Assume we are write blocked!
((AsyncEndPoint)_endp).scheduleWrite();
}
}
return this;
}
public void onInputShutdown() throws IOException
{
// TODO
}
}

View File

@ -0,0 +1,25 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import javax.naming.OperationNotSupportedException;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
public class BlockingHttpConnection extends AbstractHttpConnection
{
BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
{
super(requestBuffers,responseBuffers,endp);
}
@Override
public Connection handle() throws IOException
{
throw new IOException("NOT IMPLEMENTED YET");
}
}

View File

@ -52,11 +52,11 @@ import org.eclipse.jetty.util.thread.Timeout;
* The an instance of {@link HttpExchange} is passed to the {@link #send(HttpExchange)} method
* to send a request. The exchange contains both the headers and content (source) of the request
* plus the callbacks to handle responses. A HttpClient can have many exchanges outstanding
* and they may be queued on the {@link HttpDestination} waiting for a {@link HttpConnection},
* queued in the {@link HttpConnection} waiting to be transmitted or pipelined on the actual
* and they may be queued on the {@link HttpDestination} waiting for a {@link AbstractHttpConnection},
* queued in the {@link AbstractHttpConnection} waiting to be transmitted or pipelined on the actual
* TCP/IP connection waiting for a response.
* <p/>
* The {@link HttpDestination} class is an aggregation of {@link HttpConnection}s for the
* The {@link HttpDestination} class is an aggregation of {@link AbstractHttpConnection}s for the
* same host, port and protocol. A destination may limit the number of connections
* open and they provide a pool of open connections that may be reused. Connections may also
* be allocated from a destination, so that multiple request sources are not multiplexed
@ -526,7 +526,7 @@ public class HttpClient extends HttpBuffers implements Attributes, Dumpable
/* ------------------------------------------------------------ */
/**
* @return the period in milliseconds a {@link HttpConnection} can be idle for before it is closed.
* @return the period in milliseconds a {@link AbstractHttpConnection} can be idle for before it is closed.
*/
public long getIdleTimeout()
{
@ -535,7 +535,7 @@ public class HttpClient extends HttpBuffers implements Attributes, Dumpable
/* ------------------------------------------------------------ */
/**
* @param ms the period in milliseconds a {@link HttpConnection} can be idle for before it is closed.
* @param ms the period in milliseconds a {@link AbstractHttpConnection} can be idle for before it is closed.
*/
public void setIdleTimeout(long ms)
{

View File

@ -48,9 +48,9 @@ public class HttpDestination implements Dumpable
private static final Logger LOG = Log.getLogger(HttpDestination.class);
private final List<HttpExchange> _queue = new LinkedList<HttpExchange>();
private final List<HttpConnection> _connections = new LinkedList<HttpConnection>();
private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
private final BlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10, true);
private final List<HttpConnection> _idle = new ArrayList<HttpConnection>();
private final List<AbstractHttpConnection> _idle = new ArrayList<AbstractHttpConnection>();
private final HttpClient _client;
private final Address _address;
private final boolean _ssl;
@ -168,9 +168,9 @@ public class HttpDestination implements Dumpable
* @return a HttpConnection for this destination
* @throws IOException if an I/O error occurs
*/
private HttpConnection getConnection(long timeout) throws IOException
private AbstractHttpConnection getConnection(long timeout) throws IOException
{
HttpConnection connection = null;
AbstractHttpConnection connection = null;
while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
{
@ -191,9 +191,9 @@ public class HttpDestination implements Dumpable
try
{
Object o = _newQueue.take();
if (o instanceof HttpConnection)
if (o instanceof AbstractHttpConnection)
{
connection = (HttpConnection)o;
connection = (AbstractHttpConnection)o;
}
else
throw (IOException)o;
@ -220,17 +220,17 @@ public class HttpDestination implements Dumpable
return connection;
}
public HttpConnection reserveConnection(long timeout) throws IOException
public AbstractHttpConnection reserveConnection(long timeout) throws IOException
{
HttpConnection connection = getConnection(timeout);
AbstractHttpConnection connection = getConnection(timeout);
if (connection != null)
connection.setReserved(true);
return connection;
}
public HttpConnection getIdleConnection() throws IOException
public AbstractHttpConnection getIdleConnection() throws IOException
{
HttpConnection connection = null;
AbstractHttpConnection connection = null;
while (true)
{
synchronized (this)
@ -330,7 +330,7 @@ public class HttpDestination implements Dumpable
}
}
public void onNewConnection(final HttpConnection connection) throws IOException
public void onNewConnection(final AbstractHttpConnection connection) throws IOException
{
Connection q_connection = null;
@ -381,7 +381,7 @@ public class HttpDestination implements Dumpable
}
}
public void returnConnection(HttpConnection connection, boolean close) throws IOException
public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
{
if (connection.isReserved())
connection.setReserved(false);
@ -433,7 +433,7 @@ public class HttpDestination implements Dumpable
}
}
public void returnIdleConnection(HttpConnection connection)
public void returnIdleConnection(AbstractHttpConnection connection)
{
try
{
@ -533,7 +533,7 @@ public class HttpDestination implements Dumpable
// so that we count also the queue time in the timeout
ex.scheduleTimeout(this);
HttpConnection connection = getIdleConnection();
AbstractHttpConnection connection = getIdleConnection();
if (connection != null)
{
send(connection, ex);
@ -566,7 +566,7 @@ public class HttpDestination implements Dumpable
}
}
protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
{
synchronized (this)
{
@ -594,7 +594,7 @@ public class HttpDestination implements Dumpable
b.append('\n');
synchronized (this)
{
for (HttpConnection connection : _connections)
for (AbstractHttpConnection connection : _connections)
{
b.append(connection.toDetailString());
if (_idle.contains(connection))
@ -637,7 +637,7 @@ public class HttpDestination implements Dumpable
{
synchronized (this)
{
for (HttpConnection connection : _connections)
for (AbstractHttpConnection connection : _connections)
{
connection.close();
}

View File

@ -59,7 +59,7 @@ import org.eclipse.jetty.util.thread.Timeout;
*
* <p>
* Typically the HttpExchange is passed to the {@link HttpClient#send(HttpExchange)} method, which in turn selects a {@link HttpDestination} and calls its
* {@link HttpDestination#send(HttpExchange)}, which then creates or selects a {@link HttpConnection} and calls its {@link HttpConnection#send(HttpExchange)}. A
* {@link HttpDestination#send(HttpExchange)}, which then creates or selects a {@link AbstractHttpConnection} and calls its {@link AbstractHttpConnection#send(HttpExchange)}. A
* developer may wish to directly call send on the destination or connection if they wish to bypass some handling provided (eg Cookie handling in the
* HttpDestination).
* </p>
@ -103,7 +103,7 @@ public class HttpExchange
// controls if the exchange will have listeners autoconfigured by the destination
private boolean _configureListeners = true;
private HttpEventListener _listener = new Listener();
private volatile HttpConnection _connection;
private volatile AbstractHttpConnection _connection;
private Address _localAddress = null;
@ -125,7 +125,7 @@ public class HttpExchange
setStatus(HttpExchange.STATUS_EXPIRED);
destination.exchangeExpired(this);
HttpConnection connection = _connection;
AbstractHttpConnection connection = _connection;
if (connection != null)
connection.exchangeExpired(this);
}
@ -778,7 +778,7 @@ public class HttpExchange
private void abort()
{
HttpConnection httpConnection = _connection;
AbstractHttpConnection httpConnection = _connection;
if (httpConnection != null)
{
try
@ -798,7 +798,7 @@ public class HttpExchange
}
}
void associate(HttpConnection connection)
void associate(AbstractHttpConnection connection)
{
if (connection.getEndPoint().getLocalHost() != null)
_localAddress = new Address(connection.getEndPoint().getLocalHost(),connection.getEndPoint().getLocalPort());
@ -813,9 +813,9 @@ public class HttpExchange
return this._connection != null;
}
HttpConnection disassociate()
AbstractHttpConnection disassociate()
{
HttpConnection result = _connection;
AbstractHttpConnection result = _connection;
this._connection = null;
if (getStatus() == STATUS_CANCELLING)
setStatus(STATUS_CANCELLED);

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.io.BuffersFactory;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
@ -153,12 +154,12 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
if (endpoint instanceof SslSelectChannelEndPoint)
return new HttpConnection(_sslBuffers,_sslBuffers,endpoint);
return new AsyncHttpConnection(_sslBuffers,_sslBuffers,endpoint);
return new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
}
@Override
@ -195,7 +196,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
}
HttpConnection connection=(HttpConnection)ep.getConnection();
AbstractHttpConnection connection=(AbstractHttpConnection)ep.getConnection();
connection.setDestination(dest);
dest.onNewConnection(connection);
return ep;

View File

@ -57,7 +57,7 @@ class SocketConnector extends AbstractLifeCycle implements HttpClient.Connector
EndPoint endpoint=new SocketEndPoint(socket);
final HttpConnection connection=new HttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
final AbstractHttpConnection connection=new BlockingHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
connection.setDestination(destination);
destination.onNewConnection(connection);
_httpClient.getThreadPool().dispatch(new Runnable()

View File

@ -571,7 +571,7 @@ public class HttpExchangeTest
{
_httpClient = serverAndClientCreator.createClient(3000L,3500L,2000);
final HttpDestination destination = _httpClient.getDestination(new Address("localhost",_port),_scheme.equalsIgnoreCase("https"));
final org.eclipse.jetty.client.HttpConnection[] connections = new org.eclipse.jetty.client.HttpConnection[_maxConnectionsPerAddress];
final org.eclipse.jetty.client.AbstractHttpConnection[] connections = new org.eclipse.jetty.client.AbstractHttpConnection[_maxConnectionsPerAddress];
for (int i = 0; i < _maxConnectionsPerAddress; i++)
{
connections[i] = destination.reserveConnection(200);
@ -595,7 +595,7 @@ public class HttpExchangeTest
assertNotNull(c);
// release connections
for (HttpConnection httpConnection : connections){
for (AbstractHttpConnection httpConnection : connections){
destination.returnConnection(httpConnection,false);
}
}

View File

@ -100,7 +100,7 @@ public abstract class AbstractGenerator implements Generator
}
/* ------------------------------------------------------------------------------- */
public void reset(boolean returnBuffers)
public void reset()
{
_state = STATE_HEADER;
_status = 0;
@ -114,20 +114,6 @@ public abstract class AbstractGenerator implements Generator
_contentLength = HttpTokens.UNKNOWN_CONTENT;
_date = null;
// always return the body buffer
if (_buffer!=null)
_buffers.returnBuffer(_buffer);
_buffer=null;
if (returnBuffers)
{
if (_header!=null)
_buffers.returnBuffer(_header);
_header=null;
}
else if (_header != null)
_header.clear();
_content = null;
_method=null;
}

View File

@ -70,7 +70,7 @@ public interface Generator
boolean isPersistent();
void reset(boolean returnBuffers);
void reset();
void resetBuffer();

View File

@ -125,9 +125,15 @@ public class HttpGenerator extends AbstractGenerator
/* ------------------------------------------------------------------------------- */
@Override
public void reset(boolean returnBuffers)
public void reset()
{
super.reset(returnBuffers);
super.reset();
if (_buffer!=null)
_buffer.clear();
if (_header!=null)
_header.clear();
if (_content!=null)
_content=null;
_bypass = false;
_needCRLF = false;
_needEOC = false;
@ -137,8 +143,6 @@ public class HttpGenerator extends AbstractGenerator
_noContent=false;
}
/* ------------------------------------------------------------ */
/**
* Add content.
@ -399,7 +403,7 @@ public class HttpGenerator extends AbstractGenerator
_contentLength = HttpTokens.NO_CONTENT;
_header.put(_method);
_header.put((byte)' ');
_header.put(_uri.getBytes("utf-8")); // TODO WRONG!
_header.put(_uri.getBytes("UTF-8")); // TODO check
_header.put(HttpTokens.CRLF);
_state = STATE_FLUSHING;
_noContent=true;
@ -409,7 +413,7 @@ public class HttpGenerator extends AbstractGenerator
{
_header.put(_method);
_header.put((byte)' ');
_header.put(_uri.getBytes("utf-8")); // TODO WRONG!
_header.put(_uri.getBytes("UTF-8")); // TODO check
_header.put((byte)' ');
_header.put(_version==HttpVersions.HTTP_1_0_ORDINAL?HttpVersions.HTTP_1_0_BUFFER:HttpVersions.HTTP_1_1_BUFFER);
_header.put(HttpTokens.CRLF);
@ -418,7 +422,6 @@ public class HttpGenerator extends AbstractGenerator
else
{
// Responses
if (_version == HttpVersions.HTTP_0_9_ORDINAL)
{
_persistent = false;
@ -786,8 +789,6 @@ public class HttpGenerator extends AbstractGenerator
}
}
/* ------------------------------------------------------------ */
/**
* Complete the message.
@ -899,7 +900,7 @@ public class HttpGenerator extends AbstractGenerator
if (_state==STATE_END && _persistent != null && !_persistent && _status!=100 && _method==null)
{
_endp.shutdownOutput();
_endp.shutdownOutput();
}
}
else
@ -907,7 +908,7 @@ public class HttpGenerator extends AbstractGenerator
prepareBuffers();
}
}
if (len > 0)
total+=len;

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.http;
import java.io.EOFException;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
@ -55,6 +54,7 @@ public class HttpParser implements Parser
public static final int STATE_CHUNK_SIZE=4;
public static final int STATE_CHUNK_PARAMS=5;
public static final int STATE_CHUNK=6;
public static final int STATE_SEEKING_EOF=7;
private final EventHandler _handler;
private final Buffers _buffers; // source of buffers
@ -68,6 +68,7 @@ public class HttpParser implements Parser
private String _multiLineValue;
private int _responseStatus; // If >0 then we are parsing a response
private boolean _forceContentBuffer;
private boolean _persistent;
/* ------------------------------------------------------------------------------- */
protected final View _contentView=new View(); // View of the content in the buffer for {@link Input}
@ -186,6 +187,20 @@ public class HttpParser implements Parser
return _state == state;
}
/* ------------------------------------------------------------------------------- */
public boolean isPersistent()
{
return _persistent;
}
/* ------------------------------------------------------------------------------- */
public void setPersistent(boolean persistent)
{
_persistent = persistent;
if (_state==STATE_END)
_state=STATE_SEEKING_EOF;
}
/* ------------------------------------------------------------------------------- */
/**
* Parse until {@link #STATE_END END} state.
@ -213,19 +228,16 @@ public class HttpParser implements Parser
* @see #parse
* @see #parseNext
*/
public int parseAvailable() throws IOException
public boolean parseAvailable() throws IOException
{
int progress = parseNext();
int total=progress>0?1:0;
boolean progress=parseNext()>0;
// continue parsing
while (!isComplete() && _buffer!=null && _buffer.length()>0)
{
progress = parseNext();
if (progress>0)
total++;
progress |= parseNext()>0;
}
return total;
return progress;
}
@ -281,32 +293,44 @@ public class HttpParser implements Parser
if (filled < 0 || _endp.isInputShutdown())
{
if (_headResponse && _state>STATE_END)
_persistent=false;
// do we have content to deliver?
if (_state>STATE_END)
{
_state=STATE_END;
_handler.messageComplete(_contentPosition);
return 1;
}
if ( _state == STATE_EOF_CONTENT)
{
if (_buffer.length()>0)
if (_buffer.length()>0 && !_headResponse)
{
// TODO should we do this here or fall down to main loop?
Buffer chunk=_buffer.get(_buffer.length());
_contentPosition += chunk.length();
_contentView.update(chunk);
_handler.content(chunk); // May recurse here
}
_state=STATE_END;
_handler.messageComplete(_contentPosition);
return 1;
}
// was this unexpected?
switch(_state)
{
case STATE_END:
case STATE_SEEKING_EOF:
_state=STATE_END;
break;
case STATE_EOF_CONTENT:
_state=STATE_END;
_handler.messageComplete(_contentPosition);
break;
default:
_state=STATE_END;
_handler.earlyEOF();
_handler.messageComplete(_contentPosition);
}
if (ex!=null)
throw ex;
if (!isComplete() && !isIdle())
throw new EOFException();
throw new EofException();
return -1;
}
@ -314,7 +338,7 @@ public class HttpParser implements Parser
}
// EventHandler header
// Handle header states
byte ch;
byte[] array=_buffer.array();
int last=_state;
@ -417,7 +441,8 @@ public class HttpParser implements Parser
{
// HTTP/0.9
_handler.startRequest(HttpMethods.CACHE.lookup(_tok0), _buffer.sliceFromMark(), null);
_state=STATE_END;
_persistent=false;
_state=STATE_SEEKING_EOF;
_handler.headerComplete();
_handler.messageComplete(_contentPosition);
return 1;
@ -445,7 +470,8 @@ public class HttpParser implements Parser
{
// HTTP/0.9
_handler.startRequest(HttpMethods.CACHE.lookup(_tok0), _tok1, null);
_state=STATE_END;
_persistent=false;
_state=STATE_SEEKING_EOF;
_handler.headerComplete();
_handler.messageComplete(_contentPosition);
return 1;
@ -456,11 +482,13 @@ public class HttpParser implements Parser
case STATE_FIELD2:
if (ch == HttpTokens.CARRIAGE_RETURN || ch == HttpTokens.LINE_FEED)
{
Buffer version;
if (_responseStatus>0)
_handler.startResponse(HttpVersions.CACHE.lookup(_tok0), _responseStatus,_buffer.sliceFromMark());
_handler.startResponse(version=HttpVersions.CACHE.lookup(_tok0), _responseStatus,_buffer.sliceFromMark());
else
_handler.startRequest(HttpMethods.CACHE.lookup(_tok0), _tok1, HttpVersions.CACHE.lookup(_buffer.sliceFromMark()));
_handler.startRequest(HttpMethods.CACHE.lookup(_tok0), _tok1, version=HttpVersions.CACHE.lookup(_buffer.sliceFromMark()));
_eol=ch;
_persistent=HttpVersions.CACHE.getOrdinal(version)>=HttpVersions.HTTP_1_1_ORDINAL;
_state=STATE_HEADER;
_tok0.setPutIndex(_tok0.getIndex());
_tok1.setPutIndex(_tok1.getIndex());
@ -487,7 +515,6 @@ public class HttpParser implements Parser
// handler last header if any
if (_cached!=null || _tok0.length() > 0 || _tok1.length() > 0 || _multiLineValue != null)
{
Buffer header=_cached!=null?_cached:HttpHeaders.CACHE.lookup(_tok0);
_cached=null;
Buffer value=_multiLineValue == null ? _tok1 : new ByteArrayBuffer(_multiLineValue);
@ -531,6 +558,36 @@ public class HttpParser implements Parser
throw new HttpException(400,null);
}
break;
case HttpHeaders.CONNECTION_ORDINAL:
switch(HttpHeaderValues.CACHE.getOrdinal(value))
{
case HttpHeaderValues.CLOSE_ORDINAL:
_persistent=false;
break;
case HttpHeaderValues.KEEP_ALIVE_ORDINAL:
_persistent=true;
break;
case -1: // No match, may be multi valued
{
for (String v : value.toString().split(","))
{
switch(HttpHeaderValues.CACHE.getOrdinal(v.trim()))
{
case HttpHeaderValues.CLOSE_ORDINAL:
_persistent=false;
break;
case HttpHeaderValues.KEEP_ALIVE_ORDINAL:
_persistent=true;
break;
}
}
break;
}
}
}
}
@ -577,8 +634,7 @@ public class HttpParser implements Parser
break;
case HttpTokens.NO_CONTENT:
_state=STATE_END;
returnBuffers();
_state=_persistent||(_responseStatus>=100&&_responseStatus<200)?STATE_END:STATE_SEEKING_EOF;
_handler.headerComplete();
_handler.messageComplete(_contentPosition);
break;
@ -743,9 +799,10 @@ public class HttpParser implements Parser
// Handle HEAD response
if (_responseStatus>0 && _headResponse)
{
_state=STATE_END;
_state=_persistent||(_responseStatus>=100&&_responseStatus<200)?STATE_END:STATE_SEEKING_EOF;
_handler.messageComplete(_contentLength);
}
// ==========================
@ -783,7 +840,7 @@ public class HttpParser implements Parser
long remaining=_contentLength - _contentPosition;
if (remaining == 0)
{
_state=STATE_END;
_state=_persistent?STATE_END:STATE_SEEKING_EOF;
_handler.messageComplete(_contentPosition);
return 1;
}
@ -802,7 +859,7 @@ public class HttpParser implements Parser
if(_contentPosition == _contentLength)
{
_state=STATE_END;
_state=_persistent?STATE_END:STATE_SEEKING_EOF;
_handler.messageComplete(_contentPosition);
}
// TODO adjust the _buffer to keep unconsumed content
@ -836,7 +893,7 @@ public class HttpParser implements Parser
{
if (_eol==HttpTokens.CARRIAGE_RETURN && _buffer.hasContent() && _buffer.peek()==HttpTokens.LINE_FEED)
_eol=_buffer.get();
_state=STATE_END;
_state=_persistent?STATE_END:STATE_SEEKING_EOF;
_handler.messageComplete(_contentPosition);
return 1;
}
@ -866,7 +923,7 @@ public class HttpParser implements Parser
{
if (_eol==HttpTokens.CARRIAGE_RETURN && _buffer.hasContent() && _buffer.peek()==HttpTokens.LINE_FEED)
_eol=_buffer.get();
_state=STATE_END;
_state=_persistent?STATE_END:STATE_SEEKING_EOF;
_handler.messageComplete(_contentPosition);
return 1;
}
@ -894,6 +951,13 @@ public class HttpParser implements Parser
// TODO adjust the _buffer to keep unconsumed content
return 1;
}
case STATE_SEEKING_EOF:
{
// Skip all data
_buffer.clear();
break;
}
}
length=_buffer.length();
@ -996,7 +1060,7 @@ public class HttpParser implements Parser
{
// reset state
_contentView.setGetIndex(_contentView.putIndex());
_state=STATE_START;
_state=_persistent?STATE_START:(_endp.isInputShutdown()?STATE_END:STATE_SEEKING_EOF);
_contentLength=HttpTokens.UNKNOWN_CONTENT;
_contentPosition=0;
_length=0;
@ -1110,7 +1174,8 @@ public class HttpParser implements Parser
{
if (_contentView.length()>0)
return _contentView;
if (getState() <= HttpParser.STATE_END)
if (getState() <= STATE_END || isState(STATE_SEEKING_EOF))
return null;
try
@ -1118,7 +1183,7 @@ public class HttpParser implements Parser
parseNext();
// parse until some progress is made (or IOException thrown for timeout)
while(_contentView.length() == 0 && !isState(HttpParser.STATE_END) && _endp!=null && _endp.isOpen())
while(_contentView.length() == 0 && !(isState(HttpParser.STATE_END)||isState(HttpParser.STATE_SEEKING_EOF)) && _endp!=null && _endp.isOpen())
{
if (!_endp.isBlocking())
{
@ -1137,10 +1202,11 @@ public class HttpParser implements Parser
}
catch(IOException e)
{
// TODO is this needed?
_endp.close();
throw e;
}
return _contentView.length()>0?_contentView:null;
}
@ -1198,6 +1264,9 @@ public class HttpParser implements Parser
*/
public abstract void startResponse(Buffer version, int status, Buffer reason)
throws IOException;
public void earlyEOF()
{}
}

View File

@ -26,13 +26,17 @@ public interface Parser
boolean isComplete();
/**
* @return An indication of progress, typically the number of bytes filled plus the events parsed: -1 means EOF read, 0 no progress, >0 progress
* @return True if progress made
* @throws IOException
*/
int parseAvailable() throws IOException;
boolean parseAvailable() throws IOException;
boolean isMoreInBuffer() throws IOException;
boolean isIdle();
boolean isPersistent();
void setPersistent(boolean persistent);
}

View File

@ -138,7 +138,7 @@ public class HttpGeneratorClientTest
String t="v="+v+",r="+r+",chunks="+chunks+",c="+c+",tr="+tr[r];
// System.err.println(t);
hb.reset(true);
hb.reset();
endp.reset();
fields.clear();

View File

@ -86,7 +86,7 @@ public class HttpGeneratorTest
String t="v="+v+",r="+r+",chunks="+chunks+",connect="+c+",tr="+tr[r];
// System.err.println(t);
hb.reset(true);
hb.reset();
endp.reset();
fields.clear();

View File

@ -210,6 +210,7 @@ public class HttpParserTest
StringEndPoint io=new StringEndPoint();
io.setInput(
"GET /mp HTTP/1.0\015\012"
+ "Connection: Keep-Alive\015\012"
+ "Header1: value1\015\012"
+ "Transfer-Encoding: chunked\015\012"
+ "\015\012"
@ -219,10 +220,12 @@ public class HttpParserTest
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ\015\012"
+ "0\015\012"
+ "POST /foo HTTP/1.0\015\012"
+ "Connection: Keep-Alive\015\012"
+ "Header2: value2\015\012"
+ "Content-Length: 0\015\012"
+ "\015\012"
+ "PUT /doodle HTTP/1.0\015\012"
+ "Connection: close\015\012"
+ "Header3: value3\015\012"
+ "Content-Length: 10\015\012"
+ "\015\012"
@ -238,27 +241,27 @@ public class HttpParserTest
assertEquals("GET", f0);
assertEquals("/mp", f1);
assertEquals("HTTP/1.0", f2);
assertEquals(1, h);
assertEquals("Header1", hdr[0]);
assertEquals("value1", val[0]);
assertEquals(2, h);
assertEquals("Header1", hdr[1]);
assertEquals("value1", val[1]);
assertEquals("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ", _content);
parser.parse();
assertEquals("POST", f0);
assertEquals("/foo", f1);
assertEquals("HTTP/1.0", f2);
assertEquals(1, h);
assertEquals("Header2", hdr[0]);
assertEquals("value2", val[0]);
assertEquals(2, h);
assertEquals("Header2", hdr[1]);
assertEquals("value2", val[1]);
assertEquals(null, _content);
parser.parse();
assertEquals("PUT", f0);
assertEquals("/doodle", f1);
assertEquals("HTTP/1.0", f2);
assertEquals(1, h);
assertEquals("Header3", hdr[0]);
assertEquals("value3", val[0]);
assertEquals(2, h);
assertEquals("Header3", hdr[1]);
assertEquals("value3", val[1]);
assertEquals("0123456789", _content);
}
@ -266,7 +269,8 @@ public class HttpParserTest
public void testStreamParse() throws Exception
{
StringEndPoint io=new StringEndPoint();
String http="GET / HTTP/1.0\015\012"
String http="GET / HTTP/1.1\015\012"
+ "Host: test\015\012"
+ "Header1: value1\015\012"
+ "Transfer-Encoding: chunked\015\012"
+ "\015\012"
@ -275,11 +279,14 @@ public class HttpParserTest
+ "1a\015\012"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ\015\012"
+ "0\015\012"
+ "POST /foo HTTP/1.0\015\012"
+ "POST /foo HTTP/1.1\015\012"
+ "Host: test\015\012"
+ "Header2: value2\015\012"
+ "Content-Length: 0\015\012"
+ "\015\012"
+ "PUT /doodle HTTP/1.0\015\012"
+ "PUT /doodle HTTP/1.1\015\012"
+ "Host: test\015\012"
+ "Connection: close\015\012"
+ "Header3: value3\015\012"
+ "Content-Length: 10\015\012"
+ "\015\012"
@ -296,15 +303,17 @@ public class HttpParserTest
http.length() - 2,
http.length() / 2,
http.length() / 3,
64,
128,
32
};
for (int t= 0; t < tests.length; t++)
{
String tst="t"+tests[t];
String tst="t"+t+"="+tests[t];
try
{
{
f0=f1=f2=null;
h=0;
ByteArrayBuffer buffer= new ByteArrayBuffer(tests[t]);
ByteArrayBuffer content=new ByteArrayBuffer(8192);
SimpleBuffers buffers=new SimpleBuffers(buffer,content);
@ -314,31 +323,32 @@ public class HttpParserTest
io.setInput(http);
System.err.println(tst);
parser.parse();
assertEquals(tst,"GET", f0);
assertEquals(tst,"/", f1);
assertEquals(tst,"HTTP/1.0", f2);
assertEquals(tst,1, h);
assertEquals(tst,"Header1", hdr[0]);
assertEquals(tst,"value1", val[0]);
assertEquals(tst,"HTTP/1.1", f2);
assertEquals(tst,2, h);
assertEquals(tst,"Header1", hdr[1]);
assertEquals(tst,"value1", val[1]);
assertEquals(tst,"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ", _content);
parser.parse();
assertEquals(tst,"POST", f0);
assertEquals(tst,"/foo", f1);
assertEquals(tst,"HTTP/1.0", f2);
assertEquals(tst,1, h);
assertEquals(tst,"Header2", hdr[0]);
assertEquals(tst,"value2", val[0]);
assertEquals(tst,"HTTP/1.1", f2);
assertEquals(tst,2, h);
assertEquals(tst,"Header2", hdr[1]);
assertEquals(tst,"value2", val[1]);
assertEquals(tst,null, _content);
parser.parse();
assertEquals(tst,"PUT", f0);
assertEquals(tst,"/doodle", f1);
assertEquals(tst,"HTTP/1.0", f2);
assertEquals(tst,1, h);
assertEquals(tst,"Header3", hdr[0]);
assertEquals(tst,"value3", val[0]);
assertEquals(tst,"HTTP/1.1", f2);
assertEquals(tst,3, h);
assertEquals(tst,"Header3", hdr[2]);
assertEquals(tst,"value3", val[2]);
assertEquals(tst,"0123456789", _content);
}
catch(Exception e)
@ -401,7 +411,7 @@ public class HttpParserTest
StringEndPoint io=new StringEndPoint();
io.setInput(
"HTTP/1.1 204 No-Content\015\012"
+ "Connection: close\015\012"
+ "Header: value\015\012"
+ "\015\012"
+ "HTTP/1.1 200 Correct\015\012"
+ "Content-Length: 10\015\012"

View File

@ -20,7 +20,7 @@ public interface AsyncEndPoint extends EndPoint
* Dispatch the endpoint to a thread to attend to it.
*
*/
public void dispatch();
public void asyncDispatch();
/**
* @return true if this endpoint can accept a dispatch. False if the

View File

@ -103,6 +103,12 @@ public class BufferCache
return lookup(buffer).toString();
}
public int getOrdinal(String value)
{
CachedBuffer buffer = (CachedBuffer)_stringMap.get(value);
return buffer==null?-1:buffer.getOrdinal();
}
public int getOrdinal(Buffer buffer)
{
if (buffer instanceof CachedBuffer)

View File

@ -79,14 +79,34 @@ public class SocketEndPoint extends StreamEndPoint
@Override
public boolean isInputShutdown()
{
return !isOpen() || super.isInputShutdown();
if (_socket instanceof SSLSocket)
return super.isInputShutdown();
return _socket.isClosed() || _socket.isInputShutdown();
}
/* ------------------------------------------------------------ */
@Override
public boolean isOutputShutdown()
{
if (_socket instanceof SSLSocket)
return super.isOutputShutdown();
return _socket.isClosed() || _socket.isOutputShutdown();
}
/* ------------------------------------------------------------ */
/*
*/
protected final void shutdownSocketOutput() throws IOException
{
return !isOpen() || super.isOutputShutdown();
if (!_socket.isClosed())
{
if (!_socket.isOutputShutdown())
_socket.shutdownOutput();
if (_socket.isInputShutdown())
_socket.close();
}
}
/* ------------------------------------------------------------ */
@ -96,15 +116,27 @@ public class SocketEndPoint extends StreamEndPoint
@Override
public void shutdownOutput() throws IOException
{
if (!isOutputShutdown())
{
if (_socket instanceof SSLSocket)
super.shutdownOutput();
if (!(_socket instanceof SSLSocket))
_socket.shutdownOutput();
}
else
shutdownSocketOutput();
}
/* ------------------------------------------------------------ */
/*
*/
public void shutdownSocketInput() throws IOException
{
if (!_socket.isClosed())
{
if (!_socket.isInputShutdown())
_socket.shutdownInput();
if (_socket.isOutputShutdown())
_socket.close();
}
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.jetty.io.bio.StreamEndPoint#shutdownOutput()
@ -112,12 +144,10 @@ public class SocketEndPoint extends StreamEndPoint
@Override
public void shutdownInput() throws IOException
{
if (!isInputShutdown())
{
if (_socket instanceof SSLSocket)
super.shutdownInput();
if (!(_socket instanceof SSLSocket))
_socket.shutdownInput();
}
else
shutdownSocketInput();
}
/* ------------------------------------------------------------ */

View File

@ -21,6 +21,7 @@ import java.net.SocketTimeoutException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
public class StreamEndPoint implements EndPoint
{
@ -72,10 +73,8 @@ public class StreamEndPoint implements EndPoint
public void shutdownOutput() throws IOException
{
if (_oshut)
return;
_oshut = true;
if (_out!=null)
if (_ishut && _out!=null)
_out.close();
}
@ -86,10 +85,8 @@ public class StreamEndPoint implements EndPoint
public void shutdownInput() throws IOException
{
if (_ishut)
return;
_ishut = true;
if (_in!=null)
if (_oshut&&_in!=null)
_in.close();
}
@ -122,6 +119,8 @@ public class StreamEndPoint implements EndPoint
*/
public int fill(Buffer buffer) throws IOException
{
if (_ishut)
return -1;
if (_in==null)
return 0;
@ -136,13 +135,8 @@ public class StreamEndPoint implements EndPoint
try
{
int read=buffer.readFrom(_in, space);
if (read<0 && isOpen())
{
if (!isInputShutdown())
shutdownInput();
else if (isOutputShutdown())
close();
}
if (read<0)
shutdownInput();
return read;
}
catch(SocketTimeoutException e)
@ -157,8 +151,10 @@ public class StreamEndPoint implements EndPoint
*/
public int flush(Buffer buffer) throws IOException
{
if (_out==null)
if (_oshut)
return -1;
if (_out==null)
return 0;
int length=buffer.length();
if (length>0)
buffer.writeTo(_out);

View File

@ -53,6 +53,8 @@ public class StringEndPoint extends StreamEndPoint
_in=_bin;
_bout = new ByteArrayOutputStream();
_out=_bout;
_ishut=false;
_oshut=false;
}
catch(Exception e)
{

View File

@ -0,0 +1,22 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io.nio;
import java.io.IOException;
import org.eclipse.jetty.io.Connection;
public interface AsyncConnection extends Connection
{
void onInputShutdown() throws IOException;
}

View File

@ -101,50 +101,68 @@ public class ChannelEndPoint implements EndPoint
return _channel.isOpen();
}
/** Shutdown the channel Input.
* Cannot be overridden. To override, see {@link #shutdownInput()}
* @throws IOException
*/
protected final void shutdownChannelInput() throws IOException
{
if (_channel.isOpen())
{
if (_channel instanceof SocketChannel)
{
Socket socket= ((SocketChannel)_channel).socket();
if (!socket.isInputShutdown())
socket.shutdownInput();
if(socket.isOutputShutdown())
close();
}
else
close();
}
}
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close()
*/
public void shutdownInput() throws IOException
{
if (_channel.isOpen() && _channel instanceof SocketChannel)
{
Socket socket= ((SocketChannel)_channel).socket();
if (!socket.isClosed())
{
if(socket.isOutputShutdown())
socket.close();
else if (!socket.isInputShutdown())
socket.shutdownInput();
}
}
shutdownChannelInput();
}
protected final void shutdownChannelOutput() throws IOException
{
if (_channel.isOpen())
{
if (_channel instanceof SocketChannel)
{
Socket socket= ((SocketChannel)_channel).socket();
if (!socket.isOutputShutdown())
socket.shutdownOutput();
if (socket.isInputShutdown())
close();
}
else
close();
}
}
/* (non-Javadoc)
* @see org.eclipse.io.EndPoint#close()
*/
public void shutdownOutput() throws IOException
{
if (_channel.isOpen() && _channel instanceof SocketChannel)
{
Socket socket= ((SocketChannel)_channel).socket();
if (!socket.isClosed())
{
if (socket.isInputShutdown())
socket.close();
else if (!socket.isOutputShutdown())
socket.shutdownOutput();
}
}
shutdownChannelOutput();
}
public boolean isOutputShutdown()
{
return _channel.isOpen() && _socket!=null && _socket.isOutputShutdown();
return !_channel.isOpen() || _socket!=null && _socket.isOutputShutdown();
}
public boolean isInputShutdown()
{
return _channel.isOpen() && _socket!=null && _socket.isInputShutdown();
return !_channel.isOpen() || _socket!=null && _socket.isInputShutdown();
}
/* (non-Javadoc)
@ -188,7 +206,7 @@ public class ChannelEndPoint implements EndPoint
{
if (!isInputShutdown())
shutdownInput();
else if (isOutputShutdown())
if (isOutputShutdown())
_channel.close();
}
}
@ -197,7 +215,7 @@ public class ChannelEndPoint implements EndPoint
LOG.debug(x);
try
{
close();
_channel.close();
}
catch (IOException xx)
{

View File

@ -53,13 +53,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
* ProxyConnect. The connection may change for an SCEP as it is upgraded
* from HTTP to proxy connect or websocket.
*/
private volatile Connection _connection;
private volatile AsyncConnection _connection;
/** true if a thread has been dispatched to handle this endpoint */
private boolean _dispatched = false;
/** true if a non IO dispatch (eg async resume) is outstanding */
private boolean _redispatched = false;
private boolean _asyncDispatch = false;
/** true if the last write operation succeed and wrote all offered bytes */
private volatile boolean _writable = true;
@ -76,6 +76,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private volatile long _idleTimestamp;
private boolean _ishut;
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
throws IOException
@ -85,31 +87,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_manager = selectSet.getManager();
_selectSet = selectSet;
_dispatched = false;
_redispatched = false;
_open=true;
_key = key;
_connection = _manager.newConnection(channel,this);
scheduleIdle();
}
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
throws IOException
{
super(channel);
_manager = selectSet.getManager();
_selectSet = selectSet;
_dispatched = false;
_redispatched = false;
_asyncDispatch = false;
_open=true;
_key = key;
_connection = _manager.newConnection(channel,this);
scheduleIdle();
}
/* ------------------------------------------------------------ */
@ -136,8 +121,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
public void setConnection(Connection connection)
{
// TODO Only needed for local connection
Connection old=_connection;
_connection=connection;
_connection=(AsyncConnection)connection;
_manager.endPointUpgraded(this,old);
}
@ -211,6 +197,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
}
/* ------------------------------------------------------------ */
public void asyncDispatch()
{
synchronized(this)
{
if (_dispatched)
_asyncDispatch=true;
else
dispatch();
}
}
/* ------------------------------------------------------------ */
public void dispatch()
{
@ -218,7 +216,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
if (_dispatched)
{
_redispatched=true;
throw new IllegalStateException("dispatched");
}
else
{
@ -245,9 +243,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
synchronized (this)
{
if (_redispatched)
if (_asyncDispatch)
{
_redispatched=false;
_asyncDispatch=false;
return false;
}
_dispatched = false;
@ -357,12 +355,15 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
synchronized (this)
{
if (isInputShutdown())
throw new EofException();
long now=_selectSet.getNow();
long end=now+timeoutMs;
try
{
_readBlocked=true;
while (isOpen() && _readBlocked)
while (!isInputShutdown() && _readBlocked)
{
try
{
@ -399,7 +400,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
synchronized (this)
{
if (!isOpen() || isOutputShutdown())
if (isOutputShutdown())
throw new EofException();
long now=_selectSet.getNow();
@ -407,7 +408,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
try
{
_writeBlocked=true;
while (isOpen() && _writeBlocked && !isOutputShutdown())
while (_writeBlocked && !isOutputShutdown())
{
try
{
@ -426,16 +427,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
return false;
}
}
catch(Throwable e)
{
// TODO remove this if it finds nothing
LOG.warn(e);
if (e instanceof RuntimeException)
throw (RuntimeException)e;
if (e instanceof Error)
throw (Error)e;
throw new RuntimeException(e);
}
finally
{
_writeBlocked=false;
@ -471,9 +462,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
*/
private void updateKey()
{
int current_ops=-1;
synchronized (this)
{
int ops=-1;
if (getChannel().isOpen())
{
_interestOps =
@ -481,7 +472,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
| ((!_socket.isOutputShutdown()&& (!_writable || _writeBlocked)) ? SelectionKey.OP_WRITE : 0);
try
{
ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
}
catch(Exception e)
{
@ -489,12 +480,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
LOG.ignore(e);
}
}
if(_interestOps == ops && getChannel().isOpen())
return;
}
_selectSet.addChange(this);
_selectSet.wakeup();
if(_interestOps != current_ops)
{
_selectSet.addChange(this);
_selectSet.wakeup();
}
}
/* ------------------------------------------------------------ */
@ -583,11 +575,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
while(true)
{
final Connection next = _connection.handle();
final AsyncConnection next = (AsyncConnection)_connection.handle();
if (next!=_connection)
{
LOG.debug("{} replaced {}",next,_connection);
_connection=next;
_manager.endPointUpgraded(this,_connection);
continue;
}
break;
@ -600,23 +593,47 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
catch (EofException e)
{
LOG.debug("EOF", e);
try{getChannel().close();}
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
catch (IOException e)
{
LOG.warn(e.toString());
LOG.debug(e);
try{getChannel().close();}
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
catch (Throwable e)
{
LOG.warn("handle failed", e);
try{getChannel().close();}
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
dispatched=!undispatch();
finally
{
dispatched=!undispatch();
if (!_ishut && isInputShutdown() && isOpen())
{
_ishut=true;
try
{
_connection.onInputShutdown();
}
catch(Throwable x)
{
if (x instanceof ThreadDeath)
throw (ThreadDeath)x;
LOG.warn("onInputShutdown failed", x);
try{close();}
catch(IOException e2){LOG.ignore(e2);}
}
finally
{
updateKey();
}
}
}
}
}
finally

View File

@ -337,7 +337,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
/* ------------------------------------------------------------------------------- */
protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
protected abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
/* ------------------------------------------------------------ */
/**
@ -584,6 +584,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
System.err.println(set+":\n"+set.dump());
}
public String toString() {return "Dump-"+super.toString();}
});
}
}
@ -713,6 +714,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
endp.checkIdleTimestamp(idle_now);
}
}
public String toString() {return "Idle-"+super.toString();}
});
}
@ -837,6 +839,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
{
SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
LOG.debug("created {}",endp);
endPointOpened(endp);
_endPoints.put(endp,this);
return endp;

View File

@ -50,10 +50,13 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
private final SSLEngine _engine;
private final SSLSession _session;
private volatile NIOBuffer _inNIOBuffer;
private volatile NIOBuffer _outNIOBuffer;
private NIOBuffer _inNIOBuffer;
private NIOBuffer _outNIOBuffer;
private boolean _outNeeded;
private boolean _inNeeded;
private boolean _closing=false;
private boolean _ishut=false;
private boolean _oshut=false;
private SSLEngineResult _result;
private volatile boolean _handshook=false;
@ -75,37 +78,35 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
if (_debug) LOG.debug(_session+" channel="+channel);
}
/* ------------------------------------------------------------ */
public SslSelectChannelEndPoint(Buffers buffers,SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, SSLEngine engine)
throws IOException
{
super(channel,selectSet,key);
_buffers=buffers;
// ssl
_engine=engine;
_session=engine.getSession();
if (_debug) LOG.debug(_session+" channel="+channel);
}
/* ------------------------------------------------------------ */
private void needOutBuffer()
{
synchronized (this)
{
assert !_outNeeded;
_outNeeded=true;
if (_outNIOBuffer==null)
_outNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize());
}
}
/* ------------------------------------------------------------ */
private void releaseOutBuffer()
{
synchronized (this)
{
_outNeeded=false;
freeOutBuffer();
}
}
/* ------------------------------------------------------------ */
private void freeOutBuffer()
{
synchronized (this)
{
if (_outNIOBuffer!=null && _outNIOBuffer.length()==0)
if (!_outNeeded && _outNIOBuffer!=null && _outNIOBuffer.length()==0)
{
_buffers.returnBuffer(_outNIOBuffer);
_outNIOBuffer=null;
@ -118,17 +119,29 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
{
synchronized (this)
{
assert !_inNeeded;
_inNeeded=true;
if(_inNIOBuffer==null)
_inNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize());
}
}
/* ------------------------------------------------------------ */
private void releaseInBuffer()
{
synchronized (this)
{
_inNeeded=false;
freeInBuffer();
}
}
/* ------------------------------------------------------------ */
private void freeInBuffer()
{
synchronized (this)
{
if (_inNIOBuffer!=null && _inNIOBuffer.length()==0)
if (!_inNeeded && _inNIOBuffer!=null && _inNIOBuffer.length()==0)
{
_buffers.returnBuffer(_inNIOBuffer);
_inNIOBuffer=null;
@ -170,18 +183,45 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
}
/* ------------------------------------------------------------ */
@Override
public boolean isOutputShutdown()
{
return _engine!=null && _engine.isOutboundDone();
}
/* ------------------------------------------------------------ */
@Override
public boolean isInputShutdown()
{
return _engine!=null && _engine.isInboundDone();
return _ishut || (_engine!=null && _engine.isInboundDone()) || super.isInputShutdown();
}
/* ------------------------------------------------------------ */
@Override
public boolean isOutputShutdown()
{
return _oshut || (_engine!=null && _engine.isOutboundDone()) || super.isOutputShutdown();
}
/* ------------------------------------------------------------ */
@Override
public void shutdownInput() throws IOException
{
LOG.debug("{} shutdownInput",_session);
// All SSL closes should be graceful, as it is more secure.
// So normal SSL close can be used here.
if (_ishut)
return;
_ishut=true;
try
{
_engine.closeInbound();
process(null,null);
}
catch (IOException e)
{
// We could not write the SSL close message so close to be sure
LOG.ignore(e);
close();
}
}
/* ------------------------------------------------------------ */
@ -189,16 +229,32 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
public void shutdownOutput() throws IOException
{
LOG.debug("{} shutdownOutput",_session);
// All SSL closes should be graceful, as it is more secure.
// So normal SSL close can be used here.
close();
if (_oshut)
return;
try
{
_oshut=true;
_engine.closeOutbound();
process(null,null);
}
catch (IOException e)
{
// We could not write the SSL close message so close to be sure
LOG.ignore(e);
close();
}
}
/* ------------------------------------------------------------ */
private int process(ByteBuffer inBBuf, Buffer outBuf) throws IOException
private synchronized int process(ByteBuffer inBBuf, Buffer outBuf) throws IOException
{
if (_debug)
LOG.debug("{} process closing={} in={} out={}",_session,_closing,inBBuf,outBuf);
LOG.debug("{} process ishut={} oshut={} in={} out={}",_session,_ishut,_oshut,inBBuf,outBuf);
// If there is no place to put incoming application data,
if (inBBuf==null)
@ -239,13 +295,10 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
case NOT_HANDSHAKING:
// If closing, don't process application data
if (_closing)
if (isOutputShutdown())
{
if (outBuf!=null && outBuf.hasContent())
{
LOG.debug("Write while closing");
outBuf.clear();
}
throw new EofException("Write while closing");
break;
}
@ -320,7 +373,7 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
// Need more data to be unwrapped so try another call to unwrap
progress|=unwrap(inBBuf);
if (_closing && inBBuf.hasRemaining())
if (isInputShutdown())
inBBuf.clear();
break;
}
@ -336,35 +389,6 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
}
/* ------------------------------------------------------------ */
@Override
public void close() throws IOException
{
// For safety we always force a close calling super
try
{
if (!_closing)
{
_closing=true;
LOG.debug("{} close",_session);
_engine.closeOutbound();
process(null,null);
}
}
catch (IOException e)
{
// We could not write the SSL close message because the
// socket was already closed, nothing more we can do.
LOG.ignore(e);
}
finally
{
super.close();
}
}
/* ------------------------------------------------------------ */
/** Fill the buffer with unencrypted bytes.
* Called by a Http Parser when more data is
* needed to continue parsing a request or a response.
@ -493,119 +517,127 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
/**
* @return true if progress is made
*/
private boolean unwrap(ByteBuffer buffer) throws IOException
private synchronized boolean unwrap(ByteBuffer buffer) throws IOException
{
needInBuffer();
ByteBuffer in_buffer=_inNIOBuffer.getByteBuffer();
_inNIOBuffer.compact();
int total_filled=0;
boolean remoteClosed = false;
LOG.debug("{} unwrap space={} open={}",_session,_inNIOBuffer.space(),super.isOpen());
// loop filling as much encrypted data as we can into the buffer
while (_inNIOBuffer.space()>0 && super.isOpen())
{
int filled=super.fill(_inNIOBuffer);
if (_debug) LOG.debug("{} filled {}",_session,filled);
if (filled < 0)
remoteClosed = true;
// break the loop if no progress is made (we have read everything there is to read)
if (filled<=0)
break;
total_filled+=filled;
}
// If we have no progress and no data
if (total_filled==0 && _inNIOBuffer.length()==0)
{
// Do we need to close?
if (isOpen() && remoteClosed)
{
try
{
_engine.closeInbound();
}
catch (SSLException x)
{
// It may happen, for example, in case of truncation
// attacks, we close so that we do not spin forever
super.close();
}
}
if (!isOpen())
throw new EofException();
return false;
}
// We have some in data, so try to unwrap it.
try
{
// inBuffer is the NIO buffer inside the _inNIOBuffer,
// so update its position and limit from the inNIOBuffer.
in_buffer.position(_inNIOBuffer.getIndex());
in_buffer.limit(_inNIOBuffer.putIndex());
ByteBuffer in_buffer=_inNIOBuffer.getByteBuffer();
// Do the unwrap
_result=_engine.unwrap(in_buffer,buffer);
if (!_handshook && _result.getHandshakeStatus()==SSLEngineResult.HandshakeStatus.FINISHED)
_handshook=true;
if (_debug) LOG.debug("{} unwrap {}",_session,_result);
_inNIOBuffer.compact();
// skip the bytes consumed
_inNIOBuffer.skip(_result.bytesConsumed());
}
catch(SSLException e)
{
LOG.warn(getRemoteAddr() + ":" + getRemotePort() + " ",e);
super.close();
throw e;
int total_filled=0;
boolean remoteClosed = false;
LOG.debug("{} unwrap space={} open={}",_session,_inNIOBuffer.space(),super.isOpen());
// loop filling as much encrypted data as we can into the buffer
while (_inNIOBuffer.space()>0 && super.isOpen())
{
int filled=super.fill(_inNIOBuffer);
if (_debug) LOG.debug("{} filled {}",_session,filled);
if (filled < 0)
remoteClosed = true;
// break the loop if no progress is made (we have read everything there is to read)
if (filled<=0)
break;
total_filled+=filled;
}
// If we have no progress and no data
if (total_filled==0 && _inNIOBuffer.length()==0)
{
// Do we need to close?
if (isOpen() && remoteClosed)
{
try
{
_engine.closeInbound();
}
catch (SSLException x)
{
// It may happen, for example, in case of truncation
// attacks, we close so that we do not spin forever
super.close();
}
}
if (!isOpen())
throw new EofException();
return false;
}
// We have some in data, so try to unwrap it.
try
{
// inBuffer is the NIO buffer inside the _inNIOBuffer,
// so update its position and limit from the inNIOBuffer.
in_buffer.position(_inNIOBuffer.getIndex());
in_buffer.limit(_inNIOBuffer.putIndex());
// Do the unwrap
_result=_engine.unwrap(in_buffer,buffer);
if (!_handshook && _result.getHandshakeStatus()==SSLEngineResult.HandshakeStatus.FINISHED)
_handshook=true;
if (_debug) LOG.debug("{} unwrap {}",_session,_result);
// skip the bytes consumed
_inNIOBuffer.skip(_result.bytesConsumed());
}
catch(SSLException e)
{
LOG.warn(getRemoteAddr() + ":" + getRemotePort() + " ",e);
super.close();
throw e;
}
finally
{
// reset the buffer so it can be managed by the _inNIOBuffer again.
in_buffer.position(0);
in_buffer.limit(in_buffer.capacity());
}
// handle the unwrap results
switch(_result.getStatus())
{
case BUFFER_OVERFLOW:
LOG.debug("{} unwrap overflow",_session);
return false;
case BUFFER_UNDERFLOW:
// Not enough data,
// If we are closed, we will never get more, so EOF
// else return and we will be tried again
// later when more data arriving causes another dispatch.
if (LOG.isDebugEnabled()) LOG.debug("{} unwrap {}",_session,_result);
if(!isOpen())
{
_inNIOBuffer.clear();
if (_outNIOBuffer!=null)
_outNIOBuffer.clear();
throw new EofException();
}
return (total_filled > 0);
case CLOSED:
if (super.isOpen())
super.close();
// return true is some bytes somewhere were moved about.
return total_filled>0 ||_result.bytesConsumed()>0 || _result.bytesProduced()>0;
case OK:
// return true is some bytes somewhere were moved about.
return total_filled>0 ||_result.bytesConsumed()>0 || _result.bytesProduced()>0;
default:
LOG.warn("{} unwrap default: {}",_session,_result);
throw new IOException(_result.toString());
}
}
finally
{
// reset the buffer so it can be managed by the _inNIOBuffer again.
in_buffer.position(0);
in_buffer.limit(in_buffer.capacity());
}
// handle the unwrap results
switch(_result.getStatus())
{
case BUFFER_OVERFLOW:
LOG.debug("{} unwrap overflow",_session);
return false;
case BUFFER_UNDERFLOW:
// Not enough data,
// If we are closed, we will never get more, so EOF
// else return and we will be tried again
// later when more data arriving causes another dispatch.
if (LOG.isDebugEnabled()) LOG.debug("{} unwrap {}",_session,_result);
if(!isOpen())
{
_inNIOBuffer.clear();
if (_outNIOBuffer!=null)
_outNIOBuffer.clear();
throw new EofException();
}
return (total_filled > 0);
case CLOSED:
_closing=true;
// return true is some bytes somewhere were moved about.
return total_filled>0 ||_result.bytesConsumed()>0 || _result.bytesProduced()>0;
case OK:
// return true is some bytes somewhere were moved about.
return total_filled>0 ||_result.bytesConsumed()>0 || _result.bytesProduced()>0;
default:
LOG.warn("{} unwrap default: {}",_session,_result);
throw new IOException(_result.toString());
releaseInBuffer();
}
}
@ -619,52 +651,60 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
}
/* ------------------------------------------------------------ */
private int wrap(final Buffer buffer) throws IOException
private synchronized int wrap(final Buffer buffer) throws IOException
{
ByteBuffer bbuf=extractOutputBuffer(buffer);
synchronized(bbuf)
{
int consumed=0;
needOutBuffer();
_outNIOBuffer.compact();
ByteBuffer out_buffer=_outNIOBuffer.getByteBuffer();
synchronized(out_buffer)
try
{
try
_outNIOBuffer.compact();
ByteBuffer out_buffer=_outNIOBuffer.getByteBuffer();
synchronized(out_buffer)
{
bbuf.position(buffer.getIndex());
bbuf.limit(buffer.putIndex());
out_buffer.position(_outNIOBuffer.putIndex());
out_buffer.limit(out_buffer.capacity());
_result=_engine.wrap(bbuf,out_buffer);
if (_debug) LOG.debug("{} wrap {}",_session,_result);
if (!_handshook && _result.getHandshakeStatus()==SSLEngineResult.HandshakeStatus.FINISHED)
_handshook=true;
_outNIOBuffer.setPutIndex(out_buffer.position());
consumed=_result.bytesConsumed();
}
catch(SSLException e)
{
LOG.warn(getRemoteAddr()+":"+getRemotePort()+" ",e);
if (getChannel().isOpen())
getChannel().close();
throw e;
}
finally
{
out_buffer.position(0);
bbuf.position(0);
bbuf.limit(bbuf.capacity());
if (consumed>0)
try
{
int len=consumed<buffer.length()?consumed:buffer.length();
buffer.skip(len);
consumed-=len;
bbuf.position(buffer.getIndex());
bbuf.limit(buffer.putIndex());
out_buffer.position(_outNIOBuffer.putIndex());
out_buffer.limit(out_buffer.capacity());
_result=_engine.wrap(bbuf,out_buffer);
if (_debug) LOG.debug("{} wrap {}",_session,_result);
if (!_handshook && _result.getHandshakeStatus()==SSLEngineResult.HandshakeStatus.FINISHED)
_handshook=true;
_outNIOBuffer.setPutIndex(out_buffer.position());
consumed=_result.bytesConsumed();
}
catch(SSLException e)
{
LOG.warn(getRemoteAddr()+":"+getRemotePort()+" ",e);
if (getChannel().isOpen())
getChannel().close(); // TODO ???
throw e;
}
finally
{
out_buffer.position(0);
bbuf.position(0);
bbuf.limit(bbuf.capacity());
if (consumed>0)
{
int len=consumed<buffer.length()?consumed:buffer.length();
buffer.skip(len);
consumed-=len;
}
}
}
}
finally
{
releaseOutBuffer();
}
}
switch(_result.getStatus())
{
case BUFFER_UNDERFLOW:
@ -678,7 +718,8 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
case OK:
return _result.bytesConsumed();
case CLOSED:
_closing=true;
if (super.isOpen())
super.close();
return _result.bytesConsumed()>0?_result.bytesConsumed():-1;
default:

View File

@ -1,62 +1,144 @@
package org.eclipse.jetty.io;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.eclipse.jetty.io.bio.SocketEndPoint;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.junit.Assert;
import org.junit.Test;
public class EndPointTest
public abstract class EndPointTest<T extends EndPoint>
{
public static class Connection<T>
{
public T client;
public T server;
}
protected abstract Connection<T> newConnection() throws Exception;
@Test
public void testSocketEndPoints() throws Exception
public void testClientServerExchange() throws Exception
{
final ServerSocket server = new ServerSocket();
server.bind(null);
Connection<T> c = newConnection();
Buffer buffer = new IndirectNIOBuffer(4096);
final Exchanger<Socket> accepted = new Exchanger<Socket>();
new Thread(){
public void run()
{
try
{
accepted.exchange(server.accept());
}
catch(Exception e)
{
e.printStackTrace();
}
}
}.start();
c.client.flush(new ByteArrayBuffer("request"));
int len = c.server.fill(buffer);
assertEquals(7,len);
assertEquals("request",buffer.toString());
assertTrue(c.client.isOpen());
assertFalse(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertFalse(c.server.isOutputShutdown());
Socket s0 = new Socket(server.getInetAddress(),server.getLocalPort());
Socket s1 = accepted.exchange(null,5,TimeUnit.SECONDS);
c.server.flush(new ByteArrayBuffer("response"));
c.server.shutdownOutput();
SocketEndPoint in = new SocketEndPoint(s0);
SocketEndPoint out = new SocketEndPoint(s1);
assertTrue(c.client.isOpen());
assertFalse(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown());
buffer.clear();
len = c.client.fill(buffer);
assertEquals(8,len);
assertEquals("response",buffer.toString());
assertTrue(c.client.isOpen());
assertFalse(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown());
buffer.clear();
len = c.client.fill(buffer);
assertEquals(-1,len);
assertTrue(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown());
c.client.shutdownOutput();
assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown());
assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown());
buffer.clear();
len = c.server.fill(buffer);
assertEquals(-1,len);
assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown());
assertFalse(c.server.isOpen());
assertTrue(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown());
check(in,out);
}
private void check(EndPoint in, EndPoint out) throws Exception
@Test
public void testClientClose() throws Exception
{
String data="Now is the time for all good men to come to the aid of the party";
Buffer send = new ByteArrayBuffer(data);
Buffer receive = new IndirectNIOBuffer(4096);
Connection<T> c = newConnection();
Buffer buffer = new IndirectNIOBuffer(4096);
int lo=out.flush(send);
int li=in.fill(receive);
c.client.flush(new ByteArrayBuffer("request"));
int len = c.server.fill(buffer);
assertEquals(7,len);
assertEquals("request",buffer.toString());
assertTrue(c.client.isOpen());
assertFalse(c.client.isInputShutdown());
assertFalse(c.client.isOutputShutdown());
assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertFalse(c.server.isOutputShutdown());
Assert.assertEquals(data.length(),lo);
Assert.assertEquals(data.length(),li);
Assert.assertEquals(data,receive.toString());
c.client.close();
assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown());
assertTrue(c.server.isOpen());
assertFalse(c.server.isInputShutdown());
assertFalse(c.server.isOutputShutdown());
in.close();
out.close();
}
len = c.server.fill(buffer);
assertEquals(-1,len);
assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown());
assertTrue(c.server.isOpen());
assertTrue(c.server.isInputShutdown());
assertFalse(c.server.isOutputShutdown());
c.server.shutdownOutput();
assertFalse(c.client.isOpen());
assertTrue(c.client.isInputShutdown());
assertTrue(c.client.isOutputShutdown());
assertFalse(c.server.isOpen());
assertTrue(c.server.isInputShutdown());
assertTrue(c.server.isOutputShutdown());
}
}

View File

@ -132,4 +132,46 @@ public class IOTest
}
@Test
public void testReset() throws Exception
{
ServerSocket connector;
Socket client;
Socket server;
connector = new ServerSocket(9123);
client = new Socket("192.168.1.13",connector.getLocalPort());
server = connector.accept();
client.setTcpNoDelay(true);
client.setSoLinger(true,0);
server.setTcpNoDelay(true);
server.setSoLinger(true,0);
client.getOutputStream().write(1);
assertEquals(1,server.getInputStream().read());
server.getOutputStream().write(1);
assertEquals(1,client.getInputStream().read());
// Server generator shutdowns output after non persistent sending response.
server.shutdownOutput();
// client endpoint reads EOF and shutdown input as result
assertEquals(-1,client.getInputStream().read());
client.shutdownInput();
// client connection see's EOF and shutsdown output as no more requests to be sent.
client.shutdownOutput();
// Since input already shutdown, client also closes socket.
client.close();
// Server reads the EOF from client oshut and shut's down it's input
assertEquals(-1,server.getInputStream().read());
server.shutdownInput();
// Since output was already shutdown, server closes
server.close();
}
}

View File

@ -0,0 +1,38 @@
package org.eclipse.jetty.io.bio;
import java.net.ServerSocket;
import java.net.Socket;
import org.eclipse.jetty.io.EndPointTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public class SocketEndPointTest extends EndPointTest<SocketEndPoint>
{
static ServerSocket connector;
@BeforeClass
public static void open() throws Exception
{
connector = new ServerSocket();
connector.bind(null);
}
@AfterClass
public static void close() throws Exception
{
connector.close();
connector=null;
}
@Override
protected Connection<SocketEndPoint> newConnection() throws Exception
{
Connection<SocketEndPoint> c = new Connection<SocketEndPoint>();
c.client=new SocketEndPoint(new Socket(connector.getInetAddress(),connector.getLocalPort()));
c.server=new SocketEndPoint(connector.accept());
return c;
}
}

View File

@ -0,0 +1,39 @@
package org.eclipse.jetty.io.nio;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.EndPointTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public class ChannelEndPointTest extends EndPointTest<ChannelEndPoint>
{
static ServerSocketChannel connector;
@BeforeClass
public static void open() throws Exception
{
connector = ServerSocketChannel.open();
connector.socket().bind(null);
}
@AfterClass
public static void close() throws Exception
{
connector.close();
connector=null;
}
@Override
protected Connection<ChannelEndPoint> newConnection() throws Exception
{
Connection<ChannelEndPoint> c = new Connection<ChannelEndPoint>();
c.client=new ChannelEndPoint(SocketChannel.open(connector.socket().getLocalSocketAddress()));
c.server=new ChannelEndPoint(connector.accept());
return c;
}
}

View File

@ -38,9 +38,9 @@ public class NestedParser implements Parser
return false;
}
public int parseAvailable() throws IOException
public boolean parseAvailable() throws IOException
{
return 0;
return false;
}
public boolean isMoreInBuffer() throws IOException
@ -53,4 +53,13 @@ public class NestedParser implements Parser
return false;
}
public boolean isPersistent()
{
return false;
}
public void setPersistent(boolean persistent)
{
}
}

View File

@ -183,6 +183,26 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
}
}
/* ------------------------------------------------------------ */
/* (non-Javadoc)
* @see javax.servlet.ServletRequest#isSuspended()
*/
public boolean isSuspending()
{
synchronized(this)
{
switch(_state)
{
case __ASYNCSTARTED:
case __ASYNCWAIT:
return true;
default:
return false;
}
}
}
/* ------------------------------------------------------------ */
@Override
@ -539,7 +559,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
EndPoint endp=_connection.getEndPoint();
if (!endp.isBlocking())
{
((AsyncEndPoint)endp).dispatch();
((AsyncEndPoint)endp).asyncDispatch();
}
}

View File

@ -7,15 +7,17 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.ChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class AsyncHttpConnection extends HttpConnection
public class AsyncHttpConnection extends HttpConnection implements AsyncConnection
{
private final static int NO_PROGRESS_INFO = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_INFO",100);
private final static int NO_PROGRESS_CLOSE = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_CLOSE",200);
private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
private int _total_no_progress;
@ -27,42 +29,42 @@ public class AsyncHttpConnection extends HttpConnection
public Connection handle() throws IOException
{
Connection connection = this;
boolean some_progress=false;
boolean progress=true;
boolean some_progress=false;
boolean progress=true;
// Loop while more in buffer
try
{
setCurrentConnection(this);
boolean more_in_buffer =false;
while (_endp.isOpen() && (more_in_buffer || progress) && connection==this)
// While the endpoint is open
// AND we are not suspended
// AND we have more characters to read OR we made some progress
// AND the connection has not changed
while (_endp.isOpen() &&
!_request.getAsyncContinuation().isSuspending() &&
(_parser.isMoreInBuffer() || _endp.isBufferingInput() || progress) &&
connection==this)
{
progress=false;
try
{
// Handle resumed request
if (_request._async.isAsync() && !_request._async.isComplete())
handleRequest();
handleRequest();
// else Parse more input
else if (!_parser.isComplete() && _parser.parseAvailable()>0)
else if (!_parser.isComplete() && _parser.parseAvailable())
progress=true;
// Generate more output
if (_generator.isCommitted() && !_generator.isComplete() && _generator.flushBuffer()>0)
progress=true;
if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown())
if (_generator.flushBuffer()>0)
progress=true;
// Flush output from buffering endpoint
if (_endp.isBufferingOutput())
_endp.flush();
// Special case close handling.
// If we were dispatched and have made no progress, but io is shutdown, then close
if (!progress && !some_progress && (_endp.isInputShutdown()||_endp.isOutputShutdown()))
_endp.close();
}
catch (HttpException e)
{
@ -74,50 +76,32 @@ public class AsyncHttpConnection extends HttpConnection
}
_generator.sendError(e.getStatus(), e.getReason(), null, true);
_parser.reset();
_endp.close();
}
finally
{
// Do we need to complete a half close?
if (_endp.isInputShutdown() && (_parser.isIdle() || _parser.isComplete()))
{
LOG.debug("complete half close {}",this);
more_in_buffer=false;
_endp.close();
reset(true);
}
// else Is this request/response round complete?
else if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput())
{
// Is this request/response round complete and are fully flushed?
if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput())
{
// Reset the parser/generator
progress=true;
reset();
// look for a switched connection instance?
if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
if (switched!=null)
{
_parser.reset();
_generator.reset(true);
connection=switched;
}
}
// Reset the parser/generator
// keep the buffers as we will cycle
progress=true;
reset(false);
more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
// TODO Is this required?
if (!_generator.isPersistent() && !_endp.isOutputShutdown())
{
System.err.println("Safety net oshut!!!");
_endp.shutdownOutput();
}
}
// else Are we suspended?
else if (_request.isAsyncStarted())
{
LOG.debug("suspended {}",this);
more_in_buffer=false;
progress=false;
}
else
more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
some_progress|=progress|((SelectChannelEndPoint)_endp).isProgressing();
}
}
@ -127,36 +111,32 @@ public class AsyncHttpConnection extends HttpConnection
setCurrentConnection(null);
_parser.returnBuffers();
_generator.returnBuffers();
// Check if we are write blocked
if (_generator.isCommitted() && !_generator.isComplete() && _endp.isOpen() && !_endp.isOutputShutdown())
((AsyncEndPoint)_endp).scheduleWrite(); // TODO. This should not be required
if (some_progress)
// Safety net to catch spinning
if (!some_progress)
{
_total_no_progress=0;
}
else
{
int totalNoProgress=++_total_no_progress;
if (NO_PROGRESS_INFO>0 && totalNoProgress==NO_PROGRESS_INFO && (NO_PROGRESS_CLOSE<=0 || totalNoProgress<NO_PROGRESS_CLOSE))
_total_no_progress++;
if (NO_PROGRESS_INFO>0 && _total_no_progress%NO_PROGRESS_INFO==0 && (NO_PROGRESS_CLOSE<=0 || _total_no_progress< NO_PROGRESS_CLOSE))
LOG.info("EndPoint making no progress: "+_total_no_progress+" "+_endp);
if (NO_PROGRESS_CLOSE>0 && _total_no_progress==NO_PROGRESS_CLOSE)
{
LOG.info("EndPoint making no progress: {} {}", totalNoProgress, _endp);
}
if (NO_PROGRESS_CLOSE>0 && totalNoProgress==NO_PROGRESS_CLOSE)
{
LOG.warn("Closing EndPoint making no progress: {} {}", totalNoProgress, _endp);
LOG.warn("Closing EndPoint making no progress: "+_total_no_progress+" "+_endp);
if (_endp instanceof SelectChannelEndPoint)
{
System.err.println(((SelectChannelEndPoint)_endp).getSelectManager().dump());
((SelectChannelEndPoint)_endp).getChannel().close();
}
}
}
}
return connection;
}
public void onInputShutdown() throws IOException
{
// If we don't have a committed response and we are not suspended
if (_generator.isIdle() && !_request.getAsyncContinuation().isSuspended())
{
// then no more can happen, so close.
_endp.shutdownOutput();
}
}
}

View File

@ -30,42 +30,43 @@ public class BlockingHttpConnection extends HttpConnection
}
@Override
protected void handleRequest() throws IOException
{
super.handleRequest();
}
public Connection handle() throws IOException
{
Connection connection = this;
// Loop while more in buffer
boolean more_in_buffer =true; // assume true until proven otherwise
boolean progress=true;
try
{
setCurrentConnection(this);
while (more_in_buffer && _endp.isOpen())
// do while the endpoint is open
// AND the connection has not changed
while (_endp.isOpen() && connection==this)
{
try
{
progress=false;
// If we are not ended then parse available
if (!_parser.isComplete())
_parser.parseAvailable();
if (!_parser.isComplete() && !_endp.isInputShutdown())
progress |= _parser.parseAvailable();
// Do we have more generating to do?
// Loop here because some writes may take multiple steps and
// we need to flush them all before potentially blocking in the
// next loop.
while (_generator.isCommitted() && !_generator.isComplete())
{
long written=_generator.flushBuffer();
if (written<=0)
break;
if (_endp.isBufferingOutput())
_endp.flush();
}
if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown())
progress |= _generator.flushBuffer()>0;
// Flush buffers
if (_endp.isBufferingOutput())
_endp.flush();
}
catch (HttpException e)
{
@ -80,58 +81,37 @@ public class BlockingHttpConnection extends HttpConnection
_endp.shutdownOutput();
}
finally
{
more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
// Is this request/response round complete?
{
// Is this request/response round complete and are fully flushed?
if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput())
{
// Reset the parser/generator
progress=true;
reset();
// look for a switched connection instance?
Connection switched=(_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
?(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection"):null;
// have we switched?
if (switched!=null)
if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
_parser.reset();
_generator.reset(true);
connection=switched;
Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
if (switched!=null)
connection=switched;
}
else
// TODO Is this required?
if (!_generator.isPersistent() && !_endp.isOutputShutdown())
{
// No switch, so cleanup and reset
if (!_generator.isPersistent() || _endp.isInputShutdown())
{
_parser.reset();
more_in_buffer=false;
_endp.close();
}
if (more_in_buffer)
{
reset(false);
more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
}
else
reset(true);
System.err.println("Safety net oshut!!!");
_endp.shutdownOutput();
}
}
else if (_parser.isIdle() && _endp.isInputShutdown())
{
more_in_buffer=false;
_endp.close();
}
if (_request.isAsyncStarted())
throw new IllegalStateException();
}
}
}
}
finally
{
_parser.returnBuffers();
setCurrentConnection(null);
_handling=false;
_parser.returnBuffers();
_generator.returnBuffers();
}
return connection;
}

View File

@ -378,18 +378,16 @@ public abstract class HttpConnection extends AbstractConnection
}
/* ------------------------------------------------------------ */
public void reset(boolean returnBuffers)
public void reset()
{
_parser.reset();
if (returnBuffers)
_parser.returnBuffers();
_parser.returnBuffers(); // TODO maybe only on unhandle
_requestFields.clear();
_request.recycle();
_generator.reset(returnBuffers); // TODO maybe only release when low on resources
_generator.reset();
_generator.returnBuffers();// TODO maybe only on unhandle
_responseFields.clear();
_response.recycle();
_uri.clear();
}
@ -560,7 +558,7 @@ public abstract class HttpConnection extends AbstractConnection
LOG.warn("header full: "+e);
_response.reset();
_generator.reset(true);
_generator.reset();
_generator.setResponse(HttpStatus.INTERNAL_SERVER_ERROR_500,null);
_generator.completeHeader(_responseFields,Generator.LAST);
_generator.complete();
@ -592,7 +590,7 @@ public abstract class HttpConnection extends AbstractConnection
LOG.debug(e);
_response.reset();
_generator.reset(true);
_generator.reset();
_generator.setResponse(HttpStatus.INTERNAL_SERVER_ERROR_500,null);
_generator.completeHeader(_responseFields,Generator.LAST);
_generator.complete();
@ -725,7 +723,8 @@ public abstract class HttpConnection extends AbstractConnection
case HttpMethods.HEAD_ORDINAL:
_head=true;
// fall through
_uri.parse(uri.array(), uri.getIndex(), uri.length());
break;
default:
_uri.parse(uri.array(), uri.getIndex(), uri.length());
@ -817,46 +816,6 @@ public abstract class HttpConnection extends AbstractConnection
value = MimeTypes.CACHE.lookup(value);
_charset=MimeTypes.getCharsetFromContentType(value);
break;
case HttpHeaders.CONNECTION_ORDINAL:
//looks rather clumsy, but the idea is to optimize for a single valued header
switch(HttpHeaderValues.CACHE.getOrdinal(value))
{
case -1:
{
String[] values = value.toString().split(",");
for (int i=0;values!=null && i<values.length;i++)
{
CachedBuffer cb = HttpHeaderValues.CACHE.get(values[i].trim());
if (cb!=null)
{
switch(cb.getOrdinal())
{
case HttpHeaderValues.CLOSE_ORDINAL:
_responseFields.add(HttpHeaders.CONNECTION_BUFFER,HttpHeaderValues.CLOSE_BUFFER);
_generator.setPersistent(false);
break;
case HttpHeaderValues.KEEP_ALIVE_ORDINAL:
if (_version==HttpVersions.HTTP_1_0_ORDINAL)
_responseFields.add(HttpHeaders.CONNECTION_BUFFER,HttpHeaderValues.KEEP_ALIVE_BUFFER);
break;
}
}
}
break;
}
case HttpHeaderValues.CLOSE_ORDINAL:
_responseFields.put(HttpHeaders.CONNECTION_BUFFER,HttpHeaderValues.CLOSE_BUFFER);
_generator.setPersistent(false);
break;
case HttpHeaderValues.KEEP_ALIVE_ORDINAL:
if (_version==HttpVersions.HTTP_1_0_ORDINAL)
_responseFields.put(HttpHeaders.CONNECTION_BUFFER,HttpHeaderValues.KEEP_ALIVE_BUFFER);
break;
}
}
_requestFields.add(name, value);
@ -878,14 +837,23 @@ public abstract class HttpConnection extends AbstractConnection
break;
case HttpVersions.HTTP_1_0_ORDINAL:
_generator.setHead(_head);
if (_parser.isPersistent())
{
_responseFields.add(HttpHeaders.CONNECTION_BUFFER,HttpHeaderValues.KEEP_ALIVE_BUFFER);
_generator.setPersistent(true);
}
if (_server.getSendDateHeader())
_generator.setDate(_request.getTimeStampBuffer());
break;
case HttpVersions.HTTP_1_1_ORDINAL:
_generator.setHead(_head);
if (!_parser.isPersistent())
{
_responseFields.add(HttpHeaders.CONNECTION_BUFFER,HttpHeaderValues.CLOSE_BUFFER);
_generator.setPersistent(false);
}
if (_server.getSendDateHeader())
_generator.setDate(_request.getTimeStampBuffer());
@ -965,6 +933,7 @@ public abstract class HttpConnection extends AbstractConnection
{
LOG.debug("Bad request!: "+version+" "+status+" "+reason);
}
}

View File

@ -20,6 +20,7 @@ import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
@ -423,13 +424,13 @@ public class ConnectHandler extends HandlerWrapper
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey);
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, channel.socket().getSoTimeout());
endp.setMaxIdleTime(_writeTimeout);
return endp;
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment();
proxyToServer.setTimeStamp(System.currentTimeMillis());
@ -461,7 +462,9 @@ public class ConnectHandler extends HandlerWrapper
}
}
public class ProxyToServerConnection implements Connection
public class ProxyToServerConnection implements AsyncConnection
{
private final CountDownLatch _ready = new CountDownLatch(1);
private final Buffer _buffer = new IndirectNIOBuffer(1024);
@ -541,6 +544,11 @@ public class ConnectHandler extends HandlerWrapper
}
}
public void onInputShutdown() throws IOException
{
// TODO
}
private void writeData() throws IOException
{
// This method is called from handle() and closeServer()
@ -671,7 +679,7 @@ public class ConnectHandler extends HandlerWrapper
}
}
public class ClientToProxyConnection implements Connection
public class ClientToProxyConnection implements AsyncConnection
{
private final Buffer _buffer = new IndirectNIOBuffer(1024);
private final ConcurrentMap<String, Object> _context;
@ -758,6 +766,11 @@ public class ConnectHandler extends HandlerWrapper
_logger.debug("{}: end reading from client", this);
}
}
public void onInputShutdown() throws IOException
{
// TODO
}
public long getTimeStamp()
{

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
@ -287,7 +288,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
}
/* ------------------------------------------------------------------------------- */
protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
protected AsyncConnection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
{
return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint);
}
@ -365,7 +366,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
}
@Override
protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
protected AsyncConnection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
{
return SelectChannelConnector.this.newConnection(channel,endpoint);
}

View File

@ -32,9 +32,11 @@ import org.eclipse.jetty.io.BuffersFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.bio.SocketEndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
import org.eclipse.jetty.server.AsyncHttpConnection;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
@ -549,9 +551,9 @@ public class SslSelectChannelConnector extends SelectChannelConnector implements
/* ------------------------------------------------------------------------------- */
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
HttpConnection connection=(HttpConnection)super.newConnection(channel,endpoint);
AsyncHttpConnection connection=(AsyncHttpConnection)super.newConnection(channel,endpoint);
((HttpParser)connection.getParser()).setForceContentBuffer(true);
return connection;
}

View File

@ -51,6 +51,7 @@ public class AsyncUploadTest
{
server = new Server();
connector = new SelectChannelConnector();
connector.setMaxIdleTime(10000);
server.addConnector(connector);
server.setHandler(new EmptyHandler());
server.start();
@ -91,11 +92,9 @@ public class AsyncUploadTest
InputStream in = socket.getInputStream();
String response = IO.toString(in);
// System.err.println(response);
assertTrue(response.indexOf("200 OK")>0);
long end = System.nanoTime();
System.err.println("upload time: " + TimeUnit.NANOSECONDS.toMillis(end - start));
assertEquals(content.length, total);
}
@ -120,7 +119,6 @@ public class AsyncUploadTest
int read;
while((read =in.read(b))>=0)
total += read;
System.err.println("Read "+ total);
}
catch(Exception e)
{

View File

@ -430,8 +430,8 @@ public class HttpConnectionTest
@Test
public void testOversizedResponse() throws Exception
{
String str = "thisisastringthatshouldreachover1kbytes";
for (int i=0;i<400;i++)
String str = "thisisastringthatshouldreachover1kbytes-";
for (int i=0;i<500;i++)
str+="xxxxxxxxxxxx";
final String longstr = str;

View File

@ -57,7 +57,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
private static final String REQUEST1=REQUEST1_HEADER+REQUEST1_CONTENT.getBytes().length+"\n\n"+REQUEST1_CONTENT;
/** The expected response. */
private static final String RESPONSE1="HTTP/1.1 200 OK\n"+"Connection: close\n"+"Server: Jetty("+Server.getVersion()+")\n"+"\n"+"Hello world\n";
private static final String RESPONSE1="HTTP/1.1 200 OK\n"+"Content-Length: 13\n"+"Server: Jetty("+Server.getVersion()+")\n"+"\n"+"Hello world\n";
// Break the request up into three pieces, splitting the header.
private static final String FRAGMENT1=REQUEST1.substring(0,16);
@ -143,14 +143,19 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
{
OutputStream os=client.getOutputStream();
os.write(("GET /R2 HTTP/1.1\015\012"+"Host: localhost\015\012"+"Transfer-Encoding: chunked\015\012"+"Content-Type: text/plain\015\012"
+"Connection: close\015\012"+"\015\012").getBytes());
os.write(("GET /R2 HTTP/1.1\015\012"+
"Host: localhost\015\012"+
"Transfer-Encoding: chunked\015\012"+
"Content-Type: text/plain\015\012"+
"Connection: close\015\012"+
"\015\012").getBytes());
os.flush();
Thread.sleep(PAUSE);
os.write(("5\015\012").getBytes());
os.flush();
Thread.sleep(PAUSE);
os.write(("ABCDE\015\012"+"0;\015\012\015\012").getBytes());
os.write(("ABCDE\015\012"+
"0;\015\012\015\012").getBytes());
os.flush();
// Read the response.

View File

@ -94,10 +94,10 @@ public class NetworkTrafficListenerTest
// Connect to the server
Socket socket = new Socket("localhost", port);
assertTrue(openedLatch.await(1, TimeUnit.SECONDS));
assertTrue(openedLatch.await(10, TimeUnit.SECONDS));
socket.close();
assertTrue(closedLatch.await(1, TimeUnit.SECONDS));
assertTrue(closedLatch.await(10, TimeUnit.SECONDS));
}
@Test

View File

@ -723,7 +723,7 @@ public class RFC2616Test
"GET /R2 HTTP/1.0\n"+"Host: localhost\n"+"Connection: close\n"+"\n"+
"GET /R3 HTTP/1.0\n"+"Host: localhost\n"+"Connection: close\n"+"\n");
offset=checkContains(response,offset,"HTTP/1.1 200 OK\015\012","19.6.2 Keep-alive 1")+1;
offset=checkContains(response,offset,"Connection: keep-alive","19.6.2 Keep-alive 1")+1;
@ -732,7 +732,6 @@ public class RFC2616Test
offset=checkContains(response,offset,"/R1","19.6.2 Keep-alive 1")+1;
offset=checkContains(response,offset,"HTTP/1.1 200 OK\015\012","19.6.2 Keep-alive 2")+11;
offset=checkContains(response,offset,"Connection: close","19.6.2 Keep-alive close")+1;
offset=checkContains(response,offset,"/R2","19.6.2 Keep-alive close")+3;
assertEquals("19.6.2 closed",-1,response.indexOf("/R3"));
@ -756,7 +755,6 @@ public class RFC2616Test
offset=checkContains(response,offset,"ABCDEFGHIJ","19.6.2 Keep-alive 1")+1;
offset=checkContains(response,offset,"HTTP/1.1 200 OK\015\012","19.6.2 Keep-alive 2")+11;
offset=checkContains(response,offset,"Connection: close","19.6.2 Keep-alive close")+1;
offset=checkContains(response,offset,"/R2","19.6.2 Keep-alive close")+3;
assertEquals("19.6.2 closed",-1,response.indexOf("/R3"));

View File

@ -467,13 +467,12 @@ public class RequestTest
"\n"
);
assertTrue(response.indexOf("200")>0);
assertTrue(response.indexOf("Connection: close")>0);
assertTrue(response.indexOf("Hello World")>0);
response=_connector.getResponses(
"GET / HTTP/1.0\n"+
"Host: whatever\n"+
"Connection: Other, keep-alive\n"+
"Connection: Other,,keep-alive\n"+
"\n"
);
assertTrue(response.indexOf("200")>0);

View File

@ -492,7 +492,7 @@ public class ResponseTest
endPoint.setOut(new ByteArrayBuffer(1024));
endPoint.setGrowOutput(true);
HttpConnection connection=new TestHttpConnection(connector, endPoint, connector.getServer());
connection.getGenerator().reset(false);
connection.getGenerator().reset();
HttpConnection.setCurrentConnection(connection);
Response response = connection.getResponse();
connection.getRequest().setRequestURI("/test");

View File

@ -24,14 +24,5 @@ public class SocketServerTest extends HttpServerTestBase
public static void init() throws Exception
{
startServer(new SocketConnector());
}
@Override
public void testBlockingWhileReadingRequestContent() throws Exception
{
// TODO Auto-generated method stub
super.testBlockingWhileReadingRequestContent();
}
}
}

View File

@ -40,6 +40,7 @@ public class ConnectHandlerSSLTest extends AbstractConnectHandlerTest
public static void init() throws Exception
{
SslSelectChannelConnector connector = new SslSelectChannelConnector();
connector.setMaxIdleTime(3600000); // TODO remove
String keyStorePath = MavenTestingUtils.getTestResourceFile("keystore").getAbsolutePath();
SslContextFactory cf = connector.getSslContextFactory();
@ -60,6 +61,7 @@ public class ConnectHandlerSSLTest extends AbstractConnectHandlerTest
"Host: " + hostPort + "\r\n" +
"\r\n";
Socket socket = newSocket();
socket.setSoTimeout(3600000); // TODO remove
try
{
OutputStream output = socket.getOutputStream();
@ -70,6 +72,7 @@ public class ConnectHandlerSSLTest extends AbstractConnectHandlerTest
// Expect 200 OK from the CONNECT request
Response response = readResponse(input);
System.err.println(response);
assertEquals("200", response.getCode());
// Be sure the buffered input does not have anything buffered
@ -82,7 +85,7 @@ public class ConnectHandlerSSLTest extends AbstractConnectHandlerTest
output = sslSocket.getOutputStream();
input = new BufferedReader(new InputStreamReader(sslSocket.getInputStream()));
request = "" +
request =
"GET /echo HTTP/1.1\r\n" +
"Host: " + hostPort + "\r\n" +
"\r\n";

View File

@ -19,6 +19,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.ssl.SslContextFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
@ -53,7 +54,7 @@ public class SslTruncationAttackTest
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key) throws IOException
{
return new SslSelectChannelEndPoint(getSslBuffers(), channel, selectSet, key, createSSLEngine(channel))
return new SslSelectChannelEndPoint(getSslBuffers(), channel, selectSet, key, createSSLEngine(channel),channel.socket().getSoTimeout())
{
@Override
public void close() throws IOException
@ -65,7 +66,7 @@ public class SslTruncationAttackTest
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
AsyncHttpConnection connection=new AsyncHttpConnection(this, endpoint, server)
{

View File

@ -540,9 +540,9 @@ public class StdErrLog implements Logger
{
StdErrLog sel = new StdErrLog(fullname);
// Preserve configuration for new loggers configuration
sel.setPrintLongNames(_printLongNames);
// Let Level come from configured Properties instead - sel.setLevel(_level);
sel.setSource(_source);
//sel.setPrintLongNames(_printLongNames);
//sel.setLevel(_level);
//sel.setSource(_source);
logger = __loggers.putIfAbsent(fullname,sel);
if (logger == null)
{

View File

@ -16,6 +16,7 @@ import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.util.B64Code;
@ -207,11 +208,11 @@ public class WebSocketClientFactory extends AggregateLifeCycle
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
{
return new SelectChannelEndPoint(channel,selectSet,sKey);
return new SelectChannelEndPoint(channel,selectSet,sKey,channel.socket().getSoTimeout());
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) endpoint.getSelectionKey().attachment();
return new HandshakeConnection(endpoint,holder);
@ -255,7 +256,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
/** Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection
class HandshakeConnection extends AbstractConnection implements AsyncConnection
{
private final SelectChannelEndPoint _endp;
private final WebSocketClient.WebSocketFuture _future;
@ -370,15 +371,11 @@ public class WebSocketClientFactory extends AggregateLifeCycle
{
while (_endp.isOpen() && !_parser.isComplete())
{
switch (_parser.parseAvailable())
if (!_parser.parseAvailable())
{
case -1:
if (_endp.isInputShutdown())
_future.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
case 0:
return this;
default:
break;
return this;
}
}
if (_error==null)
@ -407,6 +404,11 @@ public class WebSocketClientFactory extends AggregateLifeCycle
return this;
}
public void onInputShutdown() throws IOException
{
// TODO
}
public boolean isIdle()
{
return false;

View File

@ -9,8 +9,9 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
public interface WebSocketConnection extends Connection
public interface WebSocketConnection extends AsyncConnection
{
void fillBuffersFrom(Buffer buffer);

View File

@ -202,6 +202,12 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
return this;
}
/* ------------------------------------------------------------ */
public void onInputShutdown() throws IOException
{
// TODO
}
/* ------------------------------------------------------------ */
private void doTheHixieHixieShake()
{

View File

@ -212,6 +212,12 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
return this;
}
/* ------------------------------------------------------------ */
public void onInputShutdown() throws IOException
{
// TODO
}
/* ------------------------------------------------------------ */
public boolean isIdle()
{

View File

@ -257,6 +257,12 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
return this;
}
/* ------------------------------------------------------------ */
public void onInputShutdown() throws IOException
{
// TODO
}
/* ------------------------------------------------------------ */
public boolean isIdle()
{

View File

@ -264,6 +264,12 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
return this;
}
/* ------------------------------------------------------------ */
public void onInputShutdown() throws IOException
{
// TODO
}
/* ------------------------------------------------------------ */
public boolean isIdle()
{