428232 - Rework batch mode / buffering in websocket.

Refactored FrameFlusher to handle aggregation of frames to support
JSR 356's batch mode.

Now FrameFlusher can aggregate frames as long as the FlushMode they
were sent is AUTO. When a frame that has FlushMode SEND arrives,
it will trigger the flush of the aggregate buffer (and eventually
also other queued frames).
A special BINARY frame is used to implement explicit flush()
invocations.
This commit is contained in:
Simone Bordet 2014-02-18 12:20:00 +01:00
parent 8e5c06b95c
commit 4bdca367dd
42 changed files with 645 additions and 570 deletions

View File

@ -29,7 +29,6 @@ import javax.websocket.SendHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.eclipse.jetty.websocket.common.message.MessageOutputStream;
@ -80,21 +79,19 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
@Override
public void flushBatch() throws IOException
{
BlockingWriteCallback callback = new BlockingWriteCallback();
jettyRemote.sendBytes(BufferUtil.EMPTY_BUFFER, callback);
callback.block();
jettyRemote.flush();
}
@Override
public boolean getBatchingAllowed()
{
return session.isBatching();
return jettyRemote.isBatching();
}
@Override
public void setBatchingAllowed(boolean allowed) throws IOException
{
session.setBatching(allowed);
jettyRemote.setBatching(allowed);
}
@SuppressWarnings(

View File

@ -71,11 +71,10 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
private Map<String, String> pathParameters = new HashMap<>();
private JsrAsyncRemote asyncRemote;
private JsrBasicRemote basicRemote;
private volatile boolean batching;
public JsrSession(URI requestURI, EventDriver websocket, LogicalConnection connection, ClientContainer container, String id, SessionListener... sessionListeners)
{
super(requestURI,websocket,connection,sessionListeners);
super(requestURI, websocket, connection, sessionListeners);
if (!(websocket instanceof AbstractJsrEventDriver))
{
throw new IllegalArgumentException("Cannot use, not a JSR WebSocket: " + websocket);
@ -90,13 +89,12 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
this.messageHandlerFactory = new MessageHandlerFactory();
this.wrappers = new MessageHandlerWrapper[MessageType.values().length];
this.messageHandlerSet = new HashSet<>();
}
@Override
public void addMessageHandler(MessageHandler handler) throws IllegalStateException
{
Objects.requireNonNull(handler,"MessageHandler cannot be null");
Objects.requireNonNull(handler, "MessageHandler cannot be null");
synchronized (wrappers)
{
@ -376,13 +374,8 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
}
@Override
public boolean isBatching()
public boolean getBatchingDefault()
{
return batching;
}
public void setBatching(boolean batching)
{
this.batching = batching;
return false;
}
}

View File

@ -18,9 +18,13 @@
package org.eclipse.jetty.websocket.jsr356;
import java.io.IOException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
/**
@ -33,7 +37,17 @@ public class JettyEchoSocket extends WebSocketAdapter
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
{
getRemote().sendBytes(BufferUtil.toBuffer(payload,offset,len),null);
try
{
RemoteEndpoint remote = getRemote();
remote.sendBytes(BufferUtil.toBuffer(payload, offset, len), null);
if (remote.isBatching())
remote.flush();
}
catch (IOException x)
{
throw new RuntimeIOException(x);
}
}
@Override
@ -45,6 +59,16 @@ public class JettyEchoSocket extends WebSocketAdapter
@Override
public void onWebSocketText(String message)
{
getRemote().sendString(message,null);
try
{
RemoteEndpoint remote = getRemote();
remote.sendString(message, null);
if (remote.isBatching())
remote.flush();
}
catch (IOException x)
{
throw new RuntimeIOException(x);
}
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.jsr356.server;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -74,10 +75,10 @@ public class JettyEchoSocket
}
@OnWebSocketMessage
public void onMessage(String msg)
public void onMessage(String msg) throws IOException
{
incomingMessages.add(msg);
remote.sendString(msg,null);
sendMessage(msg);
}
@OnWebSocketConnect
@ -88,8 +89,10 @@ public class JettyEchoSocket
this.remote = session.getRemote();
}
public void sendMessage(String msg)
public void sendMessage(String msg) throws IOException
{
remote.sendStringByFuture(msg);
if (remote.isBatching())
remote.flush();
}
}

View File

@ -121,4 +121,18 @@ public interface RemoteEndpoint
* callback to notify of success or failure of the write operation
*/
void sendString(String text, WriteCallback callback);
/**
* @return whether the implementation is allowed to batch messages.
* @see #flush()
*/
boolean isBatching();
/**
* Flushes messages that may have been batched by the implementation.
* @throws IOException if the flush fails
* @see #isBatching()
*/
void flush() throws IOException;
}

View File

@ -175,9 +175,4 @@ public interface Session extends Closeable
* @return the suspend token suitable for resuming the reading of data on the connection.
*/
SuspendToken suspend();
/**
* @return true if this session is batching network data, false if it flushes it immediately.
*/
boolean isBatching();
}

View File

@ -41,21 +41,21 @@ public interface OutgoingFrames
void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode);
/**
* The possible flush modes when invoking {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)}.
* The possible flush modes when invoking {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)}.
*/
public enum FlushMode
{
/**
* Implementers of {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)}
* Implementers of {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)}
* are free to decide whether to flush or not the given frame
* to the network layer.
*/
AUTO,
/**
* Implementers of {@link #outgoingFrame(Frame, org.eclipse.jetty.websocket.api.WriteCallback, org.eclipse.jetty.websocket.api.extensions.OutgoingFrames.FlushMode)}
* must flush the given frame to the network layer.
* Implementers of {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)}
* must send the given frame to the network layer.
*/
FLUSH
SEND
}
}

View File

@ -18,10 +18,8 @@
package org.eclipse.jetty.websocket.client;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.log.Log;
@ -79,10 +77,13 @@ public class ClientWriteThread extends Thread
TimeUnit.MILLISECONDS.sleep(slowness);
}
}
if (remote.isBatching())
remote.flush();
// block on write of last message
lastMessage.get(2,TimeUnit.MINUTES); // block on write
if (lastMessage != null)
lastMessage.get(2,TimeUnit.MINUTES); // block on write
}
catch (InterruptedException | ExecutionException | TimeoutException e)
catch (Exception e)
{
LOG.warn(e);
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
@ -37,6 +35,8 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
public class ServerReadThread extends Thread
{
private static final int BUFFER_SIZE = 8192;
@ -44,13 +44,13 @@ public class ServerReadThread extends Thread
private final ServerConnection conn;
private boolean active = true;
private int slowness = -1; // disabled is default
private AtomicInteger frameCount = new AtomicInteger();
private CountDownLatch expectedMessageCount;
private final AtomicInteger frameCount = new AtomicInteger();
private final CountDownLatch expectedMessageCount;
public ServerReadThread(ServerConnection conn)
public ServerReadThread(ServerConnection conn, int expectedMessages)
{
this.conn = conn;
this.expectedMessageCount = new CountDownLatch(1);
this.expectedMessageCount = new CountDownLatch(expectedMessages);
}
public void cancel()
@ -75,14 +75,12 @@ public class ServerReadThread extends Thread
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
BufferUtil.clearToFill(buf);
int len = 0;
try
{
while (active)
{
BufferUtil.clearToFill(buf);
len = conn.read(buf);
int len = conn.read(buf);
if (len > 0)
{
@ -108,7 +106,7 @@ public class ServerReadThread extends Thread
}
if (slowness > 0)
{
TimeUnit.MILLISECONDS.sleep(slowness);
TimeUnit.MILLISECONDS.sleep(getSlowness());
}
}
}
@ -122,11 +120,6 @@ public class ServerReadThread extends Thread
}
}
public void setExpectedMessageCount(int expectedMessageCount)
{
this.expectedMessageCount = new CountDownLatch(expectedMessageCount);
}
public void setSlowness(int slowness)
{
this.slowness = slowness;

View File

@ -18,13 +18,12 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
@ -34,6 +33,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class SessionTest
{
private BlockheadServer server;
@ -81,7 +83,10 @@ public class SessionTest
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
cliSock.getSession().getRemote().sendStringByFuture("Hello World!");
RemoteEndpoint remote = cliSock.getSession().getRemote();
remote.sendStringByFuture("Hello World!");
if (remote.isBatching())
remote.flush();
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server
cliSock.waitForMessage(500,TimeUnit.MILLISECONDS);

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.is;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -36,6 +34,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class SlowClientTest
{
@Rule
@ -79,24 +79,23 @@ public class SlowClientTest
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<Session> future = client.connect(tsocket,wsUri);
Future<Session> future = client.connect(tsocket, wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
// Confirm connected
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
future.get(500, TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500, TimeUnit.MILLISECONDS);
int messageCount = 10;
// Setup server read thread
ServerReadThread reader = new ServerReadThread(sconnection);
reader.setExpectedMessageCount(Integer.MAX_VALUE); // keep reading till I tell you to stop
ServerReadThread reader = new ServerReadThread(sconnection, messageCount);
reader.start();
// Have client write slowly.
int messageCount = 1000;
ClientWriteThread writer = new ClientWriteThread(tsocket.getSession());
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
@ -104,13 +103,15 @@ public class SlowClientTest
writer.start();
writer.join();
reader.waitForExpectedMessageCount(1, TimeUnit.MINUTES);
// Verify receive
Assert.assertThat("Frame Receive Count",reader.getFrameCount(),is(messageCount));
Assert.assertThat("Frame Receive Count", reader.getFrameCount(), is(messageCount));
// Close
tsocket.getSession().close(StatusCode.NORMAL,"Done");
tsocket.getSession().close(StatusCode.NORMAL, "Done");
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(3,TimeUnit.MINUTES));
Assert.assertTrue("Client Socket Closed", tsocket.closeLatch.await(3, TimeUnit.MINUTES));
tsocket.assertCloseCode(StatusCode.NORMAL);
reader.cancel(); // stop reading

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.is;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -37,6 +35,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class SlowServerTest
{
@Rule
@ -91,11 +91,10 @@ public class SlowServerTest
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
int messageCount = 10; // TODO: increase to 1000
int messageCount = 10;
// Setup slow server read thread
ServerReadThread reader = new ServerReadThread(sconnection);
reader.setExpectedMessageCount(messageCount);
ServerReadThread reader = new ServerReadThread(sconnection, messageCount);
reader.setSlowness(100); // slow it down
reader.start();

View File

@ -18,11 +18,6 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
@ -33,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
@ -45,6 +41,11 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@RunWith(AdvancedRunner.class)
public class WebSocketClientTest
{
@ -118,7 +119,10 @@ public class WebSocketClientTest
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
cliSock.getSession().getRemote().sendStringByFuture("Hello World!");
RemoteEndpoint remote = cliSock.getSession().getRemote();
remote.sendStringByFuture("Hello World!");
if (remote.isBatching())
remote.flush();
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server
cliSock.waitForMessage(500,TimeUnit.MILLISECONDS);

View File

@ -58,7 +58,7 @@ public class Generator
/**
* The overhead (maximum) for a framing header. Assuming a maximum sized payload with masking key.
*/
public static final int OVERHEAD = 28;
public static final int MAX_HEADER_LENGTH = 28;
private final WebSocketBehavior behavior;
private final ByteBufferPool bufferPool;
@ -196,7 +196,7 @@ public class Generator
// we need a framing header
assertFrameValid(frame);
ByteBuffer buffer = bufferPool.acquire(OVERHEAD,true);
ByteBuffer buffer = bufferPool.acquire(MAX_HEADER_LENGTH,true);
BufferUtil.clearToFill(buffer);
/*

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame;
import org.eclipse.jetty.websocket.common.frames.PingFrame;
import org.eclipse.jetty.websocket.common.frames.PongFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.FrameFlusher;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
/**
@ -44,7 +45,6 @@ import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
*/
public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
/** Message Type*/
private enum MsgType
{
BLOCKING,
@ -68,21 +68,25 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
};
private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
public final LogicalConnection connection;
public final OutgoingFrames outgoing;
/** JSR-356 blocking send behaviour message and Type sanity to support partial send properly */
private final static int ASYNC_MASK = 0x0000FFFF;
private final static int BLOCK_MASK = 0x00010000;
private final static int STREAM_MASK = 0x00020000;
private final static int PARTIAL_TEXT_MASK= 0x00040000;
private final static int PARTIAL_BINARY_MASK= 0x00080000;
private final static int ASYNC_MASK = 0x0000FFFF;
private final static int BLOCK_MASK = 0x00010000;
private final static int STREAM_MASK = 0x00020000;
private final static int PARTIAL_TEXT_MASK = 0x00040000;
private final static int PARTIAL_BINARY_MASK = 0x00080000;
private final LogicalConnection connection;
private final OutgoingFrames outgoing;
private final AtomicInteger msgState = new AtomicInteger();
private final BlockingWriteCallback blocker = new BlockingWriteCallback();
private volatile boolean batching;
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
{
this(connection, outgoing, true);
}
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, boolean batching)
{
if (connection == null)
{
@ -90,11 +94,12 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
}
this.connection = connection;
this.outgoing = outgoing;
this.batching = batching;
}
private void blockingWrite(WebSocketFrame frame) throws IOException
{
uncheckedSendFrame(frame,blocker);
uncheckedSendFrame(frame, blocker);
blocker.block();
}
@ -107,100 +112,100 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
// Blocking -> Pending!! ; Async -> Pending!! ; Partial -> PARTIAL_TEXT ; Stream -> Pending!!
// Blocking -> Pending!! ; Async -> Pending!! ; Partial -> PARTIAL_BIN ; Stream -> Pending!!
while(true)
while (true)
{
int state = msgState.get();
switch (type)
{
case BLOCKING:
if ((state&(PARTIAL_BINARY_MASK+PARTIAL_TEXT_MASK))!=0)
throw new IllegalStateException(String.format("Partial message pending %x for %s",state,type));
if ((state&BLOCK_MASK)!=0)
throw new IllegalStateException(String.format("Blocking message pending %x for %s",state,type));
if (msgState.compareAndSet(state,state|BLOCK_MASK))
return state==0;
if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
if ((state & BLOCK_MASK) != 0)
throw new IllegalStateException(String.format("Blocking message pending %x for %s", state, type));
if (msgState.compareAndSet(state, state | BLOCK_MASK))
return state == 0;
break;
case ASYNC:
if ((state&(PARTIAL_BINARY_MASK+PARTIAL_TEXT_MASK))!=0)
throw new IllegalStateException(String.format("Partial message pending %x for %s",state,type));
if ((state&ASYNC_MASK)==ASYNC_MASK)
throw new IllegalStateException(String.format("Too many async sends: %x",state));
if (msgState.compareAndSet(state,state+1))
return state==0;
if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
if ((state & ASYNC_MASK) == ASYNC_MASK)
throw new IllegalStateException(String.format("Too many async sends: %x", state));
if (msgState.compareAndSet(state, state + 1))
return state == 0;
break;
case STREAMING:
if ((state&(PARTIAL_BINARY_MASK+PARTIAL_TEXT_MASK))!=0)
throw new IllegalStateException(String.format("Partial message pending %x for %s",state,type));
if ((state&STREAM_MASK)!=0)
throw new IllegalStateException(String.format("Already streaming %x for %s",state,type));
if (msgState.compareAndSet(state,state|STREAM_MASK))
return state==0;
if ((state & (PARTIAL_BINARY_MASK + PARTIAL_TEXT_MASK)) != 0)
throw new IllegalStateException(String.format("Partial message pending %x for %s", state, type));
if ((state & STREAM_MASK) != 0)
throw new IllegalStateException(String.format("Already streaming %x for %s", state, type));
if (msgState.compareAndSet(state, state | STREAM_MASK))
return state == 0;
break;
case PARTIAL_BINARY:
if (state==PARTIAL_BINARY_MASK)
if (state == PARTIAL_BINARY_MASK)
return false;
if (state==0)
if (state == 0)
{
if (msgState.compareAndSet(0,state|PARTIAL_BINARY_MASK))
if (msgState.compareAndSet(0, state | PARTIAL_BINARY_MASK))
return true;
}
throw new IllegalStateException(String.format("Cannot send %s in state %x",type,state));
throw new IllegalStateException(String.format("Cannot send %s in state %x", type, state));
case PARTIAL_TEXT:
if (state==PARTIAL_TEXT_MASK)
if (state == PARTIAL_TEXT_MASK)
return false;
if (state==0)
if (state == 0)
{
if (msgState.compareAndSet(0,state|PARTIAL_TEXT_MASK))
if (msgState.compareAndSet(0, state | PARTIAL_TEXT_MASK))
return true;
}
throw new IllegalStateException(String.format("Cannot send %s in state %x",type,state));
throw new IllegalStateException(String.format("Cannot send %s in state %x", type, state));
}
}
}
private void unlockMsg(MsgType type)
{
while(true)
while (true)
{
int state = msgState.get();
switch (type)
{
case BLOCKING:
if ((state&BLOCK_MASK)==0)
throw new IllegalStateException(String.format("Not Blocking in state %x",state));
if (msgState.compareAndSet(state,state&~BLOCK_MASK))
if ((state & BLOCK_MASK) == 0)
throw new IllegalStateException(String.format("Not Blocking in state %x", state));
if (msgState.compareAndSet(state, state & ~BLOCK_MASK))
return;
break;
case ASYNC:
if ((state&ASYNC_MASK)==0)
throw new IllegalStateException(String.format("Not Async in %x",state));
if (msgState.compareAndSet(state,state-1))
if ((state & ASYNC_MASK) == 0)
throw new IllegalStateException(String.format("Not Async in %x", state));
if (msgState.compareAndSet(state, state - 1))
return;
break;
case STREAMING:
if ((state&STREAM_MASK)==0)
throw new IllegalStateException(String.format("Not Streaming in state %x",state));
if (msgState.compareAndSet(state,state&~STREAM_MASK))
if ((state & STREAM_MASK) == 0)
throw new IllegalStateException(String.format("Not Streaming in state %x", state));
if (msgState.compareAndSet(state, state & ~STREAM_MASK))
return;
break;
case PARTIAL_BINARY:
if (msgState.compareAndSet(PARTIAL_BINARY_MASK,0))
if (msgState.compareAndSet(PARTIAL_BINARY_MASK, 0))
return;
throw new IllegalStateException(String.format("Not Partial Binary in state %x",state));
throw new IllegalStateException(String.format("Not Partial Binary in state %x", state));
case PARTIAL_TEXT:
if (msgState.compareAndSet(PARTIAL_TEXT_MASK,0))
if (msgState.compareAndSet(PARTIAL_TEXT_MASK, 0))
return;
throw new IllegalStateException(String.format("Not Partial Text in state %x",state));
throw new IllegalStateException(String.format("Not Partial Text in state %x", state));
}
}
@ -215,14 +220,13 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
/**
* Internal
*
* @param frame
* the frame to write
* @param frame the frame to write
* @return the future for the network write of the frame
*/
private Future<Void> sendAsyncFrame(WebSocketFrame frame)
{
FutureWriteCallback future = new FutureWriteCallback();
uncheckedSendFrame(frame,future);
uncheckedSendFrame(frame, future);
return future;
}
@ -238,7 +242,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
connection.getIOState().assertOutputOpen();
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data));
LOG.debug("sendBytes with {}", BufferUtil.toDetailString(data));
}
blockingWrite(new BinaryFrame().setPayload(data));
}
@ -256,7 +260,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data));
LOG.debug("sendBytesByFuture with {}", BufferUtil.toDetailString(data));
}
return sendAsyncFrame(new BinaryFrame().setPayload(data));
}
@ -274,9 +278,9 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback);
LOG.debug("sendBytes({}, {})", BufferUtil.toDetailString(data), callback);
}
uncheckedSendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback);
uncheckedSendFrame(new BinaryFrame().setPayload(data), callback == null ? NOOP_CALLBACK : callback);
}
finally
{
@ -288,12 +292,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
try
{
connection.getIOState().assertOutputOpen();
OutgoingFrames.FlushMode flushMode = OutgoingFrames.FlushMode.FLUSH;
WebSocketSession session = connection.getSession();
if (session != null && session.isBatching())
OutgoingFrames.FlushMode flushMode = OutgoingFrames.FlushMode.SEND;
if (frame.isDataFrame() && isBatching())
flushMode = OutgoingFrames.FlushMode.AUTO;
outgoing.outgoingFrame(frame,callback,flushMode);
connection.getIOState().assertOutputOpen();
outgoing.outgoingFrame(frame, callback, flushMode);
}
catch (IOException e)
{
@ -304,14 +307,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
{
boolean first=lockMsg(MsgType.PARTIAL_BINARY);
boolean first = lockMsg(MsgType.PARTIAL_BINARY);
try
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
LOG.debug("sendPartialBytes({}, {})", BufferUtil.toDetailString(fragment), isLast);
}
DataFrame frame = first?new BinaryFrame():new ContinuationFrame();
DataFrame frame = first ? new BinaryFrame() : new ContinuationFrame();
frame.setPayload(fragment);
frame.setFin(isLast);
blockingWrite(frame);
@ -326,15 +329,15 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendPartialString(String fragment, boolean isLast) throws IOException
{
boolean first=lockMsg(MsgType.PARTIAL_TEXT);
boolean first = lockMsg(MsgType.PARTIAL_TEXT);
try
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPartialString({}, {})",fragment,isLast);
LOG.debug("sendPartialString({}, {})", fragment, isLast);
}
DataFrame frame = first?new TextFrame():new ContinuationFrame();
frame.setPayload(BufferUtil.toBuffer(fragment,StandardCharsets.UTF_8));
DataFrame frame = first ? new TextFrame() : new ContinuationFrame();
frame.setPayload(BufferUtil.toBuffer(fragment, StandardCharsets.UTF_8));
frame.setFin(isLast);
blockingWrite(frame);
}
@ -350,7 +353,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPing with {}",BufferUtil.toDetailString(applicationData));
LOG.debug("sendPing with {}", BufferUtil.toDetailString(applicationData));
}
sendAsyncFrame(new PingFrame().setPayload(applicationData));
}
@ -360,7 +363,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
if (LOG.isDebugEnabled())
{
LOG.debug("sendPong with {}",BufferUtil.toDetailString(applicationData));
LOG.debug("sendPong with {}", BufferUtil.toDetailString(applicationData));
}
sendAsyncFrame(new PongFrame().setPayload(applicationData));
}
@ -374,7 +377,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
WebSocketFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload()));
LOG.debug("sendString with {}", BufferUtil.toDetailString(frame.getPayload()));
}
blockingWrite(frame);
}
@ -393,7 +396,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
TextFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload()));
LOG.debug("sendStringByFuture with {}", BufferUtil.toDetailString(frame.getPayload()));
}
return sendAsyncFrame(frame);
}
@ -412,9 +415,37 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
TextFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback);
LOG.debug("sendString({},{})", BufferUtil.toDetailString(frame.getPayload()), callback);
}
uncheckedSendFrame(frame,callback==null?NOOP_CALLBACK:callback);
uncheckedSendFrame(frame, callback == null ? NOOP_CALLBACK : callback);
}
finally
{
unlockMsg(MsgType.ASYNC);
}
}
@Override
public boolean isBatching()
{
return batching;
}
// Only the JSR needs to have this method exposed.
// In the Jetty implementation the batching is set
// at the moment of opening the session.
public void setBatching(boolean batching)
{
this.batching = batching;
}
public void flush() throws IOException
{
lockMsg(MsgType.ASYNC);
try
{
uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker);
blocker.block();
}
finally
{

View File

@ -57,7 +57,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
private final URI requestURI;
private final EventDriver websocket;
private final LogicalConnection connection;
private final boolean batching;
private final SessionListener[] sessionListeners;
private final Executor executor;
private ExtensionFactory extensionFactory;
@ -71,11 +70,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
private UpgradeResponse upgradeResponse;
public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, SessionListener... sessionListeners)
{
this(requestURI, websocket, connection, true, sessionListeners);
}
public WebSocketSession(URI requestURI, EventDriver websocket, LogicalConnection connection, boolean batching, SessionListener... sessionListeners)
{
if (requestURI == null)
{
@ -85,32 +79,30 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
this.requestURI = requestURI;
this.websocket = websocket;
this.connection = connection;
this.batching = batching;
this.sessionListeners = sessionListeners;
this.executor = connection.getExecutor();
this.outgoingHandler = connection;
this.incomingHandler = websocket;
this.connection.getIOState().addListener(this);
}
@Override
public void close()
{
this.close(StatusCode.NORMAL,null);
this.close(StatusCode.NORMAL, null);
}
@Override
public void close(CloseStatus closeStatus)
{
this.close(closeStatus.getCode(),closeStatus.getPhrase());
this.close(closeStatus.getCode(), closeStatus.getPhrase());
}
@Override
public void close(int statusCode, String reason)
{
connection.close(statusCode,reason);
notifyClose(statusCode,reason);
connection.close(statusCode, reason);
notifyClose(statusCode, reason);
}
/**
@ -122,7 +114,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
connection.disconnect();
// notify of harsh disconnect
notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
notifyClose(StatusCode.NO_CLOSE, "Harsh disconnect");
}
public void dispatch(Runnable runnable)
@ -133,11 +125,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
@Override
public void dump(Appendable out, String indent) throws IOException
{
super.dump(out,indent);
super.dump(out, indent);
out.append(indent).append(" +- incomingHandler : ");
if (incomingHandler instanceof Dumpable)
{
((Dumpable)incomingHandler).dump(out,indent + " ");
((Dumpable)incomingHandler).dump(out, indent + " ");
}
else
{
@ -147,7 +139,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
out.append(indent).append(" +- outgoingHandler : ");
if (outgoingHandler instanceof Dumpable)
{
((Dumpable)outgoingHandler).dump(out,indent + " ");
((Dumpable)outgoingHandler).dump(out, indent + " ");
}
else
{
@ -280,7 +272,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
{
final int prime = 31;
int result = 1;
result = (prime * result) + ((connection == null)?0:connection.hashCode());
result = (prime * result) + ((connection == null) ? 0 : connection.hashCode());
return result;
}
@ -335,7 +327,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
public void notifyClose(int statusCode, String reason)
{
websocket.onClose(new CloseInfo(statusCode,reason));
websocket.onClose(new CloseInfo(statusCode, reason));
}
public void notifyError(Throwable cause)
@ -370,9 +362,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
if (ioState.wasAbnormalClose())
{
CloseInfo close = ioState.getCloseInfo();
LOG.debug("Detected abnormal close: {}",close);
LOG.debug("Detected abnormal close: {}", close);
// notify local endpoint
notifyClose(close.getStatusCode(),close.getReason());
notifyClose(close.getStatusCode(), close.getReason());
}
break;
case OPEN:
@ -407,7 +399,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
connection.getIOState().onConnected();
// Connect remote
remote = new WebSocketRemoteEndpoint(connection,outgoingHandler);
remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchingDefault());
// Open WebSocket
websocket.openSession(this);
@ -417,7 +409,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
if (LOG.isDebugEnabled())
{
LOG.debug("open -> {}",dump());
LOG.debug("open -> {}", dump());
}
}
@ -457,11 +449,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
List<String> values = entry.getValue();
if (values != null)
{
this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()]));
this.parameterMap.put(entry.getKey(), values.toArray(new String[values.size()]));
}
else
{
this.parameterMap.put(entry.getKey(),new String[0]);
this.parameterMap.put(entry.getKey(), new String[0]);
}
}
}
@ -478,10 +470,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return connection;
}
@Override
public boolean isBatching()
/**
* @return the default (initial) value for the batching mode.
*/
public boolean getBatchingDefault()
{
return batching;
return true;
}
@Override

View File

@ -156,6 +156,32 @@ public abstract class CompressExtension extends AbstractExtension
flusher.iterate();
}
protected void notifyCallbackSuccess(WriteCallback callback)
{
try
{
if (callback != null)
callback.writeSuccess();
}
catch (Throwable x)
{
LOG.debug("Exception while notifying success of callback " + callback, x);
}
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
{
try
{
if (callback != null)
callback.writeFailed(failure);
}
catch (Throwable x)
{
LOG.debug("Exception while notifying failure of callback " + callback, x);
}
}
@Override
public String toString()
{
@ -210,16 +236,8 @@ public abstract class CompressExtension extends AbstractExtension
{
Frame frame = entry.frame;
FlushMode flushMode = entry.flushMode;
if (OpCode.isControlFrame(frame.getOpCode()))
if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload())
{
// Skip, cannot compress control frames.
nextOutgoingFrame(frame, this, flushMode);
return;
}
if (!frame.hasPayload())
{
// Pass through, nothing to do
nextOutgoingFrame(frame, this, flushMode);
return;
}
@ -323,30 +341,4 @@ public abstract class CompressExtension extends AbstractExtension
notifyCallbackFailure(entry.callback, x);
}
}
protected void notifyCallbackSuccess(WriteCallback callback)
{
try
{
if (callback != null)
callback.writeSuccess();
}
catch (Throwable x)
{
LOG.debug("Exception while notifying success of callback " + callback, x);
}
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
{
try
{
if (callback != null)
callback.writeFailed(failure);
}
catch (Throwable x)
{
LOG.debug("Exception while notifying failure of callback " + callback, x);
}
}
}

View File

@ -63,9 +63,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
private class Flusher extends FrameFlusher
{
private Flusher(Generator generator, EndPoint endpoint)
private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
{
super(generator,endpoint);
super(bufferPool, generator, endpoint, getPolicy().getMaxBinaryMessageBufferSize(), 8);
}
@Override
@ -151,7 +151,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
/**
* Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
*/
private static final int MIN_BUFFER_SIZE = Generator.OVERHEAD;
private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
private final ByteBufferPool bufferPool;
private final Scheduler scheduler;
@ -178,7 +178,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.suspendToken = new AtomicBoolean(false);
this.ioState = new IOState();
this.ioState.addListener(this);
this.flusher = new Flusher(generator,endp);
this.flusher = new Flusher(bufferPool,generator,endp);
this.setInputBufferSize(policy.getInputBufferSize());
this.setMaxIdleTimeout(policy.getIdleTimeout());
}
@ -263,11 +263,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
super.fillInterested();
}
public void flush()
{
flusher.flush();
}
@Override
public ByteBufferPool getBufferPool()
{
@ -381,7 +376,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
// Fire out a close frame, indicating abnormal shutdown, then disconnect
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(),FlushMode.FLUSH);
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(),FlushMode.SEND);
}
else
{
@ -392,7 +387,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
case CLOSING:
CloseInfo close = ioState.getCloseInfo();
// append close frame
outgoingFrame(close.asFrame(),new OnDisconnectCallback(),FlushMode.FLUSH);
outgoingFrame(close.asFrame(),new OnDisconnectCallback(),FlushMode.SEND);
default:
break;
}

View File

@ -19,15 +19,16 @@
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
@ -37,149 +38,103 @@ import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
/**
* Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
*/
public class FrameFlusher
{
private static final int MAX_GATHER = Integer.getInteger("org.eclipse.jetty.websocket.common.io.FrameFlusher.MAX_GATHER",8);
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
/** The endpoint to flush to */
private final ByteBufferPool bufferPool;
private final EndPoint endpoint;
/** The websocket generator */
private final int bufferSize;
private final Generator generator;
private final int maxGather;
private final Object lock = new Object();
private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16, 16, lock);
private final Flusher flusher = new Flusher();
private final AtomicBoolean closed = new AtomicBoolean();
private volatile Throwable failure;
/** Backlog of frames */
private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16,16,lock);
private final FlusherCB flusherCB = new FlusherCB();
/** the buffer input size */
private int bufferSize = 2048;
/** Tracking for failure */
private Throwable failure;
/** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
private boolean closed;
/**
* Create a WriteBytesProvider with specified Generator and "flush" Callback.
*
* @param generator
* the generator to use for converting {@link Frame} objects to network {@link ByteBuffer}s
* @param endpoint
* the endpoint to flush to.
*/
public FrameFlusher(Generator generator, EndPoint endpoint)
{
this.endpoint=endpoint;
this.generator = Objects.requireNonNull(generator);
}
/**
* Set the buffer size used for generating ByteBuffers from the frames.
* <p>
* Value usually obtained from {@link AbstractConnection#getInputBufferSize()}
*
* @param bufferSize
* the buffer size to use
*/
public void setBufferSize(int bufferSize)
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
{
this.bufferPool = bufferPool;
this.endpoint = endpoint;
this.bufferSize = bufferSize;
}
public int getBufferSize()
{
return bufferSize;
}
/**
* Force closure of write bytes
*/
public void close()
{
synchronized (lock)
{
if (!closed)
{
closed=true;
EOFException eof = new EOFException("Connection has been disconnected");
flusherCB.failed(eof);
for (FrameEntry frame : queue)
frame.notifyFailed(eof);
queue.clear();
}
}
}
/**
* Used to test for the final frame possible to be enqueued, the CLOSE frame.
*
* @return true if close frame has been enqueued already.
*/
public boolean isClosed()
{
synchronized (lock)
{
return closed;
}
this.generator = Objects.requireNonNull(generator);
this.maxGather = maxGather;
}
public void enqueue(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode)
{
Objects.requireNonNull(frame);
FrameEntry entry = new FrameEntry(frame,callback,flushMode);
LOG.debug("enqueue({})",entry);
Throwable failure=null;
synchronized (lock)
if (closed.get())
{
if (closed)
{
// Closed for more frames.
LOG.debug("Write is closed: {} {}",frame,callback);
failure=new IOException("Write is closed");
}
else if (this.failure!=null)
{
failure=this.failure;
}
switch (frame.getOpCode())
{
case OpCode.PING:
queue.add(0,entry);
break;
case OpCode.CLOSE:
closed=true;
queue.add(entry);
break;
default:
queue.add(entry);
}
notifyCallbackFailure(callback, new EOFException("Connection has been closed locally"));
return;
}
if (failure != null)
if (flusher.isFailed())
{
// no changes when failed
LOG.debug("Write is in failure: {} {}",frame,callback);
entry.notifyFailed(failure);
notifyCallbackFailure(callback, failure);
return;
}
flush();
FrameEntry entry = new FrameEntry(frame, callback, flushMode);
synchronized (lock)
{
switch (frame.getOpCode())
{
case OpCode.PING:
{
// Prepend PINGs so they are processed first.
queue.add(0, entry);
break;
}
case OpCode.CLOSE:
{
// There may be a chance that other frames are
// added after this close frame, but we will
// fail them later to keep it simple here.
closed.set(true);
queue.add(entry);
break;
}
default:
{
queue.add(entry);
break;
}
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} queued {}", this, entry);
flusher.iterate();
}
void flush()
public void close()
{
flusherCB.iterate();
if (closed.compareAndSet(false, true))
{
LOG.debug("{} closing {}", this);
EOFException eof = new EOFException("Connection has been closed locally");
flusher.failed(eof);
// Fail also queued entries.
List<FrameEntry> entries = new ArrayList<>();
synchronized (lock)
{
entries.addAll(queue);
queue.clear();
}
// Notify outside sync block.
for (FrameEntry entry : entries)
notifyCallbackFailure(entry.callback, eof);
}
}
protected void onFailure(Throwable x)
@ -187,173 +142,217 @@ public class FrameFlusher
LOG.warn(x);
}
protected void notifyCallbackSuccess(WriteCallback callback)
{
try
{
if (callback != null)
callback.writeSuccess();
}
catch (Throwable x)
{
LOG.debug("Exception while notifying success of callback " + callback, x);
}
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
{
try
{
if (callback != null)
callback.writeFailed(failure);
}
catch (Throwable x)
{
LOG.debug("Exception while notifying failure of callback " + callback, x);
}
}
@Override
public String toString()
{
StringBuilder b = new StringBuilder();
b.append("WriteBytesProvider[");
if (failure != null)
{
b.append("failure=").append(failure.getClass().getName());
b.append(":").append(failure.getMessage()).append(',');
}
else
{
b.append("queue.size=").append(queue.size());
}
b.append(']');
return b.toString();
ByteBuffer aggregate = flusher.aggregate;
return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",
getClass().getSimpleName(),
queue.size(),
aggregate == null ? 0 : aggregate.position(),
failure);
}
private class FlusherCB extends IteratingCallback
private class Flusher extends IteratingCallback
{
private final ArrayQueue<FrameEntry> active = new ArrayQueue<>(lock);
private final List<ByteBuffer> buffers = new ArrayList<>(MAX_GATHER*2);
private final List<FrameEntry> succeeded = new ArrayList<>(MAX_GATHER+1);
@Override
protected void completed()
{
// will never be called as process always returns SCHEDULED or IDLE
throw new IllegalStateException();
}
private final List<FrameEntry> entries = new ArrayList<>(maxGather);
private final List<ByteBuffer> buffers = new ArrayList<>(maxGather * 2 + 1);
private ByteBuffer aggregate;
private boolean releaseAggregate;
@Override
protected Action process() throws Exception
{
int space = aggregate == null ? bufferSize : aggregate.remaining();
boolean batch = true;
synchronized (lock)
{
succeeded.clear();
// If we exited the loop above without hitting the gatheredBufferLimit
// then all the active frames are done, so we can add some more.
while (buffers.size()<MAX_GATHER && !queue.isEmpty())
while (entries.size() <= maxGather && !queue.isEmpty())
{
FrameEntry frame = queue.remove(0);
active.add(frame);
buffers.add(frame.getHeaderBytes());
FrameEntry entry = queue.remove(0);
batch &= entry.flushMode == OutgoingFrames.FlushMode.AUTO;
ByteBuffer payload = frame.getPayload();
if (payload!=null)
buffers.add(payload);
// Force flush if we need to.
if (entry.frame == FLUSH_FRAME)
batch = false;
int payloadLength = BufferUtil.length(entry.frame.getPayload());
int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
// If it is a "big" frame, avoid copying into the aggregate buffer.
if (approxFrameLength > (bufferSize >> 2))
batch = false;
// If the aggregate buffer overflows, do not batch.
space -= approxFrameLength;
if (space <= 0)
batch = false;
entries.add(entry);
}
if (LOG.isDebugEnabled())
LOG.debug("process {} active={} buffers={}",FrameFlusher.this,active,buffers);
}
if (buffers.size()==0)
return Action.IDLE;
if (LOG.isDebugEnabled())
LOG.debug("{} processing {} entries: {}", FrameFlusher.this, entries.size(), entries);
if (entries.isEmpty())
{
if (releaseAggregate)
{
bufferPool.release(aggregate);
if (LOG.isDebugEnabled())
LOG.debug("{} released aggregate buffer {}", FrameFlusher.this, aggregate);
aggregate = null;
}
return Action.IDLE;
}
if (batch)
batch();
else
flush();
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Action.SCHEDULED;
}
private void flush()
{
if (!BufferUtil.isEmpty(aggregate))
{
BufferUtil.flipToFlush(aggregate, 0);
buffers.add(aggregate);
releaseAggregate = true;
if (LOG.isDebugEnabled())
LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate);
}
for (FrameEntry entry : entries)
{
// Skip "synthetic" frames used for flushing.
if (entry.frame == FLUSH_FRAME)
continue;
buffers.add(entry.getHeaderBytes());
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
buffers.add(payload);
}
if (LOG.isDebugEnabled())
LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries);
endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
}
private void batch()
{
if (aggregate == null)
{
aggregate = bufferPool.acquire(bufferSize, true);
BufferUtil.flipToFill(aggregate);
if (LOG.isDebugEnabled())
LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate);
releaseAggregate = false;
}
for (FrameEntry entry : entries)
{
// TODO: would be better to generate the header bytes directly into the aggregate buffer.
ByteBuffer header = entry.getHeaderBytes();
aggregate.put(header);
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
aggregate.put(payload);
}
if (LOG.isDebugEnabled())
LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
succeeded();
}
@Override
public void succeeded()
{
synchronized (lock)
for (FrameEntry entry : entries)
{
succeeded.addAll(active);
active.clear();
notifyCallbackSuccess(entry.callback);
entry.release();
}
entries.clear();
for (FrameEntry frame:succeeded)
{
frame.notifySucceeded();
frame.freeBuffers();
}
// Do not release the aggregate yet, in case there are more frames to process.
if (releaseAggregate)
BufferUtil.clearToFill(aggregate);
super.succeeded();
}
@Override
protected void completed()
{
// This IteratingCallback never completes.
}
@Override
public void failed(Throwable x)
{
synchronized (lock)
for (FrameEntry entry : entries)
{
succeeded.addAll(active);
active.clear();
notifyCallbackFailure(entry.callback, x);
entry.release();
}
for (FrameEntry frame : succeeded)
{
frame.notifyFailed(x);
frame.freeBuffers();
}
succeeded.clear();
entries.clear();
super.failed(x);
failure = x;
onFailure(x);
}
}
private class FrameEntry
{
protected final AtomicBoolean failed = new AtomicBoolean(false);
protected final Frame frame;
private final Frame frame;
private final WriteCallback callback;
private final OutgoingFrames.FlushMode flushMode;
protected final WriteCallback callback;
/** holds reference to header ByteBuffer, as it needs to be released on success/failure */
private ByteBuffer headerBuffer;
public FrameEntry(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode)
private FrameEntry(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode)
{
this.frame = frame;
this.frame = Objects.requireNonNull(frame);
this.callback = callback;
this.flushMode = flushMode;
}
public ByteBuffer getHeaderBytes()
private ByteBuffer getHeaderBytes()
{
ByteBuffer buf = generator.generateHeaderBytes(frame);
headerBuffer = buf;
return buf;
return headerBuffer = generator.generateHeaderBytes(frame);
}
public ByteBuffer getPayload()
{
// There is no need to release this ByteBuffer, as it is just a slice of the user provided payload
return frame.getPayload();
}
public void notifyFailed(Throwable t)
{
freeBuffers();
if (failed.getAndSet(true) == false)
{
try
{
if (callback!=null)
callback.writeFailed(t);
}
catch (Throwable e)
{
LOG.warn("Uncaught exception",e);
}
}
}
public void notifySucceeded()
{
freeBuffers();
if (callback == null)
{
return;
}
try
{
callback.writeSuccess();
}
catch (Throwable t)
{
LOG.debug(t);
}
}
public void freeBuffers()
private void release()
{
if (headerBuffer != null)
{
@ -364,7 +363,7 @@ public class FrameFlusher
public String toString()
{
return "["+callback+","+frame+","+failure+"]";
return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, flushMode, failure);
}
}
}

View File

@ -43,7 +43,7 @@ public class FramePipes
@Override
public void incomingFrame(Frame frame)
{
this.outgoing.outgoingFrame(frame,null,OutgoingFrames.FlushMode.FLUSH);
this.outgoing.outgoingFrame(frame,null,OutgoingFrames.FlushMode.SEND);
}
}

View File

@ -137,7 +137,7 @@ public class MessageOutputStream extends OutputStream
try
{
outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.FLUSH);
outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.SEND);
// block on write
blocker.block();
// block success

View File

@ -118,7 +118,7 @@ public class MessageWriter extends Writer
try
{
outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.FLUSH);
outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.SEND);
// block on write
blocker.block();
// write success

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -41,6 +39,8 @@ import org.eclipse.jetty.websocket.common.util.Hex;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class GeneratorTest
{
private static final Logger LOG = Log.getLogger(GeneratorTest.WindowHelper.class);
@ -64,7 +64,7 @@ public class GeneratorTest
int completeBufSize = 0;
for (Frame f : frames)
{
completeBufSize += Generator.OVERHEAD + f.getPayloadLength();
completeBufSize += Generator.MAX_HEADER_LENGTH + f.getPayloadLength();
}
ByteBuffer completeBuf = ByteBuffer.allocate(completeBufSize);

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.MappedByteBufferPool;
@ -37,6 +35,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class WebSocketFrameTest
{
@Rule
@ -47,7 +47,7 @@ public class WebSocketFrameTest
private ByteBuffer generateWholeFrame(Generator generator, Frame frame)
{
ByteBuffer buf = ByteBuffer.allocate(frame.getPayloadLength() + Generator.OVERHEAD);
ByteBuffer buf = ByteBuffer.allocate(frame.getPayloadLength() + Generator.MAX_HEADER_LENGTH);
generator.generateWholeFrame(frame,buf);
BufferUtil.flipToFlush(buf,0);
return buf;

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.ab;
import static org.hamcrest.Matchers.is;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -41,6 +39,8 @@ import org.eclipse.jetty.websocket.common.test.UnitParser;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class TestABCase2
{
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
@ -288,7 +288,7 @@ public class TestABCase2
byte[] bytes = new byte[126];
Arrays.fill(bytes,(byte)0x00);
ByteBuffer expected = ByteBuffer.allocate(bytes.length + Generator.OVERHEAD);
ByteBuffer expected = ByteBuffer.allocate(bytes.length + Generator.MAX_HEADER_LENGTH);
byte b;

View File

@ -165,7 +165,7 @@ public class FragmentExtensionTest
for (String section : quote)
{
Frame frame = new TextFrame().setPayload(section);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
}
// Expected Frames
@ -237,7 +237,7 @@ public class FragmentExtensionTest
for (String section : quote)
{
Frame frame = new TextFrame().setPayload(section);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
}
// Expected Frames
@ -294,7 +294,7 @@ public class FragmentExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.SEND);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING, 1);

View File

@ -81,7 +81,7 @@ public class IdentityExtensionTest
ext.setNextOutgoingFrames(capture);
Frame frame = new TextFrame().setPayload("hello");
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.TEXT, 1);

View File

@ -127,7 +127,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
ext.setNextOutgoingFrames(capture);
Frame frame = new TextFrame().setPayload(text);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
capture.assertBytes(0, expectedHex);
}
@ -234,9 +234,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
init(ext);
ext.setNextOutgoingFrames(capture);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND);
List<String> actual = capture.getCaptured();
@ -308,8 +308,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator);
ext.setNextOutgoingFrames(capture);
ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(new TextFrame().setPayload("There"), null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(new TextFrame().setPayload("There"), null, OutgoingFrames.FlushMode.SEND);
capture.assertBytes(0, "c107f248cdc9c90700");
}
@ -430,7 +430,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
BinaryFrame frame = new BinaryFrame();
frame.setPayload(input);
frame.setFin(true);
clientExtension.outgoingFrame(frame, null, OutgoingFrames.FlushMode.FLUSH);
clientExtension.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
Assert.assertArrayEquals(input, result.toByteArray());
}

View File

@ -318,7 +318,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.FLUSH);
ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.SEND);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING, 1);

View File

@ -712,7 +712,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
{
frame.setMask(clientmask);
}
extensionStack.outgoingFrame(frame,null,FlushMode.FLUSH);
extensionStack.outgoingFrame(frame,null,FlushMode.SEND);
}
public void writeRaw(ByteBuffer buf) throws IOException

View File

@ -561,7 +561,7 @@ public class BlockheadServer
public void write(Frame frame) throws IOException
{
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
outgoing.outgoingFrame(frame,null,FlushMode.FLUSH);
outgoing.outgoingFrame(frame,null,FlushMode.SEND);
}
public void write(int b) throws IOException

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
@ -41,6 +39,9 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.io.IOState;
import org.junit.Assert;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
/**
* Fuzzing utility for the AB tests.
*/
@ -103,7 +104,7 @@ public class Fuzzer
int buflen = 0;
for (Frame f : send)
{
buflen += f.getPayloadLength() + Generator.OVERHEAD;
buflen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH;
}
ByteBuffer buf = ByteBuffer.allocate(buflen);
@ -260,7 +261,7 @@ public class Fuzzer
int buflen = 0;
for (Frame f : send)
{
buflen += f.getPayloadLength() + Generator.OVERHEAD;
buflen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH;
}
ByteBuffer buf = ByteBuffer.allocate(buflen);
@ -295,7 +296,7 @@ public class Fuzzer
{
f.setMask(MASK); // make sure we have mask set
// Using lax generator, generate and send
ByteBuffer fullframe = ByteBuffer.allocate(f.getPayloadLength() + Generator.OVERHEAD);
ByteBuffer fullframe = ByteBuffer.allocate(f.getPayloadLength() + Generator.MAX_HEADER_LENGTH);
BufferUtil.clearToFill(fullframe);
generator.generateWholeFrame(f,fullframe);
BufferUtil.flipToFlush(fullframe,0);

View File

@ -64,7 +64,7 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
{
ByteBuffer buf = ByteBuffer.allocate(Generator.OVERHEAD + frame.getPayloadLength());
ByteBuffer buf = ByteBuffer.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
generator.generateWholeFrame(frame,buf);
BufferUtil.flipToFlush(buf,0);
captured.add(buf);

View File

@ -61,7 +61,7 @@ public class UnitGenerator extends Generator
int buflen = 0;
for (Frame f : frames)
{
buflen += f.getPayloadLength() + Generator.OVERHEAD;
buflen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH;
}
ByteBuffer completeBuf = ByteBuffer.allocate(buflen);
BufferUtil.clearToFill(completeBuf);
@ -96,7 +96,7 @@ public class UnitGenerator extends Generator
int buflen = 0;
for (Frame f : frames)
{
buflen += f.getPayloadLength() + Generator.OVERHEAD;
buflen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH;
}
ByteBuffer completeBuf = ByteBuffer.allocate(buflen);
BufferUtil.clearToFill(completeBuf);

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
@ -32,6 +30,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class FirefoxTest
{
private static SimpleServletServer server;
@ -52,8 +52,7 @@ public class FirefoxTest
@Test
public void testConnectionKeepAlive() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());
try
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
{
// Odd Connection Header value seen in Firefox
client.setConnectionValue("keep-alive, Upgrade");
@ -66,13 +65,9 @@ public class FirefoxTest
client.write(new TextFrame().setPayload(msg));
// Read frame (hopefully text frame)
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
IncomingFramesCapture capture = client.readFrames(1, TimeUnit.MILLISECONDS, 500);
WebSocketFrame tf = capture.getFrames().poll();
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally
{
client.close();
Assert.assertThat("Text Frame.status code", tf.getPayloadAsUTF8(), is(msg));
}
}
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -27,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPool;
@ -38,6 +37,8 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class WebSocketOverSSLTest
{
@Rule
@ -84,7 +85,10 @@ public class WebSocketOverSSLTest
// Generate text frame
String msg = "this is an echo ... cho ... ho ... o";
session.getRemote().sendString(msg);
RemoteEndpoint remote = session.getRemote();
remote.sendString(msg);
if (remote.isBatching())
remote.flush();
// Read frame (hopefully text frame)
clientSocket.messages.awaitEventCount(1,500,TimeUnit.MILLISECONDS);
@ -122,7 +126,10 @@ public class WebSocketOverSSLTest
Session session = fut.get(5,TimeUnit.SECONDS);
// Generate text frame
session.getRemote().sendString("session.isSecure");
RemoteEndpoint remote = session.getRemote();
remote.sendString("session.isSecure");
if (remote.isBatching())
remote.flush();
// Read frame (hopefully text frame)
clientSocket.messages.awaitEventCount(1,500,TimeUnit.MILLISECONDS);
@ -160,7 +167,10 @@ public class WebSocketOverSSLTest
Session session = fut.get(5,TimeUnit.SECONDS);
// Generate text frame
session.getRemote().sendString("session.upgradeRequest.requestURI");
RemoteEndpoint remote = session.getRemote();
remote.sendString("session.upgradeRequest.requestURI");
if (remote.isBatching())
remote.flush();
// Read frame (hopefully text frame)
clientSocket.messages.awaitEventCount(1,500,TimeUnit.MILLISECONDS);

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
@ -36,6 +34,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.hamcrest.Matchers.is;
/**
* Testing various aspects of the server side support for WebSocket {@link Session}
*/
@ -61,8 +61,7 @@ public class WebSocketServerSessionTest
public void testDisconnect() throws Exception
{
URI uri = server.getServerUri().resolve("/test/disconnect");
BlockheadClient client = new BlockheadClient(uri);
try
try (BlockheadClient client = new BlockheadClient(uri))
{
client.connect();
client.sendStandardRequest();
@ -70,11 +69,7 @@ public class WebSocketServerSessionTest
client.write(new TextFrame().setPayload("harsh-disconnect"));
client.awaitDisconnect(1,TimeUnit.SECONDS);
}
finally
{
client.close();
client.awaitDisconnect(1, TimeUnit.SECONDS);
}
}
@ -82,8 +77,7 @@ public class WebSocketServerSessionTest
public void testUpgradeRequestResponse() throws Exception
{
URI uri = server.getServerUri().resolve("/test?snack=cashews&amount=handful&brand=off");
BlockheadClient client = new BlockheadClient(uri);
try
try (BlockheadClient client = new BlockheadClient(uri))
{
client.connect();
client.sendStandardRequest();
@ -93,24 +87,19 @@ public class WebSocketServerSessionTest
client.write(new TextFrame().setPayload("getParameterMap|snack"));
client.write(new TextFrame().setPayload("getParameterMap|amount"));
client.write(new TextFrame().setPayload("getParameterMap|brand"));
client.write(new TextFrame().setPayload("getParameterMap|cost")); // intentionall invalid
client.write(new TextFrame().setPayload("getParameterMap|cost")); // intentionally invalid
// Read frame (hopefully text frame)
IncomingFramesCapture capture = client.readFrames(4,TimeUnit.MILLISECONDS,500);
IncomingFramesCapture capture = client.readFrames(4, TimeUnit.SECONDS, 5);
Queue<WebSocketFrame> frames = capture.getFrames();
WebSocketFrame tf = frames.poll();
Assert.assertThat("Parameter Map[snack]",tf.getPayloadAsUTF8(),is("[cashews]"));
Assert.assertThat("Parameter Map[snack]", tf.getPayloadAsUTF8(), is("[cashews]"));
tf = frames.poll();
Assert.assertThat("Parameter Map[amount]",tf.getPayloadAsUTF8(),is("[handful]"));
Assert.assertThat("Parameter Map[amount]", tf.getPayloadAsUTF8(), is("[handful]"));
tf = frames.poll();
Assert.assertThat("Parameter Map[brand]",tf.getPayloadAsUTF8(),is("[off]"));
Assert.assertThat("Parameter Map[brand]", tf.getPayloadAsUTF8(), is("[off]"));
tf = frames.poll();
Assert.assertThat("Parameter Map[cost]",tf.getPayloadAsUTF8(),is("<null>"));
}
finally
{
client.close();
Assert.assertThat("Parameter Map[cost]", tf.getPayloadAsUTF8(), is("<null>"));
}
}
}

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.websocket.server.examples;
import java.io.IOException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
/**
@ -38,11 +40,14 @@ public class MyEchoSocket extends WebSocketAdapter
try
{
// echo the data back
getRemote().sendString(message);
RemoteEndpoint remote = getRemote();
remote.sendString(message);
if (remote.isBatching())
remote.flush();
}
catch (IOException e)
{
e.printStackTrace();
throw new RuntimeIOException(e);
}
}
}

View File

@ -18,10 +18,12 @@
package org.eclipse.jetty.websocket.server.examples.echo;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@ -35,24 +37,30 @@ public class BigEchoSocket
private static final Logger LOG = Log.getLogger(BigEchoSocket.class);
@OnWebSocketMessage
public void onBinary(Session session, byte buf[], int offset, int length)
public void onBinary(Session session, byte buf[], int offset, int length) throws IOException
{
if (!session.isOpen())
{
LOG.warn("Session is closed");
return;
}
session.getRemote().sendBytes(ByteBuffer.wrap(buf,offset,length),null);
RemoteEndpoint remote = session.getRemote();
remote.sendBytes(ByteBuffer.wrap(buf, offset, length), null);
if (remote.isBatching())
remote.flush();
}
@OnWebSocketMessage
public void onText(Session session, String message)
public void onText(Session session, String message) throws IOException
{
if (!session.isOpen())
{
LOG.warn("Session is closed");
return;
}
session.getRemote().sendString(message,null);
RemoteEndpoint remote = session.getRemote();
remote.sendString(message, null);
if (remote.isBatching())
remote.flush();
}
}

View File

@ -18,10 +18,12 @@
package org.eclipse.jetty.websocket.server.helper;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
@ -38,13 +40,16 @@ public class EchoSocket
private Session session;
@OnWebSocketMessage
public void onBinary(byte buf[], int offset, int len)
public void onBinary(byte buf[], int offset, int len) throws IOException
{
LOG.debug("onBinary(byte[{}],{},{})",buf.length,offset,len);
// echo the message back.
ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
this.session.getRemote().sendBytes(data,null);
RemoteEndpoint remote = this.session.getRemote();
remote.sendBytes(data, null);
if (remote.isBatching())
remote.flush();
}
@OnWebSocketConnect
@ -54,11 +59,14 @@ public class EchoSocket
}
@OnWebSocketMessage
public void onText(String message)
public void onText(String message) throws IOException
{
LOG.debug("onText({})",message);
// echo the message back.
this.session.getRemote().sendString(message,null);
RemoteEndpoint remote = session.getRemote();
remote.sendString(message, null);
if (remote.isBatching())
remote.flush();
}
}

View File

@ -18,10 +18,12 @@
package org.eclipse.jetty.websocket.server.helper;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
@ -35,13 +37,16 @@ public class RFCSocket
private Session session;
@OnWebSocketMessage
public void onBinary(byte buf[], int offset, int len)
public void onBinary(byte buf[], int offset, int len) throws IOException
{
LOG.debug("onBinary(byte[{}],{},{})",buf.length,offset,len);
// echo the message back.
ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
this.session.getRemote().sendBytes(data,null);
RemoteEndpoint remote = session.getRemote();
remote.sendBytes(data, null);
if (remote.isBatching())
remote.flush();
}
@OnWebSocketConnect
@ -51,7 +56,7 @@ public class RFCSocket
}
@OnWebSocketMessage
public void onText(String message)
public void onText(String message) throws IOException
{
LOG.debug("onText({})",message);
// Test the RFC 6455 close code 1011 that should close
@ -62,6 +67,9 @@ public class RFCSocket
}
// echo the message back.
this.session.getRemote().sendString(message,null);
RemoteEndpoint remote = session.getRemote();
remote.sendString(message, null);
if (remote.isBatching())
remote.flush();
}
}

View File

@ -18,11 +18,13 @@
package org.eclipse.jetty.websocket.server.helper;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
@ -61,7 +63,7 @@ public class SessionSocket
if (values == null)
{
session.getRemote().sendString("<null>",null);
sendString("<null>");
return;
}
@ -78,21 +80,22 @@ public class SessionSocket
delim = true;
}
valueStr.append(']');
session.getRemote().sendString(valueStr.toString(),null);
System.err.println("valueStr = " + valueStr);
sendString(valueStr.toString());
return;
}
if ("session.isSecure".equals(message))
{
String issecure = String.format("session.isSecure=%b",session.isSecure());
session.getRemote().sendString(issecure,null);
sendString(issecure);
return;
}
if ("session.upgradeRequest.requestURI".equals(message))
{
String response = String.format("session.upgradeRequest.requestURI=%s",session.getUpgradeRequest().getRequestURI().toASCIIString());
session.getRemote().sendString(response,null);
sendString(response);
return;
}
@ -103,11 +106,19 @@ public class SessionSocket
}
// echo the message back.
this.session.getRemote().sendString(message,null);
sendString(message);
}
catch (Throwable t)
{
LOG.warn(t);
}
}
protected void sendString(String text) throws IOException
{
RemoteEndpoint remote = session.getRemote();
remote.sendString(text, null);
if (remote.isBatching())
remote.flush();
}
}