294563 Initial websocket implementation

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1040 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2009-11-09 01:22:06 +00:00
parent 7e74c61989
commit bb1bb0b960
9 changed files with 464 additions and 6 deletions

View File

@ -18,6 +18,7 @@ jetty-7.0.1-SNAPSHOT
+ 294154 Patched jetty-osgi
+ 294224 HttpClient timeout setting has no effect when connecting to host
+ 294345 Support for HTTP/301 + HTTP/302 response codes
+ 294563 Initial websocket implementation
+ JETTY-937 More JVM bug work arounds. Insert pause if all else fails
+ JETTY-983 Send content-length with multipart ranges
+ JETTY-1114 unsynchronised WebAppClassloader.getResource(String)

View File

@ -38,7 +38,7 @@ public class ChannelEndPoint implements EndPoint
{
protected final ByteChannel _channel;
protected final ByteBuffer[] _gather2=new ByteBuffer[2];
protected Socket _socket;
protected final Socket _socket;
protected InetSocketAddress _local;
protected InetSocketAddress _remote;
@ -49,8 +49,7 @@ public class ChannelEndPoint implements EndPoint
{
super();
this._channel = channel;
if (channel instanceof SocketChannel)
_socket=((SocketChannel)channel).socket();
_socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
}
public boolean isBlocking()

View File

@ -37,8 +37,8 @@ import org.eclipse.jetty.util.thread.Timeout;
public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint
{
private final SelectorManager.SelectSet _selectSet;
private final Connection _connection;
private final SelectorManager _manager;
private volatile Connection _connection;
private boolean _dispatched = false;
private boolean _redispatched = false;
private volatile boolean _writable = true;
@ -78,6 +78,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
{
return _manager;
}
/* ------------------------------------------------------------ */
public void setConnection(Connection connection)
{
_connection=connection;
}
/* ------------------------------------------------------------ */
/** Called by selectSet to schedule handling
@ -543,7 +549,5 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
{
return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
}
}
}

63
jetty-websocket/pom.xml Normal file
View File

@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>jetty-project</artifactId>
<groupId>org.eclipse.jetty</groupId>
<version>7.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
<name>Jetty :: Websocket</name>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>${felix.bundle.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>manifest</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>artifact-jar</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
<execution>
<id>test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,35 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Connection;
public class WebSocketConnection implements Connection
{
WebSocketParser _parser;
WebSocketGenerator _generator;
public void handle() throws IOException
{
boolean more=true;
while (more)
{
int flushed=_generator.flush();
int filled=_parser.parseNext();
more = flushed>0 || filled>0 || !_parser.isBufferEmpty();
}
}
public boolean isIdle()
{
return _parser.isBufferEmpty() && _generator.isBufferEmpty();
}
public boolean isSuspended()
{
return false;
}
}

View File

@ -0,0 +1,18 @@
package org.eclipse.jetty.websocket;
public class WebSocketGenerator
{
public int flush()
{
// TODO Auto-generated method stub
return 0;
}
public boolean isBufferEmpty()
{
// TODO Auto-generated method stub
return false;
}
}

View File

@ -0,0 +1,188 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
/* ------------------------------------------------------------ */
/**
* Parser the WebSocket protocol.
*
*/
public class WebSocketParser
{
public static final int STATE_START=0;
public static final int STATE_SENTINEL_DATA=1;
public static final int STATE_LENGTH=2;
public static final int STATE_DATA=3;
private final Buffers _buffers;
private final EndPoint _endp;
private final EventHandler _handler;
private int _state;
private Buffer _buffer;
private byte _frame;
private int _length;
private Utf8StringBuilder _utf8;
/* ------------------------------------------------------------ */
/**
* @param buffers The buffers to use for parsing. Only the {@link Buffers#getBuffer()} is used.
* This should be a direct buffer if binary data is mostly used or an indirect buffer if utf-8 data
* is mostly used.
* @param endp
* @param handler
*/
public WebSocketParser(Buffers buffers, EndPoint endp, EventHandler handler)
{
_buffers=buffers;
_endp=endp;
_handler=handler;
}
/* ------------------------------------------------------------ */
public boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}
/* ------------------------------------------------------------ */
public Buffer getBuffer()
{
return _buffer;
}
/* ------------------------------------------------------------ */
/** Parse to next event.
* Parse to the next {@link EventHandler} event or until no more data is
* available. Fill data from the {@link EndPoint} only as necessary.
* @return total bytes filled or -1 for EOF
*/
public int parseNext()
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
int total_filled=0;
// Loop until an datagram call back or can't fill anymore
while(true)
{
int length=_buffer.length();
// Fill buffer if we need a byte or need length
if (length == 0 || _state==STATE_DATA && length<_length)
{
// compact to mark (set at start of data)
_buffer.compact();
// if no space, then the data is too big for buffer
if (_buffer.space() == 0)
throw new IllegalStateException("FULL");
// catch IOExceptions (probably EOF) and try to parse what we have
try
{
int filled=_endp.isOpen()?_endp.fill(_buffer):-1;
if (filled<=0)
return total_filled>0?total_filled:-1;
total_filled+=filled;
length=_buffer.length();
}
catch(IOException e)
{
Log.debug(e);
return total_filled>0?total_filled:-1;
}
}
// Parse the buffer byte by byte (unless it is STATE_DATA)
byte b;
charloop: while (length-->0)
{
switch (_state)
{
case STATE_START:
b=_buffer.get();
_frame=b;
if (_frame<0)
{
_length=0;
_state=STATE_LENGTH;
}
else
{
if (_utf8==null)
_utf8=new Utf8StringBuilder();
_state=STATE_SENTINEL_DATA;
_buffer.mark();
}
continue;
case STATE_SENTINEL_DATA:
b=_buffer.get();
if ((b&0xff)==0xff)
{
String data=_utf8.toString();
_utf8.reset();
_state=STATE_START;
_handler.onUtf8Message(_frame,data);
_buffer.setMarkIndex(-1);
if (_buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
return total_filled;
}
_utf8.append(b);
continue;
case STATE_LENGTH:
b=_buffer.get();
_length=_length<<7 | (0x7f&b);
if (b>=0)
{
_state=STATE_DATA;
_buffer.mark(0);
}
continue;
case STATE_DATA:
if (_buffer.markIndex()<0)
if (_buffer.length()<_length)
break charloop;
Buffer data=_buffer.sliceFromMark(_length);
_buffer.skip(_length);
_state=STATE_START;
_handler.onBinaryMessage(_frame,data);
if (_buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
return total_filled;
}
}
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
public interface EventHandler
{
void onUtf8Message(byte frame,String data);
void onBinaryMessage(byte frame,Buffer buffer);
}
}

View File

@ -0,0 +1,149 @@
package org.eclipse.jetty.websocket;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.util.StringUtil;
import junit.framework.TestCase;
public class WebSocketParserTest extends TestCase
{
Buffers _buffers;
ByteArrayBuffer _in;
ByteArrayEndPoint _endp;
Handler _handler;
WebSocketParser _parser;
/* ------------------------------------------------------------ */
@Override
protected void setUp() throws Exception
{
_buffers=new SimpleBuffers(null,new ByteArrayBuffer(1024));
_endp = new ByteArrayEndPoint();
_handler = new Handler();
_parser=new WebSocketParser(_buffers,_endp,_handler);
_in = new ByteArrayBuffer(2048);
_endp.setIn(_in);
}
public void testOneUtf8() throws Exception
{
_in.put((byte)0x00);
_in.put("Hello World".getBytes(StringUtil.__UTF8));
_in.put((byte)0xff);
int filled =_parser.parseNext();
assertEquals(13,filled);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
public void testTwoUtf8() throws Exception
{
_in.put((byte)0x00);
_in.put("Hello World".getBytes(StringUtil.__UTF8));
_in.put((byte)0xff);
_in.put((byte)0x00);
_in.put("Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8));
_in.put((byte)0xff);
int filled =_parser.parseNext();
assertEquals(30,filled);
assertEquals("Hello World",_handler._data.get(0));
assertFalse(_parser.isBufferEmpty());
assertFalse(_parser.getBuffer()==null);
filled =_parser.parseNext();
assertEquals(0,filled);
assertEquals("Hell\uFF4f W\uFF4Frld",_handler._data.get(1));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
public void testOneBinary() throws Exception
{
_in.put((byte)0x80);
_in.put((byte)11);
_in.put("Hello World".getBytes(StringUtil.__UTF8));
int filled =_parser.parseNext();
assertEquals(13,filled);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
public void testTwoBinary() throws Exception
{
_in.put((byte)0x80);
_in.put((byte)11);
_in.put("Hello World".getBytes(StringUtil.__UTF8));
byte[] data = new byte[150];
for (int i=0;i<data.length;i++)
data[i]=(byte)('0'+(i%10));
_in.put((byte)0x80);
_in.put((byte)(0x80|(data.length>>7)));
_in.put((byte)(data.length&0x7f));
_in.put(data);
int filled =_parser.parseNext();
assertEquals(13+3+data.length,filled);
assertEquals("Hello World",_handler._data.get(0));
assertFalse(_parser.isBufferEmpty());
assertFalse(_parser.getBuffer()==null);
filled =_parser.parseNext();
assertEquals(0,filled);
String got=_handler._data.get(1);
assertEquals(data.length,got.length());
assertTrue(got.startsWith("012345678901234567890123"));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
// TODO test:
// blocking,
// async
// full
// EOF
// errors
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class Handler implements WebSocketParser.EventHandler
{
public List<String> _data = new ArrayList<String>();
public void onBinaryMessage(byte frame, Buffer buffer)
{
_data.add(buffer.toString(StringUtil.__UTF8));
}
public void onUtf8Message(byte frame, String data)
{
_data.add(data);
}
}
}

View File

@ -124,6 +124,7 @@
<module>jetty-util</module>
<module>jetty-io</module>
<module>jetty-http</module>
<module>jetty-websocket</module>
<module>jetty-continuation</module>
<module>jetty-server</module>
<module>jetty-client</module>