307898 Handle large/async websocket messages

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1434 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2010-04-01 16:39:26 +00:00
parent 50dd4c94b1
commit 54117dad5b
5 changed files with 43 additions and 7 deletions
VERSION.txt
jetty-client/src/main/java/org/eclipse/jetty/client
jetty-io/src/main/java/org/eclipse/jetty/io
jetty-server/src/main/java/org/eclipse/jetty/server
jetty-websocket/src/main/java/org/eclipse/jetty/websocket

View File

@ -1,3 +1,7 @@
jetty-7.1-SNAPSHOT
+ 307898 Handle large/async websocket messages
jetty-7.0.2.v20100331 jetty-7.0.2.v20100331
+ 297552 Don't call Continuation timeouts from acceptor tick + 297552 Don't call Continuation timeouts from acceptor tick
+ 298236 Additional unit tests for jetty-client + 298236 Additional unit tests for jetty-client

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersions; import org.eclipse.jetty.http.HttpVersions;
import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint; import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer; import org.eclipse.jetty.io.ByteArrayBuffer;
@ -127,7 +128,7 @@ public class HttpConnection implements Connection
} }
else else
{ {
SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp; AsyncEndPoint scep = (AsyncEndPoint)_endp;
scep.scheduleWrite(); scep.scheduleWrite();
} }
_destination.getHttpClient().schedule(_timeout); _destination.getHttpClient().schedule(_timeout);

View File

@ -28,4 +28,21 @@ public interface AsyncEndPoint extends EndPoint
*/ */
public boolean isReadyForDispatch(); public boolean isReadyForDispatch();
/* ------------------------------------------------------------ */
/** Set the writable status.
* The writable status is considered next time the async scheduling
* is calculated.
*
* @param writable true if the endpoint is known to be writable or false
* if it is known to not be writable.
*/
public void setWritable(boolean writable);
/* ------------------------------------------------------------ */
/** Schedule a write dispatch.
* Set the endpoint to not be writable and schedule a dispatch when
* it becomes writable.
*/
public void scheduleWrite();
} }

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersions; import org.eclipse.jetty.http.HttpVersions;
import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.Parser; import org.eclipse.jetty.http.Parser;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -494,8 +495,8 @@ public class HttpConnection implements Connection
Log.debug("return with suspended request"); Log.debug("return with suspended request");
more_in_buffer=false; more_in_buffer=false;
} }
else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof SelectChannelEndPoint) // TODO remove SelectChannel dependency else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof AsyncEndPoint)
((SelectChannelEndPoint)_endp).setWritable(false); ((AsyncEndPoint)_endp).setWritable(false);
} }
} }
} }

View File

@ -2,6 +2,7 @@ package org.eclipse.jetty.websocket;
import java.io.IOException; import java.io.IOException;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -94,11 +95,10 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{ {
int flushed=_generator.flush(); int flushed=_generator.flush();
int filled=_parser.parseNext(); int filled=_parser.parseNext();
more = flushed>0 || filled>0 || !_parser.isBufferEmpty() || !_generator.isBufferEmpty();
// System.err.println("flushed="+flushed+" filled="+filled+" more="+more+" p.e="+_parser.isBufferEmpty()+" g.e="+_generator.isBufferEmpty());
// TODO remove this potential busy loop. more should be true if content was parsed even if no bytes were filled!
more = flushed>0 || filled>0 || !_parser.isBufferEmpty();
if (filled<0 || flushed<0) if (filled<0 || flushed<0)
{ {
_endp.close(); _endp.close();
@ -115,7 +115,12 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
finally finally
{ {
if (_endp.isOpen()) if (_endp.isOpen())
{
_idle.access(_endp); _idle.access(_endp);
if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleWrite();
}
else else
// TODO - not really the best way // TODO - not really the best way
_websocket.onDisconnect(); _websocket.onDisconnect();
@ -147,6 +152,8 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{ {
_generator.addFrame(WebSocket.SENTINEL_FRAME,content,_maxIdleTimeMs); _generator.addFrame(WebSocket.SENTINEL_FRAME,content,_maxIdleTimeMs);
_generator.flush(); _generator.flush();
if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleWrite();
_idle.access(_endp); _idle.access(_endp);
} }
@ -154,6 +161,8 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{ {
_generator.addFrame(frame,content,_maxIdleTimeMs); _generator.addFrame(frame,content,_maxIdleTimeMs);
_generator.flush(); _generator.flush();
if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleWrite();
_idle.access(_endp); _idle.access(_endp);
} }
@ -161,6 +170,8 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{ {
_generator.addFrame(frame,content,_maxIdleTimeMs); _generator.addFrame(frame,content,_maxIdleTimeMs);
_generator.flush(); _generator.flush();
if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleWrite();
_idle.access(_endp); _idle.access(_endp);
} }
@ -168,6 +179,8 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{ {
_generator.addFrame(frame,content,offset,length,_maxIdleTimeMs); _generator.addFrame(frame,content,offset,length,_maxIdleTimeMs);
_generator.flush(); _generator.flush();
if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleWrite();
_idle.access(_endp); _idle.access(_endp);
} }