Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9

This commit is contained in:
Jesse McConnell 2012-06-21 13:11:27 -05:00
commit d8c12b5813
27 changed files with 2333 additions and 2438 deletions

View File

@ -7,15 +7,44 @@ import org.eclipse.jetty.websocket.masks.Masker;
*/
public class WebSocketPolicy
{
public static WebSocketPolicy newClientPolicy()
{
return new WebSocketPolicy(WebSocketBehavior.CLIENT);
}
public static WebSocketPolicy newServerPolicy()
{
return new WebSocketPolicy(WebSocketBehavior.SERVER);
}
/**
* The maximum size of a text message during parsing/generating
* The maximum size of a text message during parsing/generating.
* <p>
* Default: 16384 (16 K)
*/
private int maxTextMessageSize = -1;
private int maxTextMessageSize = 16384;
/**
* The maximum size of a binary message during parsing/generating
* The maximum size of a binary message during parsing/generating.
* <p>
* Default: -1 (no validation)
*/
private int maxBinaryMessageSize = -1;
/**
* Buffer size, which is also the max frame byte size.
* <p>
* Default: 65535 (64 K)
*/
private int bufferSize = 65535;
/**
* The time in ms (milliseconds) that a websocket may be idle before closing.
* <p>
* Default: 300000 (ms)
*/
private int maxIdleTime = 300000;
/**
* The implementation for masking
*/
@ -60,6 +89,11 @@ public class WebSocketPolicy
return behavior;
}
public int getBufferSize()
{
return bufferSize;
}
public Masker getMasker()
{
return masker;
@ -70,11 +104,21 @@ public class WebSocketPolicy
return maxBinaryMessageSize;
}
public int getMaxIdleTime()
{
return maxIdleTime;
}
public int getMaxTextMessageSize()
{
return maxTextMessageSize;
}
public void setBufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
}
public void setMasker(Masker masker)
{
this.masker = masker;
@ -85,6 +129,11 @@ public class WebSocketPolicy
this.maxBinaryMessageSize = maxBinaryMessageSize;
}
public void setMaxIdleTime(int maxIdleTime)
{
this.maxIdleTime = maxIdleTime;
}
public void setMaxTextMessageSize(int maxTextMessageSize)
{
this.maxTextMessageSize = maxTextMessageSize;

View File

@ -8,7 +8,7 @@ import org.eclipse.jetty.websocket.api.WebSocket;
*/
public class CloseFrame extends ControlFrame
{
private final short statusCode;
private short statusCode;
private String reason;
public CloseFrame()
@ -43,6 +43,11 @@ public class CloseFrame extends ControlFrame
this.reason = reason;
}
public void setStatusCode(short statusCode)
{
this.statusCode = statusCode;
}
@Override
public String toString()
{

View File

@ -17,6 +17,18 @@ public class TextFrame extends DataFrame
super(OpCode.TEXT);
}
/**
* Construct text frame with message data
*
* @param message
* the message data
*/
public TextFrame(String message)
{
this();
setData(message);
}
/**
* Get the data
*

View File

@ -38,6 +38,8 @@ public class Generator {
public Generator(ByteBufferPool bufferPool, WebSocketPolicy policy)
{
generators.put(OpCode.BINARY,new BinaryFrameGenerator(bufferPool,policy));
generators.put(OpCode.TEXT,new TextFrameGenerator(bufferPool,policy));
generators.put(OpCode.PING,new PingFrameGenerator(bufferPool,policy));
generators.put(OpCode.PONG,new PongFrameGenerator(bufferPool,policy));
generators.put(OpCode.CLOSE,new CloseFrameGenerator(bufferPool,policy));

View File

@ -2,6 +2,8 @@ package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.frames.CloseFrame;
@ -11,6 +13,8 @@ import org.eclipse.jetty.websocket.frames.CloseFrame;
public class ClosePayloadParser extends FrameParser<CloseFrame>
{
private CloseFrame frame;
private ByteBuffer payload;
private int payloadLength;
public ClosePayloadParser(WebSocketPolicy policy)
{
@ -34,7 +38,35 @@ public class ClosePayloadParser extends FrameParser<CloseFrame>
@Override
public boolean parsePayload(ByteBuffer buffer)
{
// TODO Auto-generated method stub
payloadLength = getFrame().getPayloadLength();
if (payloadLength == 0)
{
// no status code. no reason.
return true;
}
while (buffer.hasRemaining())
{
if (payload == null)
{
getPolicy().assertValidBinaryMessageSize(payloadLength);
payload = ByteBuffer.allocate(payloadLength);
}
copyBuffer(buffer,payload,payload.remaining());
if (payload.position() >= payloadLength)
{
payload.flip();
frame.setStatusCode(payload.getShort());
if (payload.remaining() > 0)
{
String reason = BufferUtil.toString(payload,StringUtil.__UTF8_CHARSET);
frame.setReason(reason);
}
return true;
}
}
return false;
}

View File

@ -70,7 +70,7 @@ public abstract class FrameParser<T extends BaseFrame>
int amt = Math.min(length,src.remaining());
if (getFrame().isMasked())
{
// Demask the content
// Demask the content 1 byte at a time
byte mask[] = getFrame().getMask();
for (int i = 0; i < amt; i++)
{
@ -80,6 +80,7 @@ public abstract class FrameParser<T extends BaseFrame>
else
{
// Copy the content as-is
// TODO: Look into having a BufferUtil.put(from,to,len) method
byte b[] = new byte[amt];
src.get(b,0,amt);
dest.put(b,0,amt);

View File

@ -109,7 +109,13 @@ public class Parser
boolean rsv1 = ((b & 0x40) != 0);
boolean rsv2 = ((b & 0x20) != 0);
boolean rsv3 = ((b & 0x10) != 0);
OpCode opcode = OpCode.from((byte)(b & 0x0F));
byte opc = (byte)(b & 0x0F);
OpCode opcode = OpCode.from(opc);
if (opcode == null)
{
throw new WebSocketException("Unknown opcode: " + opc);
}
if (opcode.isControlFrame() && !fin)
{

View File

@ -0,0 +1,87 @@
package org.eclipse.jetty.websocket;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.frames.TextFrame;
import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.generator.TextFrameGenerator;
import org.eclipse.jetty.websocket.masks.RandomMasker;
import org.eclipse.jetty.websocket.parser.FrameParseCapture;
import org.eclipse.jetty.websocket.parser.Parser;
import org.eclipse.jetty.websocket.parser.TextPayloadParser;
import org.junit.Assert;
import org.junit.Test;
public class GeneratorParserRoundtripTest
{
@Test
public void testParserAndGenerator() throws Exception
{
Debug.enableDebugLogging(Generator.class);
Debug.enableDebugLogging(TextFrameGenerator.class);
Debug.enableDebugLogging(Parser.class);
Debug.enableDebugLogging(TextPayloadParser.class);
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
StandardByteBufferPool bufferPool = new StandardByteBufferPool();
Generator gen = new Generator(bufferPool,policy);
Parser parser = new Parser(policy);
// Generate Buffer
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
ByteBuffer out = gen.generate(new TextFrame(message));
Debug.dumpState(out);
// Parse Buffer
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
out.flip();
parser.parse(out);
// Validate
capture.assertNoErrors();
capture.assertHasFrame(TextFrame.class,1);
TextFrame txt = (TextFrame)capture.getFrames().get(0);
Assert.assertThat("Text parsed",txt.getData().toString(),is(message));
}
@Test
public void testParserAndGeneratorMasked() throws Exception
{
Debug.enableDebugLogging(Generator.class);
Debug.enableDebugLogging(TextFrameGenerator.class);
Debug.enableDebugLogging(Parser.class);
Debug.enableDebugLogging(TextPayloadParser.class);
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
policy.setMasker(new RandomMasker());
StandardByteBufferPool bufferPool = new StandardByteBufferPool();
Generator gen = new Generator(bufferPool,policy);
Parser parser = new Parser(policy);
// Generate Buffer
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
ByteBuffer out = gen.generate(new TextFrame(message));
Debug.dumpState(out);
// Parse Buffer
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
out.flip();
parser.parse(out);
// Validate
capture.assertNoErrors();
capture.assertHasFrame(TextFrame.class,1);
TextFrame txt = (TextFrame)capture.getFrames().get(0);
Assert.assertTrue("Text.isMasked",txt.isMasked());
Assert.assertThat("Text parsed",txt.getData().toString(),is(message));
}
}

View File

@ -1,392 +0,0 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket;
public class WebSocketParserRFC6455Test
{
/*
private ByteArrayEndPoint _endPoint;
private MaskedByteArrayBuffer _in;
private Handler _handler;
private WebSocketParserRFC6455 _parser;
private byte[] _mask = new byte[] {(byte)0x00,(byte)0xF0,(byte)0x0F,(byte)0xFF};
private int _m;
class MaskedByteArrayBuffer extends ByteArrayBuffer
{
MaskedByteArrayBuffer()
{
super(4096);
}
public void sendMask()
{
super.poke(putIndex(),_mask,0,4);
super.setPutIndex(putIndex()+4);
_m=0;
}
@Override
public int put(ByteBuffer src)
{
return put(src.asArray(),0,src.length());
}
public void putUnmasked(byte b)
{
super.put(b);
}
@Override
public void put(byte b)
{
super.put((byte)(b^_mask[_m++%4]));
}
@Override
public int put(byte[] b, int offset, int length)
{
byte[] mb = new byte[b.length];
final int end=offset+length;
for (int i=offset;i<end;i++)
{
mb[i]=(byte)(b[i]^_mask[_m++%4]);
}
return super.put(mb,offset,length);
}
@Override
public int put(byte[] b)
{
return put(b,0,b.length);
}
};
@Before
public void setUp() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(1024);
_endPoint = new ByteArrayEndPoint();
_endPoint.setNonBlocking(true);
_handler = new Handler();
_parser=new WebSocketParserRFC6455(buffers, _endPoint,_handler,true);
_parser.setFakeFragments(false);
_in = new MaskedByteArrayBuffer();
_endPoint.setIn(_in);
}
@Test
public void testCache() throws Exception
{
assertEquals(HttpHeaderValue.UPGRADE_ORDINAL ,((CachedBuffer)HttpHeaderValue.CACHE.lookup("Upgrade")).getOrdinal());
}
@Test
public void testFlagsOppcode() throws Exception
{
_in.putUnmasked((byte)0xff);
_in.putUnmasked((byte)0x80);
_in.sendMask();
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(0xf,_handler._flags);
assertEquals(0xf,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
_parser.returnBuffer();
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testShortText() throws Exception
{
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|11));
_in.sendMask();
_in.put("Hello World".getBytes(StringUtil.__UTF8));
// System.err.println("tosend="+TypeUtil.toHexString(_in.asArray()));
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals("Hello World",_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
_parser.returnBuffer();
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testShortUtf8() throws Exception
{
String string = "Hell\uFF4f W\uFF4Frld";
byte[] bytes = string.getBytes("UTF-8");
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|bytes.length));
_in.sendMask();
_in.put(bytes);
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(string,_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
_parser.returnBuffer();
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testMediumText() throws Exception
{
String string = "Hell\uFF4f Medium W\uFF4Frld ";
for (int i=0;i<4;i++)
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes(StringUtil.__UTF8);
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
_in.putUnmasked((byte)(bytes.length>>8));
_in.putUnmasked((byte)(bytes.length&0xff));
_in.sendMask();
_in.put(bytes);
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(string,_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
_parser.returnBuffer();
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testLongText() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(0x20000);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
WebSocketParserRFC6455 parser=new WebSocketParserRFC6455(buffers, endPoint,_handler,false);
ByteArrayBuffer in = new ByteArrayBuffer(0x20000);
endPoint.setIn(in);
String string = "Hell\uFF4f Big W\uFF4Frld ";
for (int i=0;i<12;i++)
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes("UTF-8");
_in.sendMask();
in.put((byte)0x84);
in.put((byte)0x7F);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)(bytes.length>>16));
in.put((byte)((bytes.length>>8)&0xff));
in.put((byte)(bytes.length&0xff));
in.put(bytes);
int progress =parser.parseNext();
parser.returnBuffer();
assertTrue(progress>0);
assertEquals(string,_handler._data.get(0));
assertTrue(parser.isBufferEmpty());
assertTrue(parser.getBuffer()==null);
}
@Test
public void testShortFragmentTest() throws Exception
{
_in.putUnmasked((byte)0x01);
_in.putUnmasked((byte)0x86);
_in.sendMask();
_in.put("Hello ".getBytes(StringUtil.__UTF8));
_in.putUnmasked((byte)0x80);
_in.putUnmasked((byte)0x85);
_in.sendMask();
_in.put("World".getBytes(StringUtil.__UTF8));
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(0,_handler._data.size());
assertFalse(_parser.isBufferEmpty());
assertFalse(_parser.getBuffer()==null);
progress =_parser.parseNext();
_parser.returnBuffer();
assertTrue(progress>0);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testFrameTooLarge() throws Exception
{
// Buffers are only 1024, so this frame is too large
_parser.setFakeFragments(false);
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
_in.putUnmasked((byte)(2048>>8));
_in.putUnmasked((byte)(2048&0xff));
_in.sendMask();
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,_handler._code);
for (int i=0;i<2048;i++)
_in.put((byte)'a');
progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(0,_handler._data.size());
assertEquals(0,_handler._utf8.length());
_handler._code=0;
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)0xFE);
_in.putUnmasked((byte)(1024>>8));
_in.putUnmasked((byte)(1024&0xff));
_in.sendMask();
for (int i=0;i<1024;i++)
_in.put((byte)'a');
progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(0,_handler._data.size());
}
@Test
public void testFakeFragement() throws Exception
{
// Buffers are only 1024, so this frame will be fake fragmented
_parser.setFakeFragments(true);
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
_in.putUnmasked((byte)(2048>>8));
_in.putUnmasked((byte)(2048&0xff));
_in.sendMask();
for (int i=0;i<2048;i++)
_in.put((byte)('a'+i%26));
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(2,_handler._frames);
assertEquals(WebSocketConnectionRFC6455.OP_CONTINUATION,_handler._opcode);
assertEquals(1,_handler._data.size());
String mesg=_handler._data.remove(0);
assertEquals(2048,mesg.length());
for (int i=0;i<2048;i++)
assertEquals(('a'+i%26),mesg.charAt(i));
}
@Test
public void testClose() throws Exception
{
String string = "Game Over";
byte[] bytes = string.getBytes("UTF-8");
_in.putUnmasked((byte)(0x80|0x08));
_in.putUnmasked((byte)(0x80|(2+bytes.length)));
_in.sendMask();
_in.put((byte)(1000/0x100));
_in.put((byte)(1000%0x100));
_in.put(bytes);
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(string,_handler._data.get(0).substring(2));
assertEquals(0x8,_handler._flags);
assertEquals(0x8,_handler._opcode);
_parser.returnBuffer();
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
_in.clear();
_in.put(bytes);
_endPoint.setIn(_in);
progress =_parser.parseNext();
assertTrue(progress>0);
_endPoint.shutdownInput();
progress =_parser.parseNext();
assertEquals(-1,progress);
}
private class Handler implements WebSocketParser.FrameHandler
{
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
public List<String> _data = new ArrayList<String>();
private byte _flags;
private byte _opcode;
int _code;
int _frames;
public void onFrame(byte flags, byte opcode, ByteBuffer buffer)
{
_frames++;
_flags=flags;
_opcode=opcode;
if ((flags&0x8)==0)
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
else if (_utf8.length()==0)
_data.add(buffer.toString("utf-8"));
else
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_data.add(_utf8.toString());
_utf8.reset();
}
}
public void close(int code,String message)
{
_code=code;
}
}
*/
}

View File

@ -0,0 +1,50 @@
package org.eclipse.jetty.websocket.parser;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.Debug;
import org.eclipse.jetty.websocket.api.WebSocket;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.frames.CloseFrame;
import org.junit.Assert;
import org.junit.Test;
public class ClosePayloadParserTest
{
@Test
public void testGameOver()
{
Debug.enableDebugLogging(Parser.class);
Debug.enableDebugLogging(ClosePayloadParser.class);
String expectedReason = "Game Over";
byte utf[] = expectedReason.getBytes(StringUtil.__UTF8_CHARSET);
ByteBuffer payload = ByteBuffer.allocate(utf.length + 2);
payload.putShort(WebSocket.CLOSE_NORMAL);
payload.put(utf,0,utf.length);
payload.flip();
ByteBuffer buf = ByteBuffer.allocate(24);
buf.put((byte)(0x80 | 0x08)); // fin + close
buf.put((byte)(0x80 | payload.remaining()));
MaskedByteBuffer.putMask(buf);
MaskedByteBuffer.putPayload(buf,payload);
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
FrameParseCapture capture = new FrameParseCapture();
parser.addListener(capture);
parser.parse(buf);
capture.assertNoErrors();
capture.assertHasFrame(CloseFrame.class,1);
CloseFrame txt = (CloseFrame)capture.getFrames().get(0);
Assert.assertThat("CloseFrame.statusCode",txt.getStatusCode(),is(WebSocket.CLOSE_NORMAL));
Assert.assertThat("CloseFrame.data",txt.getReason(),is(expectedReason));
}
}

View File

@ -0,0 +1,32 @@
package org.eclipse.jetty.websocket.parser;
import java.nio.ByteBuffer;
public class MaskedByteBuffer
{
private static byte[] mask = new byte[]
{ 0x00, (byte)0xF0, 0x0F, (byte)0xFF };
public static void putMask(ByteBuffer buffer)
{
buffer.put(mask,0,mask.length);
}
public static void putPayload(ByteBuffer buffer, byte[] payload)
{
int len = payload.length;
for (int i = 0; i < len; i++)
{
buffer.put((byte)(payload[i] ^ mask[i % 4]));
}
}
public static void putPayload(ByteBuffer buffer, ByteBuffer payload)
{
int len = payload.remaining();
for (int i = 0; i < len; i++)
{
buffer.put((byte)(payload.get() ^ mask[i % 4]));
}
}
}

View File

@ -16,11 +16,8 @@ import org.junit.Test;
public class TextPayloadParserTest
{
private final byte[] mask = new byte[]
{ 0x00, (byte)0xF0, 0x0F, (byte)0xFF };
@Test
public void testFrameTooLargeDueToPolicyText() throws Exception
public void testFrameTooLargeDueToPolicy() throws Exception
{
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setMaxTextMessageSize(1024); // set policy
@ -33,8 +30,8 @@ public class TextPayloadParserTest
buf.put((byte)0x81);
buf.put((byte)(0x80 | 0x7E)); // 0x7E == 126 (a 2 byte payload length)
buf.putShort((short)utf.length);
writeMask(buf);
writeMaskedPayload(buf,utf);
MaskedByteBuffer.putMask(buf);
MaskedByteBuffer.putPayload(buf,utf);
buf.flip();
Parser parser = new Parser(policy);
@ -68,8 +65,8 @@ public class TextPayloadParserTest
buf.put((byte)0x81);
buf.put((byte)(0x80 | 0x7F)); // 0x7F == 127 (a 4 byte payload length)
buf.putInt(utf.length);
writeMask(buf);
writeMaskedPayload(buf,utf);
MaskedByteBuffer.putMask(buf);
MaskedByteBuffer.putPayload(buf,utf);
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
@ -103,8 +100,8 @@ public class TextPayloadParserTest
buf.put((byte)0x81);
buf.put((byte)(0x80 | 0x7E)); // 0x7E == 126 (a 2 byte payload length)
buf.putShort((short)utf.length);
writeMask(buf);
writeMaskedPayload(buf,utf);
MaskedByteBuffer.putMask(buf);
MaskedByteBuffer.putPayload(buf,utf);
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
@ -133,14 +130,14 @@ public class TextPayloadParserTest
// part 1
buf.put((byte)0x01); // no fin + text
buf.put((byte)(0x80 | b1.length));
writeMask(buf);
writeMaskedPayload(buf,b1);
MaskedByteBuffer.putMask(buf);
MaskedByteBuffer.putPayload(buf,b1);
// part 2
buf.put((byte)0x80); // fin + continuation
buf.put((byte)(0x80 | b2.length));
writeMask(buf);
writeMaskedPayload(buf,b2);
MaskedByteBuffer.putMask(buf);
MaskedByteBuffer.putPayload(buf,b2);
buf.flip();
@ -162,12 +159,13 @@ public class TextPayloadParserTest
public void testShortMaskedText() throws Exception
{
String expectedText = "Hello World";
byte utf[] = expectedText.getBytes(StringUtil.__UTF8_CHARSET);
ByteBuffer buf = ByteBuffer.allocate(24);
buf.put((byte)0x81);
buf.put((byte)(0x80 | expectedText.length()));
writeMask(buf);
writeMaskedPayload(buf,expectedText.getBytes(StringUtil.__UTF8));
buf.put((byte)(0x80 | utf.length));
MaskedByteBuffer.putMask(buf);
MaskedByteBuffer.putPayload(buf,utf);
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
@ -192,8 +190,8 @@ public class TextPayloadParserTest
ByteBuffer buf = ByteBuffer.allocate(24);
buf.put((byte)0x81);
buf.put((byte)(0x80 | utf.length));
writeMask(buf);
writeMaskedPayload(buf,utf);
MaskedByteBuffer.putMask(buf);
MaskedByteBuffer.putPayload(buf,utf);
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
@ -207,18 +205,4 @@ public class TextPayloadParserTest
TextFrame txt = (TextFrame)capture.getFrames().get(0);
Assert.assertThat("TextFrame.data",txt.getData().toString(),is(expectedText));
}
private void writeMask(ByteBuffer buf)
{
buf.put(mask,0,mask.length);
}
private void writeMaskedPayload(ByteBuffer buf, byte[] bytes)
{
int len = bytes.length;
for (int i = 0; i < len; i++)
{
buf.put((byte)(bytes[i] ^ mask[i % 4]));
}
}
}

View File

@ -0,0 +1,29 @@
package org.eclipse.jetty.websocket.server;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
public class AsyncWebSocketConnection extends AbstractAsyncConnection
{
// TODO: track extensions? (only those that need to operate at this level?)
// TODO: track generic WebSocket.Connection (for API access)?
public AsyncWebSocketConnection(AsyncEndPoint endp, Executor executor)
{
super(endp,executor);
}
public AsyncWebSocketConnection(AsyncEndPoint endp, Executor executor, boolean executeOnlyFailure)
{
super(endp,executor,executeOnlyFailure);
}
@Override
public void onFillable()
{
// TODO Auto-generated method stub
}
}

View File

@ -36,30 +36,37 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.websocket.server.WebSocketFactory.Acceptor;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
public abstract class WebSocketHandler extends HandlerWrapper implements WebSocketFactory.Acceptor
public abstract class WebSocketHandler extends HandlerWrapper implements WebSocketServerFactory.Acceptor
{
private final WebSocketFactory _webSocketFactory=new WebSocketFactory(this,32*1024);
private final WebSocketServerFactory webSocketFactory;
public WebSocketFactory getWebSocketFactory()
public WebSocketHandler()
{
return _webSocketFactory;
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
webSocketFactory = new WebSocketServerFactory(this,policy);
}
/* ------------------------------------------------------------ */
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
if (_webSocketFactory.acceptWebSocket(request,response) || response.isCommitted())
return;
super.handle(target,baseRequest,request,response);
}
/* ------------------------------------------------------------ */
public boolean checkOrigin(HttpServletRequest request, String origin)
{
return true;
}
public WebSocketServerFactory getWebSocketFactory()
{
return webSocketFactory;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
if (webSocketFactory.acceptWebSocket(request,response) || response.isCommitted())
{
return;
}
super.handle(target,baseRequest,request,response);
}
}

View File

@ -21,17 +21,19 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.extensions.Extension;
import org.eclipse.jetty.websocket.extensions.deflate.DeflateFrameExtension;
import org.eclipse.jetty.websocket.extensions.fragment.FragmentExtension;
@ -40,13 +42,19 @@ import org.eclipse.jetty.websocket.extensions.identity.IdentityExtension;
/**
* Factory to create WebSocket connections
*/
public class WebSocketFactory extends AbstractLifeCycle
public class WebSocketServerFactory extends AbstractLifeCycle
{
private static final Logger LOG = Log.getLogger(WebSocketFactory.class);
private final Queue<WebSocketServletConnection> connections = new ConcurrentLinkedQueue<WebSocketServletConnection>();
public interface Acceptor
{
/* ------------------------------------------------------------ */
/**
* <p>Checks the origin of an incoming WebSocket handshake request.</p>
* @param request the incoming HTTP upgrade request
* @param origin the origin URI
* @return boolean to indicate that the origin is acceptable.
*/
boolean checkOrigin(HttpServletRequest request, String origin);
/* ------------------------------------------------------------ */
/**
* <p>Factory method that applications needs to implement to return a
@ -56,16 +64,10 @@ public class WebSocketFactory extends AbstractLifeCycle
* @return a new {@link WebSocket} object that will handle websocket events.
*/
WebSocket doWebSocketConnect(HttpServletRequest request, String protocol);
/* ------------------------------------------------------------ */
/**
* <p>Checks the origin of an incoming WebSocket handshake request.</p>
* @param request the incoming HTTP upgrade request
* @param origin the origin URI
* @return boolean to indicate that the origin is acceptable.
*/
boolean checkOrigin(HttpServletRequest request, String origin);
}
private static final Logger LOG = Log.getLogger(WebSocketServerFactory.class);
private final Queue<WebSocketServletConnection> connections = new ConcurrentLinkedQueue<WebSocketServletConnection>();
private final Map<String,Class<? extends Extension>> _extensionClasses = new HashMap<String, Class<? extends Extension>>();
{
@ -75,204 +77,12 @@ public class WebSocketFactory extends AbstractLifeCycle
}
private final Acceptor _acceptor;
private WebSocketBuffers _buffers;
private int _maxIdleTime = 300000;
private int _maxTextMessageSize = 16 * 1024;
private int _maxBinaryMessageSize = -1;
private WebSocketPolicy policy;
public WebSocketFactory(Acceptor acceptor)
public WebSocketServerFactory(Acceptor acceptor, WebSocketPolicy policy)
{
this(acceptor, 64 * 1024);
}
public WebSocketFactory(Acceptor acceptor, int bufferSize)
{
_buffers = new WebSocketBuffers(bufferSize);
_acceptor = acceptor;
}
/**
* @return A modifiable map of extension name to extension class
*/
public Map<String,Class<? extends Extension>> getExtensionClassesMap()
{
return _extensionClasses;
}
/**
* Get the maxIdleTime.
*
* @return the maxIdleTime
*/
public long getMaxIdleTime()
{
return _maxIdleTime;
}
/**
* Set the maxIdleTime.
*
* @param maxIdleTime the maxIdleTime to set
*/
public void setMaxIdleTime(int maxIdleTime)
{
_maxIdleTime = maxIdleTime;
}
/**
* Get the bufferSize.
*
* @return the bufferSize
*/
public int getBufferSize()
{
return _buffers.getBufferSize();
}
/**
* Set the bufferSize.
*
* @param bufferSize the bufferSize to set
*/
public void setBufferSize(int bufferSize)
{
if (bufferSize != getBufferSize())
_buffers = new WebSocketBuffers(bufferSize);
}
/**
* @return The initial maximum text message size (in characters) for a connection
*/
public int getMaxTextMessageSize()
{
return _maxTextMessageSize;
}
/**
* Set the initial maximum text message size for a connection. This can be changed by
* the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
* @param maxTextMessageSize The default maximum text message size (in characters) for a connection
*/
public void setMaxTextMessageSize(int maxTextMessageSize)
{
_maxTextMessageSize = maxTextMessageSize;
}
/**
* @return The initial maximum binary message size (in bytes) for a connection
*/
public int getMaxBinaryMessageSize()
{
return _maxBinaryMessageSize;
}
/**
* Set the initial maximum binary message size for a connection. This can be changed by
* the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
* @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
*/
public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
{
_maxBinaryMessageSize = maxBinaryMessageSize;
}
@Override
protected void doStop() throws Exception
{
closeConnections();
}
/**
* Upgrade the request/response to a WebSocket Connection.
* <p>This method will not normally return, but will instead throw a
* UpgradeConnectionException, to exit HTTP handling and initiate
* WebSocket handling of the connection.
*
* @param request The request to upgrade
* @param response The response to upgrade
* @param websocket The websocket handler implementation to use
* @param protocol The websocket protocol
* @throws IOException in case of I/O errors
*/
public void upgrade(HttpServletRequest request, HttpServletResponse response, WebSocket websocket, String protocol)
throws IOException
{
if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
throw new IllegalStateException("!Upgrade:websocket");
if (!"HTTP/1.1".equals(request.getProtocol()))
throw new IllegalStateException("!HTTP/1.1");
int draft = request.getIntHeader("Sec-WebSocket-Version");
if (draft < 0) {
// Old pre-RFC version specifications (header not present in RFC-6455)
draft = request.getIntHeader("Sec-WebSocket-Draft");
}
AbstractHttpConnection http = AbstractHttpConnection.getCurrentConnection();
if (http instanceof BlockingHttpConnection)
throw new IllegalStateException("Websockets not supported on blocking connectors");
ConnectedEndPoint endp = (ConnectedEndPoint)http.getEndPoint();
List<String> extensions_requested = new ArrayList<String>();
@SuppressWarnings("unchecked")
Enumeration<String> e = request.getHeaders("Sec-WebSocket-Extensions");
while (e.hasMoreElements())
{
QuotedStringTokenizer tok = new QuotedStringTokenizer(e.nextElement(),",");
while (tok.hasMoreTokens())
{
extensions_requested.add(tok.nextToken());
}
}
final WebSocketServletConnection connection;
switch (draft)
{
case WebSocketConnectionRFC6455.VERSION: // RFC 6455 Version
{
List<Extension> extensions = initExtensions(extensions_requested, 8 - WebSocketConnectionRFC6455.OP_EXT_DATA, 16 - WebSocketConnectionRFC6455.OP_EXT_CTRL, 3);
connection = new WebSocketServletConnectionRFC6455(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, extensions, draft);
break;
}
default:
{
LOG.warn("Unsupported Websocket version: " + draft);
// Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
// Using the examples as outlined
response.setHeader("Sec-WebSocket-Version", "13, 8, 6, 0");
throw new HttpException(400, "Unsupported websocket version specification: " + draft);
}
}
addConnection(connection);
// Set the defaults
connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize);
connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize);
// Let the connection finish processing the handshake
connection.handshake(request, response, protocol);
response.flushBuffer();
// Give the connection any unused data from the HTTP connection.
connection.fillBuffersFrom(((HttpParser)http.getParser()).getHeaderBuffer());
connection.fillBuffersFrom(((HttpParser)http.getParser()).getBodyBuffer());
// Tell jetty about the new connection
LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),draft,protocol,connection);
request.setAttribute("org.eclipse.jetty.io.Connection", connection);
}
protected String[] parseProtocols(String protocol)
{
if (protocol == null)
return new String[]{null};
protocol = protocol.trim();
if (protocol == null || protocol.length() == 0)
return new String[]{null};
String[] passed = protocol.split("\\s*,\\s*");
String[] protocols = new String[passed.length + 1];
System.arraycopy(passed, 0, protocols, 0, passed.length);
return protocols;
this._acceptor = acceptor;
this.policy = policy;
}
public boolean acceptWebSocket(HttpServletRequest request, HttpServletResponse response)
@ -282,7 +92,9 @@ public class WebSocketFactory extends AbstractLifeCycle
{
String origin = request.getHeader("Origin");
if (origin==null)
{
origin = request.getHeader("Sec-WebSocket-Origin");
}
if (!_acceptor.checkOrigin(request,origin))
{
response.sendError(HttpServletResponse.SC_FORBIDDEN);
@ -295,7 +107,7 @@ public class WebSocketFactory extends AbstractLifeCycle
@SuppressWarnings("unchecked")
Enumeration<String> protocols = request.getHeaders("Sec-WebSocket-Protocol");
String protocol=null;
while (protocol==null && protocols!=null && protocols.hasMoreElements())
while ((protocol==null) && (protocols!=null) && protocols.hasMoreElements())
{
String candidate = protocols.nextElement();
for (String p : parseProtocols(candidate))
@ -330,6 +142,43 @@ public class WebSocketFactory extends AbstractLifeCycle
return false;
}
protected boolean addConnection(WebSocketServletConnection connection)
{
return isRunning() && connections.add(connection);
}
protected void closeConnections()
{
for (WebSocketServletConnection connection : connections)
{
// TODO connection.shutdown();
}
}
@Override
protected void doStop() throws Exception
{
closeConnections();
}
/**
* @return A modifiable map of extension name to extension class
*/
public Map<String,Class<? extends Extension>> getExtensionClassesMap()
{
return _extensionClasses;
}
/**
* Get the policy in use for WebSockets.
*
* @return
*/
public WebSocketPolicy getPolicy()
{
return policy;
}
public List<Extension> initExtensions(List<String> requested,int maxDataOpcodes,int maxControlOpcodes,int maxReservedBits)
{
List<Extension> extensions = new ArrayList<Extension>();
@ -349,7 +198,9 @@ public class WebSocketFactory extends AbstractLifeCycle
Extension extension = newExtension(extName);
if (extension==null)
{
continue;
}
if (extension.init(parameters))
{
@ -367,8 +218,10 @@ public class WebSocketFactory extends AbstractLifeCycle
{
Class<? extends Extension> extClass = _extensionClasses.get(name);
if (extClass!=null)
{
return extClass.newInstance();
}
}
catch (Exception e)
{
LOG.warn(e);
@ -377,9 +230,21 @@ public class WebSocketFactory extends AbstractLifeCycle
return null;
}
protected boolean addConnection(WebSocketServletConnection connection)
protected String[] parseProtocols(String protocol)
{
return isRunning() && connections.add(connection);
if (protocol == null)
{
return new String[]{null};
}
protocol = protocol.trim();
if ((protocol == null) || (protocol.length() == 0))
{
return new String[]{null};
}
String[] passed = protocol.split("\\s*,\\s*");
String[] protocols = new String[passed.length + 1];
System.arraycopy(passed, 0, protocols, 0, passed.length);
return protocols;
}
protected boolean removeConnection(WebSocketServletConnection connection)
@ -387,9 +252,89 @@ public class WebSocketFactory extends AbstractLifeCycle
return connections.remove(connection);
}
protected void closeConnections()
/**
* Upgrade the request/response to a WebSocket Connection.
* <p>This method will not normally return, but will instead throw a
* UpgradeConnectionException, to exit HTTP handling and initiate
* WebSocket handling of the connection.
*
* @param request The request to upgrade
* @param response The response to upgrade
* @param websocket The websocket handler implementation to use
* @param protocol The websocket protocol
* @throws IOException in case of I/O errors
*/
public void upgrade(HttpServletRequest request, HttpServletResponse response, WebSocket websocket, String protocol)
throws IOException
{
for (WebSocketServletConnection connection : connections)
connection.shutdown();
if (!"websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
{
throw new IllegalStateException("!Upgrade:websocket");
}
if (!"HTTP/1.1".equals(request.getProtocol()))
{
throw new IllegalStateException("!HTTP/1.1");
}
int draft = request.getIntHeader("Sec-WebSocket-Version");
if (draft < 0) {
// Old pre-RFC version specifications (header not present in RFC-6455)
draft = request.getIntHeader("Sec-WebSocket-Draft");
}
HttpConnection http = HttpConnection.getCurrentConnection();
AsyncEndPoint endp = http.getEndPoint();
List<String> extensions_requested = new ArrayList<String>();
@SuppressWarnings("unchecked")
Enumeration<String> e = request.getHeaders("Sec-WebSocket-Extensions");
while (e.hasMoreElements())
{
QuotedStringTokenizer tok = new QuotedStringTokenizer(e.nextElement(),",");
while (tok.hasMoreTokens())
{
extensions_requested.add(tok.nextToken());
}
}
final WebSocketServletConnection connection;
switch (draft)
{
case org.eclipse.jetty.websocket.api.WebSocket.VERSION: // RFC 6455 Version
{
// List<Extension> extensions = initExtensions(extensions_requested,
// 8 - WebSocketConnectionRFC6455.OP_EXT_DATA,
// 16 - WebSocketConnectionRFC6455.OP_EXT_CTRL,
// 3);
// connection = new WebSocketServletConnectionRFC6455(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, extensions, draft);
break;
}
default:
{
LOG.warn("Unsupported Websocket version: " + draft);
// Per RFC 6455 - 4.4 - Supporting Multiple Versions of WebSocket Protocol
// Using the examples as outlined
response.setHeader("Sec-WebSocket-Version", "" + org.eclipse.jetty.websocket.api.WebSocket.VERSION /*+ ", 0"*/);
response.setStatus(HttpStatus.BAD_REQUEST_400);
return;
}
}
// addConnection(connection);
// Set the defaults
// connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize);
// connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize);
// Let the connection finish processing the handshake
// connection.handshake(request, response, protocol);
response.flushBuffer();
// Give the connection any unused data from the HTTP connection.
// connection.fillBuffersFrom(((HttpParser)http.getParser()).getHeaderBuffer());
// connection.fillBuffersFrom(((HttpParser)http.getParser()).getBodyBuffer());
// Tell jetty about the new connection
// LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),draft,protocol,connection);
// request.setAttribute("org.eclipse.jetty.io.Connection", connection);
}
}

View File

@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* Copyright (c) 2011 Mort Bay Consulting Pty. Ltd.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
@ -13,22 +13,11 @@
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -36,9 +25,9 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.server.WebSocketFactory.Acceptor;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
/* ------------------------------------------------------------ */
/**
* Servlet to upgrade connections to WebSocket
* <p/>
@ -58,12 +47,30 @@ import org.eclipse.jetty.websocket.server.WebSocketFactory.Acceptor;
* that a websocket may be accept before closing.
*/
@SuppressWarnings("serial")
public abstract class WebSocketServlet extends HttpServlet implements WebSocketFactory.Acceptor
public abstract class WebSocketServlet extends HttpServlet implements WebSocketServerFactory.Acceptor
{
private final Logger LOG = Log.getLogger(getClass());
private WebSocketFactory _webSocketFactory;
private WebSocketServerFactory webSocketFactory;
@Override
public boolean checkOrigin(HttpServletRequest request, String origin)
{
return true;
}
@Override
public void destroy()
{
try
{
webSocketFactory.stop();
}
catch (Exception x)
{
LOG.ignore(x);
}
}
/* ------------------------------------------------------------ */
/**
* @see javax.servlet.GenericServlet#init()
*/
@ -73,24 +80,27 @@ public abstract class WebSocketServlet extends HttpServlet implements WebSocketF
try
{
String bs = getInitParameter("bufferSize");
_webSocketFactory = new WebSocketFactory(this, bs == null ? 8192 : Integer.parseInt(bs));
_webSocketFactory.start();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
if(bs != null) {
policy.setBufferSize(Integer.parseInt(bs));
}
String max = getInitParameter("maxIdleTime");
if (max != null)
_webSocketFactory.setMaxIdleTime(Integer.parseInt(max));
if (max != null) {
policy.setMaxIdleTime(Integer.parseInt(max));
}
max = getInitParameter("maxTextMessageSize");
if (max != null)
_webSocketFactory.setMaxTextMessageSize(Integer.parseInt(max));
if (max != null) {
policy.setMaxTextMessageSize(Integer.parseInt(max));
}
max = getInitParameter("maxBinaryMessageSize");
if (max != null)
_webSocketFactory.setMaxBinaryMessageSize(Integer.parseInt(max));
if (max != null) {
policy.setMaxBinaryMessageSize(Integer.parseInt(max));
}
catch (ServletException x)
{
throw x;
webSocketFactory = new WebSocketServerFactory(this,policy);
}
catch (Exception x)
{
@ -98,35 +108,16 @@ public abstract class WebSocketServlet extends HttpServlet implements WebSocketF
}
}
/* ------------------------------------------------------------ */
/**
* @see javax.servlet.http.HttpServlet#service(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
*/
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
if (_webSocketFactory.acceptWebSocket(request, response) || response.isCommitted())
if (webSocketFactory.acceptWebSocket(request,response) || response.isCommitted())
{
return;
}
super.service(request, response);
}
/* ------------------------------------------------------------ */
public boolean checkOrigin(HttpServletRequest request, String origin)
{
return true;
}
/* ------------------------------------------------------------ */
@Override
public void destroy()
{
try
{
_webSocketFactory.stop();
}
catch (Exception x)
{
LOG.ignore(x);
}
}
}

View File

@ -16,12 +16,12 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.WebSocketConnection;
public interface WebSocketServletConnection extends WebSocketConnection
public interface WebSocketServletConnection /* extends WebSocketConnection */
{
void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException;
}

View File

@ -16,25 +16,22 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
import org.eclipse.jetty.websocket.extensions.Extension;
public class WebSocketServletConnectionRFC6455 extends WebSocketConnectionRFC6455 implements WebSocketServletConnection
public class WebSocketServletConnectionRFC6455 /* extends WebSocketConnectionRFC6455 implements WebSocketServletConnection */
{
private final WebSocketFactory factory;
private /* final */ WebSocketServerFactory factory;
public WebSocketServletConnectionRFC6455(WebSocketFactory factory, WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
/*
public WebSocketServletConnectionRFC6455(WebSocketServerFactory factory, WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol,
List<Extension> extensions, int draft) throws IOException
{
super(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft);
this.factory = factory;
}
*/
/* ------------------------------------------------------------ */
public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
@ -43,27 +40,31 @@ public class WebSocketServletConnectionRFC6455 extends WebSocketConnectionRFC645
response.setHeader("Upgrade","WebSocket");
response.addHeader("Connection","Upgrade");
response.addHeader("Sec-WebSocket-Accept",hashKey(key));
// response.addHeader("Sec-WebSocket-Accept",hashKey(key));
if (subprotocol != null)
{
response.addHeader("Sec-WebSocket-Protocol",subprotocol);
}
/*
for (Extension ext : getExtensions())
{
response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName());
}
*/
response.sendError(101);
onFrameHandshake();
onWebSocketOpen();
// onFrameHandshake();
// onWebSocketOpen();
}
/*
@Override
public void onClose()
{
super.onClose();
factory.removeConnection(this);
}
*/
}

View File

@ -22,199 +22,82 @@ import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.FrameConnection;
import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.api.OpCode;
public class TestServer extends Server
{
private static final Logger LOG = Log.getLogger(TestServer.class);
boolean _verbose;
WebSocket _websocket;
SelectChannelConnector _connector;
WebSocketHandler _wsHandler;
ResourceHandler _rHandler;
ConcurrentLinkedQueue<TestWebSocket> _broadcast = new ConcurrentLinkedQueue<TestWebSocket>();
public TestServer(int port)
{
_connector = new SelectChannelConnector();
_connector.setPort(port);
addConnector(_connector);
_wsHandler = new WebSocketHandler()
{
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
if ("org.ietf.websocket.test-echo".equals(protocol) || "echo".equals(protocol) || "lws-mirror-protocol".equals(protocol))
{
_websocket = new TestEchoWebSocket();
}
else if ("org.ietf.websocket.test-echo-broadcast".equals(protocol) || "echo-broadcast".equals(protocol))
{
_websocket = new TestEchoBroadcastWebSocket();
}
else if ("echo-broadcast-ping".equals(protocol))
{
_websocket = new TestEchoBroadcastPingWebSocket();
}
else if ("org.ietf.websocket.test-echo-assemble".equals(protocol) || "echo-assemble".equals(protocol))
{
_websocket = new TestEchoAssembleWebSocket();
}
else if ("org.ietf.websocket.test-echo-fragment".equals(protocol) || "echo-fragment".equals(protocol))
{
_websocket = new TestEchoFragmentWebSocket();
}
else if (protocol==null)
{
_websocket = new TestWebSocket();
}
return _websocket;
}
};
setHandler(_wsHandler);
_rHandler=new ResourceHandler();
_rHandler.setDirectoriesListed(true);
_rHandler.setResourceBase("src/test/webapp");
_wsHandler.setHandler(_rHandler);
}
/* ------------------------------------------------------------ */
public boolean isVerbose()
{
return _verbose;
}
/* ------------------------------------------------------------ */
public void setVerbose(boolean verbose)
{
_verbose = verbose;
}
/* ------------------------------------------------------------ */
public void setResourceBase(String dir)
{
_rHandler.setResourceBase(dir);
}
/* ------------------------------------------------------------ */
public String getResourceBase()
{
return _rHandler.getResourceBase();
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket, WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage, WebSocket.OnControl
{
protected FrameConnection _connection;
public FrameConnection getConnection()
{
return _connection;
}
public void onOpen(Connection connection)
{
if (_verbose)
System.err.printf("%s#onOpen %s %s\n",this.getClass().getSimpleName(),connection,connection.getProtocol());
}
public void onHandshake(FrameConnection connection)
{
if (_verbose)
System.err.printf("%s#onHandshake %s %s\n",this.getClass().getSimpleName(),connection,connection.getClass().getSimpleName());
_connection = connection;
}
public void onClose(int code,String message)
{
if (_verbose)
System.err.printf("%s#onDisonnect %d %s\n",this.getClass().getSimpleName(),code,message);
}
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
if (_verbose)
System.err.printf("%s#onFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),TypeUtil.toHexString(data,offset,length));
return false;
}
public boolean onControl(byte controlCode, byte[] data, int offset, int length)
{
if (_verbose)
System.err.printf("%s#onControl %s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(controlCode),TypeUtil.toHexString(data,offset,length));
return false;
}
public void onMessage(String data)
{
if (_verbose)
System.err.printf("%s#onMessage %s\n",this.getClass().getSimpleName(),data);
}
class TestEchoAssembleWebSocket extends TestWebSocket
{
@Override
public void onMessage(byte[] data, int offset, int length)
{
if (_verbose)
System.err.printf("%s#onMessage %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(data,offset,length));
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoWebSocket extends TestWebSocket
{
@Override
public void onOpen(Connection connection)
{
super.onOpen(connection);
connection.setMaxTextMessageSize(-1);
connection.setMaxBinaryMessageSize(-1);
}
@Override
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
super.onFrame(flags,opcode,data,offset,length);
super.onMessage(data,offset,length);
try
{
if (!getConnection().isControl(opcode))
getConnection().sendFrame(flags,opcode,data,offset,length);
getConnection().sendMessage(data,offset,length);
}
catch (IOException e)
{
e.printStackTrace();
}
}
return false;
@Override
public void onMessage(final String data)
{
super.onMessage(data);
try
{
getConnection().sendMessage(data);
}
catch (IOException e)
{
e.printStackTrace();
}
}
@Override
public void onOpen(Connection connection)
{
super.onOpen(connection);
connection.setMaxTextMessageSize(64*1024);
connection.setMaxBinaryMessageSize(64*1024);
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoBroadcastPingWebSocket extends TestEchoBroadcastWebSocket
{
Thread _keepAlive; // A dedicated thread is not a good way to do this
CountDownLatch _latch = new CountDownLatch(1);
@Override
public void onClose(int code, String message)
{
_latch.countDown();
super.onClose(code,message);
}
@Override
public boolean onControl(byte controlCode, byte[] data, int offset, int length)
{
if (controlCode==OpCode.PONG.getCode())
{
System.err.println("Pong "+getConnection());
}
return super.onControl(controlCode,data,offset,length);
}
@Override
public void onHandshake(final FrameConnection connection)
{
@ -230,7 +113,7 @@ public class TestServer extends Server
{
System.err.println("Ping "+connection);
byte[] data = { (byte)1, (byte) 2, (byte) 3 };
connection.sendControl(WebSocketConnectionRFC6455.OP_PING,data,0,data.length);
connection.sendControl(OpCode.PING.getCode(),data,0,data.length);
}
}
catch (Exception e)
@ -243,37 +126,12 @@ public class TestServer extends Server
}
@Override
public boolean onControl(byte controlCode, byte[] data, int offset, int length)
{
if (controlCode==WebSocketConnectionRFC6455.OP_PONG)
System.err.println("Pong "+getConnection());
return super.onControl(controlCode,data,offset,length);
}
@Override
public void onClose(int code, String message)
{
_latch.countDown();
super.onClose(code,message);
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoBroadcastWebSocket extends TestWebSocket
{
@Override
public void onOpen(Connection connection)
{
super.onOpen(connection);
_broadcast.add(this);
}
@Override
public void onClose(int code,String message)
{
@ -316,62 +174,18 @@ public class TestServer extends Server
}
}
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoAssembleWebSocket extends TestWebSocket
{
@Override
public void onOpen(Connection connection)
{
super.onOpen(connection);
connection.setMaxTextMessageSize(64*1024);
connection.setMaxBinaryMessageSize(64*1024);
}
@Override
public void onMessage(byte[] data, int offset, int length)
{
super.onMessage(data,offset,length);
try
{
getConnection().sendMessage(data,offset,length);
}
catch (IOException e)
{
e.printStackTrace();
_broadcast.add(this);
}
}
@Override
public void onMessage(final String data)
{
super.onMessage(data);
try
{
getConnection().sendMessage(data);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoFragmentWebSocket extends TestWebSocket
{
@Override
public void onOpen(Connection connection)
{
super.onOpen(connection);
connection.setMaxTextMessageSize(64*1024);
connection.setMaxBinaryMessageSize(64*1024);
}
@Override
public void onMessage(byte[] data, int offset, int length)
{
@ -379,7 +193,7 @@ public class TestServer extends Server
try
{
getConnection().sendFrame((byte)0x0,getConnection().binaryOpcode(),data,offset,length/2);
getConnection().sendFrame((byte)0x8,getConnection().binaryOpcode(),data,offset+length/2,length-length/2);
getConnection().sendFrame((byte)0x8,getConnection().binaryOpcode(),data,offset+(length/2),length-(length/2));
}
catch (IOException e)
{
@ -397,24 +211,132 @@ public class TestServer extends Server
int offset=0;
int length=data.length;
getConnection().sendFrame((byte)0x0,getConnection().textOpcode(),data,offset,length/2);
getConnection().sendFrame((byte)0x8,getConnection().textOpcode(),data,offset+length/2,length-length/2);
getConnection().sendFrame((byte)0x8,getConnection().textOpcode(),data,offset+(length/2),length-(length/2));
}
catch (IOException e)
{
e.printStackTrace();
}
}
@Override
public void onOpen(Connection connection)
{
super.onOpen(connection);
connection.setMaxTextMessageSize(64*1024);
connection.setMaxBinaryMessageSize(64*1024);
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoWebSocket extends TestWebSocket
{
@Override
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
super.onFrame(flags,opcode,data,offset,length);
try
{
if (!getConnection().isControl(opcode))
{
getConnection().sendFrame(flags,opcode,data,offset,length);
}
}
catch (IOException e)
{
e.printStackTrace();
}
private static void usage()
{
System.err.println("java -cp CLASSPATH "+TestServer.class+" [ OPTIONS ]");
System.err.println(" -p|--port PORT (default 8080)");
System.err.println(" -v|--verbose ");
System.err.println(" -d|--docroot file (default 'src/test/webapp')");
System.exit(1);
return false;
}
@Override
public void onOpen(Connection connection)
{
super.onOpen(connection);
connection.setMaxTextMessageSize(-1);
connection.setMaxBinaryMessageSize(-1);
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket, WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage, WebSocket.OnControl
{
protected FrameConnection _connection;
public FrameConnection getConnection()
{
return _connection;
}
@Override
public void onClose(int code,String message)
{
if (_verbose)
{
System.err.printf("%s#onDisonnect %d %s\n",this.getClass().getSimpleName(),code,message);
}
}
@Override
public boolean onControl(byte controlCode, byte[] data, int offset, int length)
{
if (_verbose)
{
System.err.printf("%s#onControl %s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(controlCode),TypeUtil.toHexString(data,offset,length));
}
return false;
}
@Override
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
if (_verbose)
{
System.err.printf("%s#onFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),TypeUtil.toHexString(data,offset,length));
}
return false;
}
@Override
public void onHandshake(FrameConnection connection)
{
if (_verbose)
{
System.err.printf("%s#onHandshake %s %s\n",this.getClass().getSimpleName(),connection,connection.getClass().getSimpleName());
}
_connection = connection;
}
@Override
public void onMessage(byte[] data, int offset, int length)
{
if (_verbose)
{
System.err.printf("%s#onMessage %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(data,offset,length));
}
}
@Override
public void onMessage(String data)
{
if (_verbose)
{
System.err.printf("%s#onMessage %s\n",this.getClass().getSimpleName(),data);
}
}
@Override
public void onOpen(Connection connection)
{
if (_verbose)
{
System.err.printf("%s#onOpen %s %s\n",this.getClass().getSimpleName(),connection,connection.getProtocol());
}
}
}
private static final Logger LOG = Log.getLogger(TestServer.class);
public static void main(String... args)
{
try
@ -427,14 +349,22 @@ public class TestServer extends Server
{
String a=args[i];
if ("-p".equals(a)||"--port".equals(a))
{
port=Integer.parseInt(args[++i]);
}
else if ("-v".equals(a)||"--verbose".equals(a))
{
verbose=true;
}
else if ("-d".equals(a)||"--docroot".equals(a))
{
docroot=args[++i];
}
else if (a.startsWith("-"))
{
usage();
}
}
TestServer server = new TestServer(port);
@ -449,5 +379,99 @@ public class TestServer extends Server
}
}
private static void usage()
{
System.err.println("java -cp CLASSPATH "+TestServer.class+" [ OPTIONS ]");
System.err.println(" -p|--port PORT (default 8080)");
System.err.println(" -v|--verbose ");
System.err.println(" -d|--docroot file (default 'src/test/webapp')");
System.exit(1);
}
boolean _verbose;
WebSocket _websocket;
SelectChannelConnector _connector;
WebSocketHandler _wsHandler;
ResourceHandler _rHandler;
ConcurrentLinkedQueue<TestWebSocket> _broadcast = new ConcurrentLinkedQueue<TestWebSocket>();
public TestServer(int port)
{
_connector = new SelectChannelConnector();
_connector.setPort(port);
addConnector(_connector);
_wsHandler = new WebSocketHandler()
{
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
if ("org.ietf.websocket.test-echo".equals(protocol) || "echo".equals(protocol) || "lws-mirror-protocol".equals(protocol))
{
_websocket = new TestEchoWebSocket();
}
else if ("org.ietf.websocket.test-echo-broadcast".equals(protocol) || "echo-broadcast".equals(protocol))
{
_websocket = new TestEchoBroadcastWebSocket();
}
else if ("echo-broadcast-ping".equals(protocol))
{
_websocket = new TestEchoBroadcastPingWebSocket();
}
else if ("org.ietf.websocket.test-echo-assemble".equals(protocol) || "echo-assemble".equals(protocol))
{
_websocket = new TestEchoAssembleWebSocket();
}
else if ("org.ietf.websocket.test-echo-fragment".equals(protocol) || "echo-fragment".equals(protocol))
{
_websocket = new TestEchoFragmentWebSocket();
}
else if (protocol==null)
{
_websocket = new TestWebSocket();
}
return _websocket;
}
};
setHandler(_wsHandler);
_rHandler=new ResourceHandler();
_rHandler.setDirectoriesListed(true);
_rHandler.setResourceBase("src/test/webapp");
_wsHandler.setHandler(_rHandler);
}
/* ------------------------------------------------------------ */
public String getResourceBase()
{
return _rHandler.getResourceBase();
}
/* ------------------------------------------------------------ */
public boolean isVerbose()
{
return _verbose;
}
/* ------------------------------------------------------------ */
public void setResourceBase(String dir)
{
_rHandler.setResourceBase(dir);
}
/* ------------------------------------------------------------ */
public void setVerbose(boolean verbose)
{
_verbose = verbose;
}
}

View File

@ -15,26 +15,23 @@
*******************************************************************************/
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.servlet.helper.CaptureSocket;
import org.eclipse.jetty.websocket.servlet.helper.MessageSender;
import org.eclipse.jetty.websocket.servlet.helper.WebSocketCaptureServlet;
import org.eclipse.jetty.websocket.server.helper.CaptureSocket;
import org.eclipse.jetty.websocket.server.helper.MessageSender;
import org.eclipse.jetty.websocket.server.helper.WebSocketCaptureServlet;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* WebSocketCommTest - to test reported undelivered messages in bug <a
* href="https://jira.codehaus.org/browse/JETTY-1463">JETTY-1463</a>
@ -42,6 +39,7 @@ import static org.hamcrest.Matchers.notNullValue;
public class WebSocketCommTest
{
private Server server;
private SelectChannelConnector connector;
private WebSocketCaptureServlet servlet;
private URI serverUri;
@ -49,7 +47,9 @@ public class WebSocketCommTest
public void startServer() throws Exception
{
// Configure Server
server = new Server(0);
server = new Server();
connector = new SelectChannelConnector();
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
@ -62,13 +62,12 @@ public class WebSocketCommTest
// Start Server
server.start();
Connector conn = server.getConnectors()[0];
String host = conn.getHost();
String host = connector.getHost();
if (host == null)
{
host = "localhost";
}
int port = conn.getLocalPort();
int port = connector.getLocalPort();
serverUri = new URI(String.format("ws://%s:%d/",host,port));
System.out.printf("Server URI: %s%n",serverUri);
}
@ -89,12 +88,12 @@ public class WebSocketCommTest
@Test
public void testSendTextMessages() throws Exception
{
WebSocketClientFactory clientFactory = new WebSocketClientFactory();
clientFactory.start();
// WebSocketClientFactory clientFactory = new WebSocketClientFactory();
// clientFactory.start();
WebSocketClient wsc = clientFactory.newWebSocketClient();
// WebSocketClient wsc = clientFactory.newWebSocketClient();
MessageSender sender = new MessageSender();
wsc.open(serverUri,sender);
// wsc.open(serverUri,sender);
try
{

View File

@ -15,6 +15,8 @@
*******************************************************************************/
package org.eclipse.jetty.websocket.server;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
@ -22,37 +24,177 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.lang.management.ManagementFactory;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import junit.framework.Assert;
import org.eclipse.jetty.io.bio.SocketEndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketGeneratorRFC6455;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.masks.FixedMasker;
import org.eclipse.jetty.websocket.servlet.WebSocketHandler;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.frames.BaseFrame;
import org.eclipse.jetty.websocket.frames.TextFrame;
import org.eclipse.jetty.websocket.generator.Generator;
import org.eclipse.jetty.websocket.parser.Parser;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class WebSocketLoadRFC6455Test
{
private static class EchoWebSocket implements WebSocket.OnTextMessage
{
private volatile Connection outbound;
@Override
public void onClose(int closeCode, String message)
{
}
@Override
public void onMessage(String data)
{
try
{
// System.err.println(">> "+data);
outbound.sendMessage(data);
}
catch (IOException x)
{
outbound.disconnect();
}
}
@Override
public void onOpen(Connection outbound)
{
this.outbound = outbound;
}
}
private class WebSocketClient implements Runnable
{
private final Socket socket;
private final BufferedWriter output;
private final BufferedReader input;
private final int iterations;
private final CountDownLatch latch;
private/* final */AsyncEndPoint _endp;
private final Generator _generator;
private final Parser _parser;
private final Parser.Listener _handler = new Parser.Listener()
{
/*
* public void close(int code,String message) { }
*
* public void onFrame(byte flags, byte opcode, ByteBuffer buffer) { _response=buffer; }
*/
@Override
public void onFrame(BaseFrame frame)
{
// TODO Auto-generated method stub
}
@Override
public void onWebSocketException(WebSocketException e)
{
// TODO Auto-generated method stub
}
};
private volatile ByteBuffer _response;
public WebSocketClient(String host, int port, int readTimeout, CountDownLatch latch, int iterations) throws IOException
{
this.latch = latch;
socket = new Socket(host, port);
socket.setSoTimeout(readTimeout);
output = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "ISO-8859-1"));
input = new BufferedReader(new InputStreamReader(socket.getInputStream(), "ISO-8859-1"));
this.iterations = iterations;
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
// _endp=new SocketEndPoint(socket);
StandardByteBufferPool bufferPool = new StandardByteBufferPool();
_generator = new Generator(bufferPool,policy);
_parser = new Parser(policy);
}
public void close() throws IOException
{
socket.close();
}
private void open() throws IOException
{
output.write("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: onConnect\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n");
output.flush();
String responseLine = input.readLine();
assertTrue(responseLine.startsWith("HTTP/1.1 101 Switching Protocols"));
// Read until we find an empty line, which signals the end of the http response
String line;
while ((line = input.readLine()) != null)
{
if (line.length() == 0)
{
break;
}
}
}
@Override
public void run()
{
try
{
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
for (int i = 0; i < iterations; ++i)
{
TextFrame txt = new TextFrame();
txt.setData(message);
ByteBuffer buf = _generator.generate(txt);
// TODO: Send it
// TODO: Receive response
Assert.assertEquals(message,_response.toString());
latch.countDown();
}
}
catch (Throwable x)
{
throw new RuntimeException(x);
}
}
}
private static Server _server;
private static Connector _connector;
private static SelectChannelConnector _connector;
@BeforeClass
public static void startServer() throws Exception
@ -68,6 +210,7 @@ public class WebSocketLoadRFC6455Test
WebSocketHandler wsHandler = new WebSocketHandler()
{
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
return new EchoWebSocket();
@ -105,142 +248,25 @@ public class WebSocketLoadRFC6455Test
//long start = System.nanoTime();
for (WebSocketClient client : clients)
{
threadPool.execute(client);
}
int parallelism = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
long maxTimePerIteration = 5;
assertTrue(latch.await(iterations * (count / parallelism + 1) * maxTimePerIteration, TimeUnit.MILLISECONDS));
assertTrue(latch.await(iterations * ((count / parallelism) + 1) * maxTimePerIteration, TimeUnit.MILLISECONDS));
//long end = System.nanoTime();
// System.err.println("Elapsed: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
for (WebSocketClient client : clients)
{
client.close();
}
}
finally
{
threadPool.shutdown();
assertTrue(threadPool.awaitTermination(2, TimeUnit.SECONDS));
}
}
private static class EchoWebSocket implements WebSocket.OnTextMessage
{
private volatile Connection outbound;
public void onOpen(Connection outbound)
{
this.outbound = outbound;
}
public void onMessage(String data)
{
try
{
// System.err.println(">> "+data);
outbound.sendMessage(data);
}
catch (IOException x)
{
outbound.disconnect();
}
}
public void onClose(int closeCode, String message)
{
}
}
private class WebSocketClient implements Runnable
{
private final Socket socket;
private final BufferedWriter output;
private final BufferedReader input;
private final int iterations;
private final CountDownLatch latch;
private final SocketEndPoint _endp;
private final WebSocketGeneratorRFC6455 _generator;
private final WebSocketParserRFC6455 _parser;
private final WebSocketParser.FrameHandler _handler = new WebSocketParser.FrameHandler()
{
public void onFrame(byte flags, byte opcode, ByteBuffer buffer)
{
_response=buffer;
}
public void close(int code,String message)
{
}
};
private volatile ByteBuffer _response;
public WebSocketClient(String host, int port, int readTimeout, CountDownLatch latch, int iterations) throws IOException
{
this.latch = latch;
socket = new Socket(host, port);
socket.setSoTimeout(readTimeout);
output = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "ISO-8859-1"));
input = new BufferedReader(new InputStreamReader(socket.getInputStream(), "ISO-8859-1"));
this.iterations = iterations;
_endp=new SocketEndPoint(socket);
_generator = new WebSocketGeneratorRFC6455(new WebSocketBuffers(32*1024),_endp,new FixedMasker());
_parser = new WebSocketParserRFC6455(new WebSocketBuffers(32*1024),_endp,_handler,false);
}
private void open() throws IOException
{
output.write("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: onConnect\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n");
output.flush();
String responseLine = input.readLine();
assertTrue(responseLine.startsWith("HTTP/1.1 101 Switching Protocols"));
// Read until we find an empty line, which signals the end of the http response
String line;
while ((line = input.readLine()) != null)
if (line.length() == 0)
break;
}
public void run()
{
try
{
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
for (int i = 0; i < iterations; ++i)
{
byte[] data = message.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,WebSocketConnectionRFC6455.OP_TEXT,data,0,data.length);
_generator.flush();
//System.err.println("-> "+message);
_response=null;
while(_response==null)
_parser.parseNext();
//System.err.println("<- "+_response);
Assert.assertEquals(message,_response.toString());
latch.countDown();
}
}
catch (IOException x)
{
throw new RuntimeException(x);
}
}
public void close() throws IOException
{
socket.close();
}
}
}

View File

@ -16,10 +16,10 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.Server;
@ -28,12 +28,6 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.masks.ZeroMasker;
import org.eclipse.jetty.websocket.servlet.WebSocketHandler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -43,9 +37,51 @@ public class WebSocketOverSSLTest
private Server _server;
private int _port;
private QueuedThreadPool _threadPool;
private WebSocketClientFactory _wsFactory;
// private WebSocketClientFactory _wsFactory;
private WebSocket.Connection _connection;
@After
public void destroy() throws Exception
{
if (_connection != null)
{
_connection.close();
}
// if (_wsFactory != null)
// _wsFactory.stop();
if (_threadPool != null)
{
_threadPool.stop();
}
if (_server != null)
{
_server.stop();
_server.join();
}
}
private void startClient(final WebSocket webSocket) throws Exception
{
Assert.assertTrue(_server.isStarted());
_threadPool = new QueuedThreadPool();
_threadPool.setName("wsc-" + _threadPool.getName());
_threadPool.start();
// _wsFactory = new WebSocketClientFactory(_threadPool, new ZeroMasker());
// SslContextFactory cf = _wsFactory.getSslContextFactory();
// cf.setKeyStorePath(MavenTestingUtils.getTestResourceFile("keystore").getAbsolutePath());
// cf.setKeyStorePassword("storepwd");
// cf.setKeyManagerPassword("keypwd");
// _wsFactory.start();
// WebSocketClient client = new WebSocketClient(_wsFactory);
// _connection = client.open(new URI("wss://localhost:" + _port), webSocket).get(5, TimeUnit.SECONDS);
}
private void startServer(final WebSocket webSocket) throws Exception
{
_server = new Server();
@ -57,6 +93,7 @@ public class WebSocketOverSSLTest
cf.setKeyManagerPassword("keypwd");
_server.setHandler(new WebSocketHandler()
{
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
return webSocket;
@ -66,42 +103,71 @@ public class WebSocketOverSSLTest
_port = connector.getLocalPort();
}
private void startClient(final WebSocket webSocket) throws Exception
@Test
public void testManyMessages() throws Exception
{
Assert.assertTrue(_server.isStarted());
startServer(new WebSocket.OnTextMessage()
{
private Connection connection;
_threadPool = new QueuedThreadPool();
_threadPool.setName("wsc-" + _threadPool.getName());
_threadPool.start();
_wsFactory = new WebSocketClientFactory(_threadPool, new ZeroMasker());
SslContextFactory cf = _wsFactory.getSslContextFactory();
cf.setKeyStorePath(MavenTestingUtils.getTestResourceFile("keystore").getAbsolutePath());
cf.setKeyStorePassword("storepwd");
cf.setKeyManagerPassword("keypwd");
_wsFactory.start();
WebSocketClient client = new WebSocketClient(_wsFactory);
_connection = client.open(new URI("wss://localhost:" + _port), webSocket).get(5, TimeUnit.SECONDS);
@Override
public void onClose(int closeCode, String message)
{
}
@After
public void destroy() throws Exception
@Override
public void onMessage(String data)
{
if (_connection != null)
_connection.close();
if (_wsFactory != null)
_wsFactory.stop();
if (_threadPool != null)
_threadPool.stop();
if (_server != null)
try
{
_server.stop();
_server.join();
connection.sendMessage(data);
}
catch (IOException x)
{
x.printStackTrace();
}
}
@Override
public void onOpen(Connection connection)
{
this.connection = connection;
}
});
int count = 1000;
final CountDownLatch clientLatch = new CountDownLatch(count);
startClient(new WebSocket.OnTextMessage()
{
@Override
public void onClose(int closeCode, String message)
{
}
@Override
public void onMessage(String data)
{
clientLatch.countDown();
}
@Override
public void onOpen(Connection connection)
{
}
});
char[] chars = new char[256];
Arrays.fill(chars, 'x');
String message = new String(chars);
for (int i = 0; i < count; ++i)
{
_connection.sendMessage(message);
}
Assert.assertTrue(clientLatch.await(20, TimeUnit.SECONDS));
// While messages may have all arrived, the SSL close alert
// may be in the way so give some time for it to be processed.
TimeUnit.SECONDS.sleep(1);
}
@Test
@ -113,11 +179,12 @@ public class WebSocketOverSSLTest
{
private Connection connection;
public void onOpen(Connection connection)
@Override
public void onClose(int closeCode, String message)
{
this.connection = connection;
}
@Override
public void onMessage(String data)
{
try
@ -132,24 +199,29 @@ public class WebSocketOverSSLTest
}
}
public void onClose(int closeCode, String message)
@Override
public void onOpen(Connection connection)
{
this.connection = connection;
}
});
final CountDownLatch clientLatch = new CountDownLatch(1);
startClient(new WebSocket.OnTextMessage()
{
public void onOpen(Connection connection)
@Override
public void onClose(int closeCode, String message)
{
}
@Override
public void onMessage(String data)
{
Assert.assertEquals(message, data);
clientLatch.countDown();
}
public void onClose(int closeCode, String message)
@Override
public void onOpen(Connection connection)
{
}
});
@ -158,63 +230,4 @@ public class WebSocketOverSSLTest
Assert.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testManyMessages() throws Exception
{
startServer(new WebSocket.OnTextMessage()
{
private Connection connection;
public void onOpen(Connection connection)
{
this.connection = connection;
}
public void onMessage(String data)
{
try
{
connection.sendMessage(data);
}
catch (IOException x)
{
x.printStackTrace();
}
}
public void onClose(int closeCode, String message)
{
}
});
int count = 1000;
final CountDownLatch clientLatch = new CountDownLatch(count);
startClient(new WebSocket.OnTextMessage()
{
public void onOpen(Connection connection)
{
}
public void onMessage(String data)
{
clientLatch.countDown();
}
public void onClose(int closeCode, String message)
{
}
});
char[] chars = new char[256];
Arrays.fill(chars, 'x');
String message = new String(chars);
for (int i = 0; i < count; ++i)
_connection.sendMessage(message);
Assert.assertTrue(clientLatch.await(20, TimeUnit.SECONDS));
// While messages may have all arrived, the SSL close alert
// may be in the way so give some time for it to be processed.
TimeUnit.SECONDS.sleep(1);
}
}

View File

@ -16,13 +16,17 @@
package org.eclipse.jetty.websocket.server;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@ -32,15 +36,16 @@ public class WebSocketRedeployTest
private Server server;
private ServletContextHandler context;
private String uri;
private WebSocketClientFactory wsFactory;
// private WebSocketClientFactory wsFactory;
@After
public void destroy() throws Exception
{
if (wsFactory != null)
{
wsFactory.stop();
}
// if (wsFactory != null)
// {
// wsFactory.stop();
// }
if (server != null)
{
server.stop();
@ -52,7 +57,7 @@ public class WebSocketRedeployTest
{
server = new Server();
SelectChannelConnector connector = new SelectChannelConnector();
// connector.setPort(8080);
// connector.setPort(8080);
server.addConnector(connector);
HandlerCollection handlers = new HandlerCollection();
@ -63,6 +68,7 @@ public class WebSocketRedeployTest
WebSocketServlet servlet = new WebSocketServlet()
{
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
return webSocket;
@ -75,8 +81,8 @@ public class WebSocketRedeployTest
uri = "ws://localhost:" + connector.getLocalPort() + contextPath + servletPath;
wsFactory = new WebSocketClientFactory();
wsFactory.start();
// wsFactory = new WebSocketClientFactory();
// wsFactory.start();
}
@Test
@ -104,30 +110,30 @@ public class WebSocketRedeployTest
}
});
WebSocketClient client = wsFactory.newWebSocketClient();
client.open(new URI(uri), new WebSocket.OnTextMessage()
{
@Override
public void onClose(int closeCode, String message)
{
closeLatch.countDown();
}
@Override
public void onMessage(String data)
{
}
@Override
public void onOpen(Connection connection)
{
openLatch.countDown();
}
}, 5, TimeUnit.SECONDS);
// WebSocketClient client = wsFactory.newWebSocketClient();
// client.open(new URI(uri), new WebSocket.OnTextMessage()
// {
// @Override
// public void onClose(int closeCode, String message)
// {
// closeLatch.countDown();
// }
//
// @Override
// public void onMessage(String data)
// {
// }
//
// @Override
// public void onOpen(Connection connection)
// {
// openLatch.countDown();
// }
// }, 5, TimeUnit.SECONDS);
Assert.assertTrue(openLatch.await(5, TimeUnit.SECONDS));
wsFactory.stop();
// wsFactory.stop();
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@ -157,26 +163,26 @@ public class WebSocketRedeployTest
}
});
WebSocketClient client = wsFactory.newWebSocketClient();
client.open(new URI(uri), new WebSocket.OnTextMessage()
{
@Override
public void onClose(int closeCode, String message)
{
closeLatch.countDown();
}
@Override
public void onMessage(String data)
{
}
@Override
public void onOpen(Connection connection)
{
openLatch.countDown();
}
}, 5, TimeUnit.SECONDS);
// WebSocketClient client = wsFactory.newWebSocketClient();
// client.open(new URI(uri), new WebSocket.OnTextMessage()
// {
// @Override
// public void onClose(int closeCode, String message)
// {
// closeLatch.countDown();
// }
//
// @Override
// public void onMessage(String data)
// {
// }
//
// @Override
// public void onOpen(Connection connection)
// {
// openLatch.countDown();
// }
// }, 5, TimeUnit.SECONDS);
Assert.assertTrue(openLatch.await(5, TimeUnit.SECONDS));

View File

@ -15,20 +15,14 @@ import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketGeneratorRFC6455Test;
import org.eclipse.jetty.websocket.WebSocketParserRFC6455Test;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.servlet.helper.MessageSender;
import org.eclipse.jetty.websocket.servlet.helper.WebSocketServlet;
import org.eclipse.jetty.websocket.server.helper.MessageSender;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -43,20 +37,27 @@ import org.junit.Test;
*/
public class WebSocketServletRFCTest
{
@SuppressWarnings("serial")
private static class RFCServlet extends WebSocketServlet
{
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
return new RFCSocket();
}
}
private static class RFCSocket implements WebSocket, WebSocket.OnTextMessage
{
private Connection conn;
public void onOpen(Connection connection)
{
this.conn = connection;
}
@Override
public void onClose(int closeCode, String message)
{
this.conn = null;
}
@Override
public void onMessage(String data)
{
// Test the RFC 6455 close code 1011 that should close
@ -77,25 +78,25 @@ public class WebSocketServletRFCTest
}
}
@Override
public void onOpen(Connection connection)
{
this.conn = connection;
}
@SuppressWarnings("serial")
private static class RFCServlet extends WebSocketServlet
{
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
return new RFCSocket();
}
}
private static Server server;
private static SelectChannelConnector connector;
private static URI serverUri;
@BeforeClass
public static void startServer() throws Exception
{
// Configure Server
server = new Server(0);
server = new Server();
connector = new SelectChannelConnector();
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
@ -107,13 +108,12 @@ public class WebSocketServletRFCTest
// Start Server
server.start();
Connector conn = server.getConnectors()[0];
String host = conn.getHost();
String host = connector.getHost();
if (host == null)
{
host = "localhost";
}
int port = conn.getLocalPort();
int port = connector.getLocalPort();
serverUri = new URI(String.format("ws://%s:%d/",host,port));
System.out.printf("Server URI: %s%n",serverUri);
}
@ -131,6 +131,59 @@ public class WebSocketServletRFCTest
}
}
private String readResponseHeader(InputStream in) throws IOException
{
InputStreamReader isr = new InputStreamReader(in);
BufferedReader reader = new BufferedReader(isr);
StringBuilder header = new StringBuilder();
// Read the response header
String line = reader.readLine();
Assert.assertNotNull(line);
Assert.assertThat(line,startsWith("HTTP/1.1 "));
header.append(line).append("\r\n");
while ((line = reader.readLine()) != null)
{
if (line.trim().length() == 0)
{
break;
}
header.append(line).append("\r\n");
}
return header.toString();
}
/**
* Test the requirement of responding with server terminated close code 1011 when there is an unhandled (internal
* server error) being produced by the extended WebSocketServlet.
*/
@Test
public void testResponseOnInternalError() throws Exception
{
// WebSocketClientFactory clientFactory = new WebSocketClientFactory();
// clientFactory.start();
// WebSocketClient wsc = clientFactory.newWebSocketClient();
MessageSender sender = new MessageSender();
// wsc.open(serverUri,sender);
try
{
sender.awaitConnect();
sender.sendMessage("CRASH");
// Give servlet 500 millisecond to process messages
TimeUnit.MILLISECONDS.sleep(500);
Assert.assertThat("WebSocket should be closed",sender.isConnected(),is(false));
Assert.assertThat("WebSocket close clode",sender.getCloseCode(),is(1011));
}
finally
{
sender.close();
}
}
/**
* Test the requirement of responding with an http 400 when using a Sec-WebSocket-Version that is unsupported.
*/
@ -177,57 +230,4 @@ public class WebSocketServletRFCTest
socket.close();
}
}
private String readResponseHeader(InputStream in) throws IOException
{
InputStreamReader isr = new InputStreamReader(in);
BufferedReader reader = new BufferedReader(isr);
StringBuilder header = new StringBuilder();
// Read the response header
String line = reader.readLine();
Assert.assertNotNull(line);
Assert.assertThat(line,startsWith("HTTP/1.1 "));
header.append(line).append("\r\n");
while ((line = reader.readLine()) != null)
{
if (line.trim().length() == 0)
{
break;
}
header.append(line).append("\r\n");
}
return header.toString();
}
/**
* Test the requirement of responding with server terminated close code 1011 when there is an unhandled (internal
* server error) being produced by the extended WebSocketServlet.
*/
@Test
public void testResponseOnInternalError() throws Exception
{
WebSocketClientFactory clientFactory = new WebSocketClientFactory();
clientFactory.start();
WebSocketClient wsc = clientFactory.newWebSocketClient();
MessageSender sender = new MessageSender();
wsc.open(serverUri,sender);
try
{
sender.awaitConnect();
sender.sendMessage("CRASH");
// Give servlet 500 millisecond to process messages
TimeUnit.MILLISECONDS.sleep(500);
Assert.assertThat("WebSocket should be closed",sender.isConnected(),is(false));
Assert.assertThat("WebSocket close clode",sender.getCloseCode(),is(1011));
}
finally
{
sender.close();
}
}
}

View File

@ -15,6 +15,8 @@
*******************************************************************************/
package org.eclipse.jetty.websocket.server.helper;
import static org.hamcrest.Matchers.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -25,13 +27,12 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
public class SafariD00
{
private URI uri;
@ -63,6 +64,11 @@ public class SafariD00
return socket;
}
public void disconnect() throws IOException
{
socket.close();
}
/**
* Issue an Http websocket (Draft-0) upgrade request using the Safari particulars.
*
@ -126,7 +132,7 @@ public class SafariD00
len += (msg.length() + 2);
}
ByteArrayBuffer buf = new ByteArrayBuffer(len);
ByteBuffer buf = ByteBuffer.allocate(len);
for (String msg : msgs)
{
@ -135,12 +141,7 @@ public class SafariD00
buf.put((byte)0xFF);
}
out.write(buf.array());
BufferUtil.writeTo(buf,out);
out.flush();
}
public void disconnect() throws IOException
{
socket.close();
}
}

View File

@ -24,7 +24,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.servlet.helper.WebSocketServlet;
import org.eclipse.jetty.websocket.server.WebSocketServlet;
@SuppressWarnings("serial")
public class WebSocketCaptureServlet extends WebSocketServlet
@ -37,6 +37,7 @@ public class WebSocketCaptureServlet extends WebSocketServlet
resp.sendError(404);
}
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
CaptureSocket capture = new CaptureSocket();