From 8e913fe7c0410db58f5a7fc284e7f494f6cc1663 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 24 Nov 2011 20:58:59 +0100 Subject: [PATCH] Fixed bug in AsyncEndPoint.hasProgressed() handling: the progressing status must be remembered until a call to hasProgressed() is made. Additional code cleanups. --- .../jetty/client/AsyncHttpConnection.java | 8 +-- .../jetty/client/BlockingHttpConnection.java | 20 ++------ .../eclipse/jetty/client/SelectConnector.java | 10 ++-- .../eclipse/jetty/io/nio/SslConnection.java | 19 +++---- .../jetty/server/AsyncHttpConnection.java | 6 +-- .../jetty/server/BlockingHttpConnection.java | 17 ++----- .../websocket/WebSocketConnectionD13.java | 51 ++++++++++--------- 7 files changed, 53 insertions(+), 78 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncHttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncHttpConnection.java index cc2f9c0b549..455edfbeb92 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncHttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AsyncHttpConnection.java @@ -122,16 +122,16 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async exchange.getEventListener().onRequestComplete(); } - // Flush output - _endp.flush(); - // Read any input that is available if (!_parser.isComplete() && _parser.parseAvailable()) { - LOG.debug("parsed"); + LOG.debug("parsed {}",exchange); progress=true; } + // Flush output + _endp.flush(); + // Has any IO been done by the endpoint itself since last loop if (_asyncEndp.hasProgressed()) { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/BlockingHttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/BlockingHttpConnection.java index 47f16f9d4f0..d241699bdac 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/BlockingHttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/BlockingHttpConnection.java @@ -31,16 +31,14 @@ import org.eclipse.jetty.util.log.Logger; */ public class BlockingHttpConnection extends AbstractHttpConnection { - private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class); private boolean _requestComplete; - private int _status; private Buffer _requestContentChunk; - BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp) + BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint) { - super(requestBuffers,responseBuffers,endp); + super(requestBuffers, responseBuffers, endPoint); } protected void reset() throws IOException @@ -131,15 +129,14 @@ public class BlockingHttpConnection extends AbstractHttpConnection exchange.getEventListener().onRequestComplete(); } - // Flush output - _endp.flush(); - // Read any input that is available if (!_parser.isComplete() && _parser.parseAvailable()) { LOG.debug("parsed"); } + // Flush output + _endp.flush(); } catch (Throwable e) { @@ -231,7 +228,6 @@ public class BlockingHttpConnection extends AbstractHttpConnection if (_exchange==null && !isReserved()) // TODO how do we return switched connections? _destination.returnConnection(this, !persistent); } - } } } @@ -245,14 +241,6 @@ public class BlockingHttpConnection extends AbstractHttpConnection return connection; } - - public void onInputShutdown() throws IOException - { - if (_generator.isIdle()) - _endp.shutdownOutput(); - } - - @Override public boolean send(HttpExchange ex) throws IOException { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java index a52ff7ffd83..f405be40e59 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java @@ -169,10 +169,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, // key should have destination at this point (will be replaced by endpoint after this call) HttpDestination dest=(HttpDestination)key.attachment(); - AsyncEndPoint ep=null; - - SelectChannelEndPoint scep= new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout()); - ep = scep; + SelectChannelEndPoint scep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout()); + AsyncEndPoint ep = scep; if (dest.isSecure()) { @@ -276,10 +274,10 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, public void upgrade() { - AsyncHttpConnection connection = (AsyncHttpConnection) ((SelectChannelEndPoint)_endp).getConnection(); + AsyncHttpConnection connection = (AsyncHttpConnection)_endp.getConnection(); SslConnection sslConnection = new SslConnection(_engine,_endp); - ((SelectChannelEndPoint)_endp).setConnection(sslConnection); + _endp.setConnection(sslConnection); _endp=sslConnection.getSslEndPoint(); sslConnection.getSslEndPoint().setConnection(connection); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java index ee891d6874d..66bee561c30 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslConnection.java @@ -178,7 +178,9 @@ public class SslConnection extends AbstractConnection implements AsyncConnection // If we are handshook let the delegate connection if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING) - progress|=process(null,null); + { + progress=process(null,null); + } else { // handle the delegate connection @@ -389,7 +391,8 @@ public class SslConnection extends AbstractConnection implements AsyncConnection finally { releaseBuffers(); - _progressed.set(some_progress); + if (some_progress) + _progressed.set(true); } return some_progress; } @@ -582,11 +585,6 @@ public class SslConnection extends AbstractConnection implements AsyncConnection return _engine; } - public SslConnection getSslConnection() - { - return SslConnection.this; - } - public void shutdownOutput() throws IOException { synchronized (SslConnection.this) @@ -643,10 +641,9 @@ public class SslConnection extends AbstractConnection implements AsyncConnection public int flush(Buffer buffer) throws IOException { - int size=buffer.length(); - process(null,buffer); - int flushed=size-buffer.length(); - return flushed; + int size = buffer.length(); + process(null, buffer); + return size-buffer.length(); } public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java index 203af125d90..9d79c6cd78f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java @@ -79,7 +79,6 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async // Has any IO been done by the endpoint itself since last loop if (_asyncEndp.hasProgressed()) progress=true; - } catch (HttpException e) { @@ -122,6 +121,7 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async { // The request is suspended, so even though progress has been made, break the while loop LOG.debug("suspended {}",this); + // TODO: breaking inside finally blocks is bad: rethink how we should exit from here break; } } @@ -131,11 +131,11 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async { setCurrentConnection(null); if (!_request.isAsyncStarted()) - { + { _parser.returnBuffers(); _generator.returnBuffers(); } - + // Safety net to catch spinning if (some_progress) _total_no_progress=0; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingHttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingHttpConnection.java index 20797d9a62c..ec15289f914 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingHttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingHttpConnection.java @@ -31,32 +31,26 @@ public class BlockingHttpConnection extends AbstractHttpConnection { private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class); - private volatile boolean _handling; - public BlockingHttpConnection(Connector connector, EndPoint endpoint, Server server) { super(connector,endpoint,server); } - public BlockingHttpConnection(Connector connector, EndPoint endpoint, Server server, Parser parser, Generator generator, Request request) { super(connector,endpoint,server,parser,generator,request); } - @Override protected void handleRequest() throws IOException { super.handleRequest(); } - public Connection handle() throws IOException { Connection connection = this; - boolean progress=true; try { setCurrentConnection(this); @@ -67,17 +61,16 @@ public class BlockingHttpConnection extends AbstractHttpConnection { try { - progress=false; // If we are not ended then parse available if (!_parser.isComplete() && !_endp.isInputShutdown()) - progress |= _parser.parseAvailable(); + _parser.parseAvailable(); // Do we have more generating to do? // Loop here because some writes may take multiple steps and // we need to flush them all before potentially blocking in the // next loop. if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown()) - progress |= _generator.flushBuffer()>0; + _generator.flushBuffer(); // Flush buffers _endp.flush(); @@ -100,9 +93,7 @@ public class BlockingHttpConnection extends AbstractHttpConnection if (_parser.isComplete() && _generator.isComplete()) { // Reset the parser/generator - progress=true; reset(); - _endp.flush(); // look for a switched connection instance? if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) @@ -121,6 +112,8 @@ public class BlockingHttpConnection extends AbstractHttpConnection } } } + + return connection; } finally { @@ -128,7 +121,5 @@ public class BlockingHttpConnection extends AbstractHttpConnection _parser.returnBuffers(); _generator.returnBuffers(); } - return connection; } - } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD13.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD13.java index 380594a2c3d..835c756e2ea 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD13.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD13.java @@ -18,7 +18,6 @@ import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.util.Collections; import java.util.List; - import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -67,7 +66,7 @@ import org.eclipse.jetty.websocket.WebSocket.OnTextMessage; public class WebSocketConnectionD13 extends AbstractConnection implements WebSocketConnection { private static final Logger LOG = Log.getLogger(WebSocketConnectionD13.class); - + final static byte OP_CONTINUATION = 0x00; final static byte OP_TEXT = 0x01; final static byte OP_BINARY = 0x02; @@ -247,7 +246,9 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc int filled=_parser.parseNext(); progress = flushed>0 || filled>0; - + + _endp.flush(); + if (_endp instanceof AsyncEndPoint && ((AsyncEndPoint)_endp).hasProgressed()) progress=true; } @@ -290,7 +291,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc if (!_closedIn) _endp.close(); } - + /* ------------------------------------------------------------ */ public boolean isIdle() { @@ -388,12 +389,12 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc { // Close code 1005/1006 are never to be sent as a status over // a Close control frame. Code<-1 also means no node. - + if (code<0 || (code == WebSocketConnectionD13.CLOSE_NO_CODE) || code==WebSocketConnectionD13.CLOSE_NO_CLOSE) code=-1; else if (code==0) code=WebSocketConnectionD13.CLOSE_NORMAL; - + byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1); bytes[0]=(byte)(code/0x100); bytes[1]=(byte)(code%0x100); @@ -464,7 +465,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc /* ------------------------------------------------------------ */ public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException { - // TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented + // TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented if (_closedOut) throw new IOException("closedOut "+_closeCode+":"+_closeMessage); _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length); @@ -613,7 +614,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc { close(CLOSE_NORMAL,null); } - + /* ------------------------------------------------------------ */ public void close() { @@ -653,7 +654,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc public void onFrame(final byte flags, final byte opcode, final Buffer buffer) { boolean lastFrame = isLastFrame(flags); - + synchronized(WebSocketConnectionD13.this) { // Ignore incoming after a close @@ -682,7 +683,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc { return; } - + // Deliver frame if websocket is a FrameWebSocket if (_onFrame!=null) { @@ -695,7 +696,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) return; } - + switch(opcode) { case WebSocketConnectionD13.OP_CONTINUATION: @@ -705,7 +706,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Bad Continuation"); return; } - + // If text, append to the message buffer if (_onTextMessage!=null && _opcode==WebSocketConnectionD13.OP_TEXT) { @@ -750,7 +751,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc case WebSocketConnectionD13.OP_PING: { LOG.debug("PING {}",this); - if (!_closedOut) + if (!_closedOut) { _connection.sendControl(WebSocketConnectionD13.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); } @@ -770,10 +771,10 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc if (buffer.length()>=2) { code=(0xff&buffer.array()[buffer.getIndex()])*0x100+(0xff&buffer.array()[buffer.getIndex()+1]); - + // Validate close status codes. if (code < WebSocketConnectionD13.CLOSE_NORMAL || - code == WebSocketConnectionD13.CLOSE_UNDEFINED || + code == WebSocketConnectionD13.CLOSE_UNDEFINED || code == WebSocketConnectionD13.CLOSE_NO_CLOSE || code == WebSocketConnectionD13.CLOSE_NO_CODE || ( code > 1010 && code <= 2999 ) || @@ -782,8 +783,8 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Invalid close code " + code); return; } - - if (buffer.length()>2) + + if (buffer.length()>2) { if(_utf8.append(buffer.array(),buffer.getIndex()+2,buffer.length()-2,_connection.getMaxTextMessageSize())) { @@ -791,10 +792,10 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc _utf8.reset(); } } - } + } else if(buffer.length() == 1) { - // Invalid length. use status code 1002 (Protocol error) + // Invalid length. use status code 1002 (Protocol error) errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Invalid payload length of 1"); return; } @@ -809,7 +810,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode)); return; } - + if(_onTextMessage!=null) { if (_connection.getMaxTextMessageSize()<=0) @@ -842,7 +843,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc } break; } - + case WebSocketConnectionD13.OP_BINARY: { if (_opcode!=-1) @@ -850,7 +851,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode)); return; } - + if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length())) { if (lastFrame) @@ -876,7 +877,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc default: errorClose(WebSocketConnectionD13.CLOSE_PROTOCOL,"Bad opcode 0x"+Integer.toHexString(opcode)); - return; + break; } } catch(Utf8Appendable.NotUtf8Exception notUtf8) @@ -896,7 +897,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc private void errorClose(int code, String message) { _connection.close(code,message); - + // Brutally drop the connection try { @@ -908,7 +909,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc LOG.debug(e); } } - + private boolean checkBinaryMessageSize(int bufferLen, int length) { int max = _connection.getMaxBinaryMessageSize();