Working out some fallout of the Session split

This commit is contained in:
Joakim Erdfelt 2012-07-17 11:20:18 -07:00
parent 6ece593c58
commit 4aa54d244a
8 changed files with 95 additions and 37 deletions

View File

@ -19,9 +19,10 @@ import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.io.WebSocketSession;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* For working with the {@link WebSocketConnection} in a blocking technique.
@ -30,6 +31,30 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
*/
public class WebSocketBlockingConnection
{
private static class Blocker extends FutureCallback<Void>
{
@Override
public void completed(Void context)
{
LOG.debug("completed({})",context);
super.completed(context);
}
@Override
public void failed(Void context, Throwable cause)
{
LOG.debug("failed({},{})",context,cause);
super.failed(context,cause);
}
@Override
public String toString()
{
return String.format("%s[%s]",Blocker.class.getSimpleName(),super.toString());
}
}
private static final Logger LOG = Log.getLogger(WebSocketBlockingConnection.class);
private final WebSocketSession conn;
public WebSocketBlockingConnection(WebSocketConnection conn)
@ -51,12 +76,11 @@ public class WebSocketBlockingConnection
*/
public void write(byte[] data, int offset, int length) throws IOException
{
WebSocketFrame frame = WebSocketFrame.binary().setPayload(data,offset,length);
try
{
FutureCallback<Void> blocking = new FutureCallback<>();
conn.output(null,blocking,frame);
blocking.get(); // block till finished
Blocker blocker = new Blocker();
conn.write(null,blocker,data,offset,length);
blocker.get(); // block till finished
}
catch (InterruptedException e)
{
@ -75,12 +99,11 @@ public class WebSocketBlockingConnection
*/
public void write(String message) throws IOException
{
WebSocketFrame frame = WebSocketFrame.text(message);
try
{
FutureCallback<Void> blocking = new FutureCallback<>();
conn.output(null,blocking,frame);
blocking.get(); // block till finished
Blocker blocker = new Blocker();
conn.write(null,blocker,message);
blocker.get(); // block till finished
}
catch (InterruptedException e)
{

View File

@ -21,10 +21,13 @@ import java.nio.channels.InterruptedByTimeoutException;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public abstract class FrameBytes<C> implements Callback<C>, Runnable
{
private final static Logger LOG = Log.getLogger(FrameBytes.class);
protected final WebSocketAsyncConnection connection;
protected final Callback<C> callback;
protected final C context;
@ -52,6 +55,10 @@ public abstract class FrameBytes<C> implements Callback<C>, Runnable
@Override
public void completed(C context)
{
if (LOG.isDebugEnabled())
{
LOG.debug("completed({})",context);
}
cancelTask();
connection.complete(this);
callback.completed(context);
@ -60,6 +67,10 @@ public abstract class FrameBytes<C> implements Callback<C>, Runnable
@Override
public void failed(C context, Throwable x)
{
if (LOG.isDebugEnabled())
{
LOG.debug("failed({},{})",context,x);
}
cancelTask();
callback.failed(context,x);
}

View File

@ -339,8 +339,20 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
}
private <C> void write(ByteBuffer buffer, WebSocketAsyncConnection webSocketAsyncConnection, FrameBytes<C> frameBytes)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes);
LOG.debug("EndPoint: {}",getEndPoint());
}
try
{
getEndPoint().write(frameBytes.context,frameBytes,buffer);
}
catch (Throwable t)
{
LOG.debug(t);
frameBytes.failed(frameBytes.context,t);
}
}
}

View File

@ -96,6 +96,10 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
if (LOG.isDebugEnabled())
{
LOG.debug("output({},{},{})",context,callback,frame);
}
// forward on to chain
outgoing.output(context,callback,frame);
}
@ -108,7 +112,7 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
{
WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setPayload(payload);
frame.setFin(true);
outgoing.output(context,callback,frame);
output(context,callback,frame);
}
public void setOutgoing(OutgoingFrames outgoing)
@ -128,7 +132,7 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buf,offset,len);
frame.setFin(true);
outgoing.output(context,callback,frame);
output(context,callback,frame);
}
/**
@ -143,7 +147,7 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
}
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer);
frame.setFin(true);
outgoing.output(context,callback,frame);
output(context,callback,frame);
}
/**
@ -158,6 +162,6 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
}
WebSocketFrame frame = WebSocketFrame.text(message);
frame.setFin(true);
outgoing.output(context,callback,frame);
output(context,callback,frame);
}
}

View File

@ -47,6 +47,7 @@ public class Parser
private int cursor = 0;
// Frame
private WebSocketFrame frame;
private OpCode lastDataOpcode;
// payload specific
private ByteBuffer payload;
private int payloadLength;
@ -267,7 +268,7 @@ public class Parser
throw new ProtocolException("Fragment continuation frame without prior !FIN");
}
// Be careful to use the original opcode
opcode = frame.getOpCode();
opcode = lastDataOpcode;
}
// base framing flags
@ -278,6 +279,11 @@ public class Parser
frame.setRsv3(rsv3);
frame.setOpCode(opcode);
if (opcode.isDataFrame())
{
lastDataOpcode = opcode;
}
state = State.PAYLOAD_LEN;
break;
}

View File

@ -86,8 +86,10 @@ public class TestABCase5
}
}
private static SimpleServletServer server;
private static final byte FIN = (byte)0x80;
private static final byte NOFIN = 0x00;
private static SimpleServletServer server;
private static Generator laxGenerator;
@BeforeClass
@ -126,7 +128,7 @@ public class TestABCase5
String fragment1 = "fragment1";
buf.put((byte)(0x00 | OpCode.PING.getCode()));
buf.put((byte)(NOFIN | OpCode.PING.getCode()));
byte b = 0x00; // no masking
b |= fragment1.length() & 0x7F;
@ -141,7 +143,7 @@ public class TestABCase5
String fragment2 = "fragment2";
buf2.put((byte)(0x80 | OpCode.PING.getCode()));
buf2.put((byte)(FIN | OpCode.PING.getCode()));
b = 0x00; // no masking
b |= fragment2.length() & 0x7F;
buf2.put(b);
@ -213,7 +215,7 @@ public class TestABCase5
String fragment1 = "fragment1";
buf.put((byte)(0x00 | OpCode.PONG.getCode()));
buf.put((byte)(NOFIN | OpCode.PONG.getCode()));
byte b = 0x00; // no masking
b |= fragment1.length() & 0x7F;
@ -228,7 +230,7 @@ public class TestABCase5
String fragment2 = "fragment2";
buf2.put((byte)(0x80 | OpCode.CONTINUATION.getCode()));
buf2.put((byte)(FIN | OpCode.CONTINUATION.getCode()));
b = 0x00; // no masking
b |= fragment2.length() & 0x7F;
buf2.put(b);
@ -280,7 +282,7 @@ public class TestABCase5
}
finally
{
client.close();
client.disconnect();
}
}
@ -299,7 +301,7 @@ public class TestABCase5
String fragment1 = "fragment1";
buf.put((byte)(0x00 | OpCode.TEXT.getCode()));
buf.put((byte)(NOFIN | OpCode.TEXT.getCode()));
byte b = 0x00; // no masking
b |= fragment1.length() & 0x7F;
@ -314,7 +316,7 @@ public class TestABCase5
String fragment2 = "fragment2";
buf2.put((byte)(0x80 | OpCode.CONTINUATION.getCode()));
buf2.put((byte)(FIN | OpCode.CONTINUATION.getCode()));
b = 0x00; // no masking
b |= fragment2.length() & 0x7F;
buf2.put(b);
@ -354,7 +356,7 @@ public class TestABCase5
String fragment1 = "fragment1";
buf.put((byte)(0x00 | OpCode.TEXT.getCode()));
buf.put((byte)(NOFIN | OpCode.TEXT.getCode()));
byte b = 0x00; // no masking
b |= fragment1.length() & 0x7F;
@ -371,7 +373,7 @@ public class TestABCase5
String pingPayload = "ping payload";
pingBuf.put((byte)(0x00 | OpCode.PING.getCode()));
pingBuf.put((byte)(FIN | OpCode.PING.getCode()));
b = 0x00; // no masking
b |= pingPayload.length() & 0x7F;
@ -379,7 +381,7 @@ public class TestABCase5
pingBuf.put(pingPayload.getBytes());
BufferUtil.flipToFlush(pingBuf,0);
client.writeRaw(buf);
client.writeRaw(pingBuf);
// Send remaining text as continuation
@ -388,7 +390,7 @@ public class TestABCase5
String fragment2 = "fragment2";
buf2.put((byte)(0x80 | OpCode.CONTINUATION.getCode()));
buf2.put((byte)(FIN | OpCode.CONTINUATION.getCode()));
b = 0x00; // no masking
b |= fragment2.length() & 0x7F;
buf2.put(b);
@ -398,7 +400,7 @@ public class TestABCase5
client.writeRaw(buf2);
// Should be 2 frames, pong frame followed by combined echo'd text frame
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.MILLISECONDS,500);
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.SECONDS,1);
WebSocketFrame frame = frames.remove();
Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PING));
@ -487,9 +489,9 @@ public class TestABCase5
String fragment1 = "fragment";
// continutation w / FIN
// continuation w / FIN
buf.put((byte)(0x80 | OpCode.CONTINUATION.getCode()));
buf.put((byte)(FIN | OpCode.CONTINUATION.getCode()));
byte b = 0x00; // no masking
b |= fragment1.length() & 0x7F;
@ -507,10 +509,8 @@ public class TestABCase5
Assert.assertThat("CloseFrame.status code",new CloseInfo(frame).getStatusCode(),is(1002));
Assert.assertThat("CloseFrame.reason",new CloseInfo(frame).getReason(),is("Bad Continuation")); // TODO put close reasons into public strings in
// impl
// someplace
Assert.assertThat("CloseFrame.reason",new CloseInfo(frame).getReason(),is("Bad Continuation"));
// TODO put close reasons into public strings in impl someplace?
}
finally
{

View File

@ -121,6 +121,7 @@ public class BlockheadClient implements IncomingFrames
public void close()
{
LOG.debug("close()");
close(-1,null);
}
@ -150,7 +151,7 @@ public class BlockheadClient implements IncomingFrames
in = socket.getInputStream();
}
private void disconnect()
public void disconnect()
{
LOG.debug("disconnect");
IO.close(in);
@ -257,6 +258,7 @@ public class BlockheadClient implements IncomingFrames
public Queue<WebSocketFrame> readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
{
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
int startCount = incomingFrameQueue.size();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);

View File

@ -1,5 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.io.LEVEL=INFO
org.eclipse.jetty.io.LEVEL=DEBUG
org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
org.eclipse.jetty.websocket.LEVEL=DEBUG
org.eclipse.jetty.websocket.generator.LEVEL=DEBUG