Fixed bug in AsyncEndPoint.hasProgressed() handling: the progressing status

must be remembered until a call to hasProgressed() is made.
Additional code cleanups.
This commit is contained in:
Simone Bordet 2011-11-24 20:58:59 +01:00
parent c6103f2d53
commit 8e913fe7c0
7 changed files with 53 additions and 78 deletions

View File

@ -122,16 +122,16 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
exchange.getEventListener().onRequestComplete(); exchange.getEventListener().onRequestComplete();
} }
// Flush output
_endp.flush();
// Read any input that is available // Read any input that is available
if (!_parser.isComplete() && _parser.parseAvailable()) if (!_parser.isComplete() && _parser.parseAvailable())
{ {
LOG.debug("parsed"); LOG.debug("parsed {}",exchange);
progress=true; progress=true;
} }
// Flush output
_endp.flush();
// Has any IO been done by the endpoint itself since last loop // Has any IO been done by the endpoint itself since last loop
if (_asyncEndp.hasProgressed()) if (_asyncEndp.hasProgressed())
{ {

View File

@ -31,16 +31,14 @@ import org.eclipse.jetty.util.log.Logger;
*/ */
public class BlockingHttpConnection extends AbstractHttpConnection public class BlockingHttpConnection extends AbstractHttpConnection
{ {
private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class); private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class);
private boolean _requestComplete; private boolean _requestComplete;
private int _status;
private Buffer _requestContentChunk; private Buffer _requestContentChunk;
BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp) BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)
{ {
super(requestBuffers,responseBuffers,endp); super(requestBuffers, responseBuffers, endPoint);
} }
protected void reset() throws IOException protected void reset() throws IOException
@ -131,15 +129,14 @@ public class BlockingHttpConnection extends AbstractHttpConnection
exchange.getEventListener().onRequestComplete(); exchange.getEventListener().onRequestComplete();
} }
// Flush output
_endp.flush();
// Read any input that is available // Read any input that is available
if (!_parser.isComplete() && _parser.parseAvailable()) if (!_parser.isComplete() && _parser.parseAvailable())
{ {
LOG.debug("parsed"); LOG.debug("parsed");
} }
// Flush output
_endp.flush();
} }
catch (Throwable e) catch (Throwable e)
{ {
@ -231,7 +228,6 @@ public class BlockingHttpConnection extends AbstractHttpConnection
if (_exchange==null && !isReserved()) // TODO how do we return switched connections? if (_exchange==null && !isReserved()) // TODO how do we return switched connections?
_destination.returnConnection(this, !persistent); _destination.returnConnection(this, !persistent);
} }
} }
} }
} }
@ -245,14 +241,6 @@ public class BlockingHttpConnection extends AbstractHttpConnection
return connection; return connection;
} }
public void onInputShutdown() throws IOException
{
if (_generator.isIdle())
_endp.shutdownOutput();
}
@Override @Override
public boolean send(HttpExchange ex) throws IOException public boolean send(HttpExchange ex) throws IOException
{ {

View File

@ -169,10 +169,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
// key should have destination at this point (will be replaced by endpoint after this call) // key should have destination at this point (will be replaced by endpoint after this call)
HttpDestination dest=(HttpDestination)key.attachment(); HttpDestination dest=(HttpDestination)key.attachment();
AsyncEndPoint ep=null; SelectChannelEndPoint scep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
AsyncEndPoint ep = scep;
SelectChannelEndPoint scep= new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
ep = scep;
if (dest.isSecure()) if (dest.isSecure())
{ {
@ -276,10 +274,10 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
public void upgrade() public void upgrade()
{ {
AsyncHttpConnection connection = (AsyncHttpConnection) ((SelectChannelEndPoint)_endp).getConnection(); AsyncHttpConnection connection = (AsyncHttpConnection)_endp.getConnection();
SslConnection sslConnection = new SslConnection(_engine,_endp); SslConnection sslConnection = new SslConnection(_engine,_endp);
((SelectChannelEndPoint)_endp).setConnection(sslConnection); _endp.setConnection(sslConnection);
_endp=sslConnection.getSslEndPoint(); _endp=sslConnection.getSslEndPoint();
sslConnection.getSslEndPoint().setConnection(connection); sslConnection.getSslEndPoint().setConnection(connection);

View File

@ -178,7 +178,9 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
// If we are handshook let the delegate connection // If we are handshook let the delegate connection
if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING) if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
progress|=process(null,null); {
progress=process(null,null);
}
else else
{ {
// handle the delegate connection // handle the delegate connection
@ -389,7 +391,8 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
finally finally
{ {
releaseBuffers(); releaseBuffers();
_progressed.set(some_progress); if (some_progress)
_progressed.set(true);
} }
return some_progress; return some_progress;
} }
@ -582,11 +585,6 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
return _engine; return _engine;
} }
public SslConnection getSslConnection()
{
return SslConnection.this;
}
public void shutdownOutput() throws IOException public void shutdownOutput() throws IOException
{ {
synchronized (SslConnection.this) synchronized (SslConnection.this)
@ -643,10 +641,9 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
public int flush(Buffer buffer) throws IOException public int flush(Buffer buffer) throws IOException
{ {
int size=buffer.length(); int size = buffer.length();
process(null,buffer); process(null, buffer);
int flushed=size-buffer.length(); return size-buffer.length();
return flushed;
} }
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException

View File

@ -79,7 +79,6 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
// Has any IO been done by the endpoint itself since last loop // Has any IO been done by the endpoint itself since last loop
if (_asyncEndp.hasProgressed()) if (_asyncEndp.hasProgressed())
progress=true; progress=true;
} }
catch (HttpException e) catch (HttpException e)
{ {
@ -122,6 +121,7 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
{ {
// The request is suspended, so even though progress has been made, break the while loop // The request is suspended, so even though progress has been made, break the while loop
LOG.debug("suspended {}",this); LOG.debug("suspended {}",this);
// TODO: breaking inside finally blocks is bad: rethink how we should exit from here
break; break;
} }
} }
@ -131,11 +131,11 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
{ {
setCurrentConnection(null); setCurrentConnection(null);
if (!_request.isAsyncStarted()) if (!_request.isAsyncStarted())
{ {
_parser.returnBuffers(); _parser.returnBuffers();
_generator.returnBuffers(); _generator.returnBuffers();
} }
// Safety net to catch spinning // Safety net to catch spinning
if (some_progress) if (some_progress)
_total_no_progress=0; _total_no_progress=0;

View File

@ -31,32 +31,26 @@ public class BlockingHttpConnection extends AbstractHttpConnection
{ {
private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class); private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class);
private volatile boolean _handling;
public BlockingHttpConnection(Connector connector, EndPoint endpoint, Server server) public BlockingHttpConnection(Connector connector, EndPoint endpoint, Server server)
{ {
super(connector,endpoint,server); super(connector,endpoint,server);
} }
public BlockingHttpConnection(Connector connector, EndPoint endpoint, Server server, Parser parser, Generator generator, Request request) public BlockingHttpConnection(Connector connector, EndPoint endpoint, Server server, Parser parser, Generator generator, Request request)
{ {
super(connector,endpoint,server,parser,generator,request); super(connector,endpoint,server,parser,generator,request);
} }
@Override @Override
protected void handleRequest() throws IOException protected void handleRequest() throws IOException
{ {
super.handleRequest(); super.handleRequest();
} }
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
Connection connection = this; Connection connection = this;
boolean progress=true;
try try
{ {
setCurrentConnection(this); setCurrentConnection(this);
@ -67,17 +61,16 @@ public class BlockingHttpConnection extends AbstractHttpConnection
{ {
try try
{ {
progress=false;
// If we are not ended then parse available // If we are not ended then parse available
if (!_parser.isComplete() && !_endp.isInputShutdown()) if (!_parser.isComplete() && !_endp.isInputShutdown())
progress |= _parser.parseAvailable(); _parser.parseAvailable();
// Do we have more generating to do? // Do we have more generating to do?
// Loop here because some writes may take multiple steps and // Loop here because some writes may take multiple steps and
// we need to flush them all before potentially blocking in the // we need to flush them all before potentially blocking in the
// next loop. // next loop.
if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown()) if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown())
progress |= _generator.flushBuffer()>0; _generator.flushBuffer();
// Flush buffers // Flush buffers
_endp.flush(); _endp.flush();
@ -100,9 +93,7 @@ public class BlockingHttpConnection extends AbstractHttpConnection
if (_parser.isComplete() && _generator.isComplete()) if (_parser.isComplete() && _generator.isComplete())
{ {
// Reset the parser/generator // Reset the parser/generator
progress=true;
reset(); reset();
_endp.flush();
// look for a switched connection instance? // look for a switched connection instance?
if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
@ -121,6 +112,8 @@ public class BlockingHttpConnection extends AbstractHttpConnection
} }
} }
} }
return connection;
} }
finally finally
{ {
@ -128,7 +121,5 @@ public class BlockingHttpConnection extends AbstractHttpConnection
_parser.returnBuffers(); _parser.returnBuffers();
_generator.returnBuffers(); _generator.returnBuffers();
} }
return connection;
} }
} }

View File

@ -18,7 +18,6 @@ import java.io.UnsupportedEncodingException;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -67,7 +66,7 @@ import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
public class WebSocketConnectionD13 extends AbstractConnection implements WebSocketConnection public class WebSocketConnectionD13 extends AbstractConnection implements WebSocketConnection
{ {
private static final Logger LOG = Log.getLogger(WebSocketConnectionD13.class); private static final Logger LOG = Log.getLogger(WebSocketConnectionD13.class);
final static byte OP_CONTINUATION = 0x00; final static byte OP_CONTINUATION = 0x00;
final static byte OP_TEXT = 0x01; final static byte OP_TEXT = 0x01;
final static byte OP_BINARY = 0x02; final static byte OP_BINARY = 0x02;
@ -247,7 +246,9 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
int filled=_parser.parseNext(); int filled=_parser.parseNext();
progress = flushed>0 || filled>0; progress = flushed>0 || filled>0;
_endp.flush();
if (_endp instanceof AsyncEndPoint && ((AsyncEndPoint)_endp).hasProgressed()) if (_endp instanceof AsyncEndPoint && ((AsyncEndPoint)_endp).hasProgressed())
progress=true; progress=true;
} }
@ -290,7 +291,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
if (!_closedIn) if (!_closedIn)
_endp.close(); _endp.close();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public boolean isIdle() public boolean isIdle()
{ {
@ -388,12 +389,12 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
{ {
// Close code 1005/1006 are never to be sent as a status over // Close code 1005/1006 are never to be sent as a status over
// a Close control frame. Code<-1 also means no node. // a Close control frame. Code<-1 also means no node.
if (code<0 || (code == WebSocketConnectionD13.CLOSE_NO_CODE) || code==WebSocketConnectionD13.CLOSE_NO_CLOSE) if (code<0 || (code == WebSocketConnectionD13.CLOSE_NO_CODE) || code==WebSocketConnectionD13.CLOSE_NO_CLOSE)
code=-1; code=-1;
else if (code==0) else if (code==0)
code=WebSocketConnectionD13.CLOSE_NORMAL; code=WebSocketConnectionD13.CLOSE_NORMAL;
byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1); byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
bytes[0]=(byte)(code/0x100); bytes[0]=(byte)(code/0x100);
bytes[1]=(byte)(code%0x100); bytes[1]=(byte)(code%0x100);
@ -464,7 +465,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
{ {
// TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented // TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented
if (_closedOut) if (_closedOut)
throw new IOException("closedOut "+_closeCode+":"+_closeMessage); throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
_outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length); _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
@ -613,7 +614,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
{ {
close(CLOSE_NORMAL,null); close(CLOSE_NORMAL,null);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void close() public void close()
{ {
@ -653,7 +654,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
public void onFrame(final byte flags, final byte opcode, final Buffer buffer) public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
{ {
boolean lastFrame = isLastFrame(flags); boolean lastFrame = isLastFrame(flags);
synchronized(WebSocketConnectionD13.this) synchronized(WebSocketConnectionD13.this)
{ {
// Ignore incoming after a close // Ignore incoming after a close
@ -682,7 +683,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
{ {
return; return;
} }
// Deliver frame if websocket is a FrameWebSocket // Deliver frame if websocket is a FrameWebSocket
if (_onFrame!=null) if (_onFrame!=null)
{ {
@ -695,7 +696,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
return; return;
} }
switch(opcode) switch(opcode)
{ {
case WebSocketConnectionD13.OP_CONTINUATION: case WebSocketConnectionD13.OP_CONTINUATION:
@ -705,7 +706,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Bad Continuation"); errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Bad Continuation");
return; return;
} }
// If text, append to the message buffer // If text, append to the message buffer
if (_onTextMessage!=null && _opcode==WebSocketConnectionD13.OP_TEXT) if (_onTextMessage!=null && _opcode==WebSocketConnectionD13.OP_TEXT)
{ {
@ -750,7 +751,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
case WebSocketConnectionD13.OP_PING: case WebSocketConnectionD13.OP_PING:
{ {
LOG.debug("PING {}",this); LOG.debug("PING {}",this);
if (!_closedOut) if (!_closedOut)
{ {
_connection.sendControl(WebSocketConnectionD13.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); _connection.sendControl(WebSocketConnectionD13.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
} }
@ -770,10 +771,10 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
if (buffer.length()>=2) if (buffer.length()>=2)
{ {
code=(0xff&buffer.array()[buffer.getIndex()])*0x100+(0xff&buffer.array()[buffer.getIndex()+1]); code=(0xff&buffer.array()[buffer.getIndex()])*0x100+(0xff&buffer.array()[buffer.getIndex()+1]);
// Validate close status codes. // Validate close status codes.
if (code < WebSocketConnectionD13.CLOSE_NORMAL || if (code < WebSocketConnectionD13.CLOSE_NORMAL ||
code == WebSocketConnectionD13.CLOSE_UNDEFINED || code == WebSocketConnectionD13.CLOSE_UNDEFINED ||
code == WebSocketConnectionD13.CLOSE_NO_CLOSE || code == WebSocketConnectionD13.CLOSE_NO_CLOSE ||
code == WebSocketConnectionD13.CLOSE_NO_CODE || code == WebSocketConnectionD13.CLOSE_NO_CODE ||
( code > 1010 && code <= 2999 ) || ( code > 1010 && code <= 2999 ) ||
@ -782,8 +783,8 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Invalid close code " + code); errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Invalid close code " + code);
return; return;
} }
if (buffer.length()>2) if (buffer.length()>2)
{ {
if(_utf8.append(buffer.array(),buffer.getIndex()+2,buffer.length()-2,_connection.getMaxTextMessageSize())) if(_utf8.append(buffer.array(),buffer.getIndex()+2,buffer.length()-2,_connection.getMaxTextMessageSize()))
{ {
@ -791,10 +792,10 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
_utf8.reset(); _utf8.reset();
} }
} }
} }
else if(buffer.length() == 1) else if(buffer.length() == 1)
{ {
// Invalid length. use status code 1002 (Protocol error) // Invalid length. use status code 1002 (Protocol error)
errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Invalid payload length of 1"); errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Invalid payload length of 1");
return; return;
} }
@ -809,7 +810,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode)); errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
return; return;
} }
if(_onTextMessage!=null) if(_onTextMessage!=null)
{ {
if (_connection.getMaxTextMessageSize()<=0) if (_connection.getMaxTextMessageSize()<=0)
@ -842,7 +843,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
} }
break; break;
} }
case WebSocketConnectionD13.OP_BINARY: case WebSocketConnectionD13.OP_BINARY:
{ {
if (_opcode!=-1) if (_opcode!=-1)
@ -850,7 +851,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode)); errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
return; return;
} }
if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length())) if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
{ {
if (lastFrame) if (lastFrame)
@ -876,7 +877,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
default: default:
errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Bad opcode 0x"+Integer.toHexString(opcode)); errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Bad opcode 0x"+Integer.toHexString(opcode));
return; break;
} }
} }
catch(Utf8Appendable.NotUtf8Exception notUtf8) catch(Utf8Appendable.NotUtf8Exception notUtf8)
@ -896,7 +897,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
private void errorClose(int code, String message) private void errorClose(int code, String message)
{ {
_connection.close(code,message); _connection.close(code,message);
// Brutally drop the connection // Brutally drop the connection
try try
{ {
@ -908,7 +909,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
LOG.debug(e); LOG.debug(e);
} }
} }
private boolean checkBinaryMessageSize(int bufferLen, int length) private boolean checkBinaryMessageSize(int bufferLen, int length)
{ {
int max = _connection.getMaxBinaryMessageSize(); int max = _connection.getMaxBinaryMessageSize();