428232 - Rework batch mode / buffering in websocket.

Introduced the automatic batch mode, akin to Jetty 8's WebSocket
implementation.
Now, if there are no more frames to process, and the previous frames
have been aggregated, FrameFlusher auto-flushes the aggregated frames.
This simplifies applications because they don't need to call flush()
explicitly.
This commit is contained in:
Simone Bordet 2014-02-18 18:22:05 +01:00
parent 3240e7383b
commit 1ac6b82912
46 changed files with 517 additions and 148 deletions

View File

@ -29,6 +29,7 @@ import javax.websocket.SendHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.common.WebSocketRemoteEndpoint;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.eclipse.jetty.websocket.common.message.MessageOutputStream;
@ -85,13 +86,15 @@ public abstract class AbstractJsrRemote implements RemoteEndpoint
@Override
public boolean getBatchingAllowed()
{
return jettyRemote.isBatching();
return jettyRemote.getBatchMode() == BatchMode.ON;
}
@Override
public void setBatchingAllowed(boolean allowed) throws IOException
{
jettyRemote.setBatching(allowed);
if (jettyRemote.getBatchMode() == BatchMode.ON && !allowed)
jettyRemote.flush();
jettyRemote.setBatchMode(allowed ? BatchMode.ON : BatchMode.OFF);
}
@SuppressWarnings(

View File

@ -40,6 +40,7 @@ import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.SessionListener;
@ -374,8 +375,9 @@ public class JsrSession extends WebSocketSession implements javax.websocket.Sess
}
@Override
public boolean getBatchingDefault()
public BatchMode getBatchMode()
{
return false;
// JSR 356 specification mandates default batch mode to be off.
return BatchMode.OFF;
}
}

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@ -41,7 +42,7 @@ public class JettyEchoSocket extends WebSocketAdapter
{
RemoteEndpoint remote = getRemote();
remote.sendBytes(BufferUtil.toBuffer(payload, offset, len), null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
catch (IOException x)
@ -63,7 +64,7 @@ public class JettyEchoSocket extends WebSocketAdapter
{
RemoteEndpoint remote = getRemote();
remote.sendString(message, null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
catch (IOException x)

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
@ -123,7 +124,7 @@ public class DummyConnection implements LogicalConnection
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
@ -126,7 +127,7 @@ public class DummyConnection implements LogicalConnection
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
callback.writeSuccess();
}

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
@ -92,7 +93,7 @@ public class JettyEchoSocket
public void sendMessage(String msg) throws IOException
{
remote.sendStringByFuture(msg);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
}

View File

@ -0,0 +1,181 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.jsr356.server;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.RemoteEndpoint;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerEndpointConfig;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.BasicEchoEndpoint;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class JsrBatchModeTest
{
private Server server;
private ServerConnector connector;
private WebSocketContainer client;
@Before
public void prepare() throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
ServerContainer container = WebSocketServerContainerInitializer.configureContext(context);
ServerEndpointConfig config = ServerEndpointConfig.Builder.create(BasicEchoEndpoint.class, "/").build();
container.addEndpoint(config);
server.start();
client = ContainerProvider.getWebSocketContainer();
server.addBean(client, true);
}
@After
public void dispose() throws Exception
{
server.stop();
}
@Test
public void testBatchModeOn() throws Exception
{
ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort());
final CountDownLatch latch = new CountDownLatch(1);
EndpointAdapter endpoint = new EndpointAdapter()
{
@Override
public void onMessage(String message)
{
latch.countDown();
}
};
try (Session session = client.connectToServer(endpoint, config, uri))
{
RemoteEndpoint.Async remote = session.getAsyncRemote();
remote.setBatchingAllowed(true);
Future<Void> future = remote.sendText("batch_mode_on");
// The write is aggregated and therefore completes immediately.
future.get(1, TimeUnit.MICROSECONDS);
// Did not flush explicitly, so the message should not be back yet.
Assert.assertFalse(latch.await(1, TimeUnit.SECONDS));
// Explicitly flush.
remote.flushBatch();
// Wait for the echo.
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBatchModeOff() throws Exception
{
ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort());
final CountDownLatch latch = new CountDownLatch(1);
EndpointAdapter endpoint = new EndpointAdapter()
{
@Override
public void onMessage(String message)
{
latch.countDown();
}
};
try (Session session = client.connectToServer(endpoint, config, uri))
{
RemoteEndpoint.Async remote = session.getAsyncRemote();
remote.setBatchingAllowed(false);
Future<Void> future = remote.sendText("batch_mode_off");
// The write is immediate.
future.get(1, TimeUnit.SECONDS);
// Wait for the echo.
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testBatchModeAuto() throws Exception
{
ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();
URI uri = URI.create("ws://localhost:" + connector.getLocalPort());
final CountDownLatch latch = new CountDownLatch(1);
EndpointAdapter endpoint = new EndpointAdapter()
{
@Override
public void onMessage(String message)
{
latch.countDown();
}
};
try (Session session = client.connectToServer(endpoint, config, uri))
{
RemoteEndpoint.Async remote = session.getAsyncRemote();
Future<Void> future = remote.sendText("batch_mode_auto");
// The write is immediate, as per the specification.
future.get(1, TimeUnit.SECONDS);
// Wait for the echo.
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
private static abstract class EndpointAdapter extends Endpoint implements MessageHandler.Whole<String>
{
@Override
public void onOpen(Session session, EndpointConfig config)
{
session.addMessageHandler(this);
}
}
}

View File

@ -0,0 +1,50 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
/**
* The possible batch modes when invoking {@link OutgoingFrames#outgoingFrame(Frame, WriteCallback, BatchMode)}.
*/
public enum BatchMode
{
/**
* Implementers are free to decide whether to send or not frames
* to the network layer.
*/
AUTO,
/**
* Implementers must batch frames.
*/
ON,
/**
* Implementers must send frames to the network layer.
*/
OFF;
public static BatchMode max(BatchMode one, BatchMode two)
{
// Return the BatchMode that has the higher priority, where AUTO < ON < OFF.
return one.ordinal() < two.ordinal() ? two : one;
}
}

View File

@ -123,16 +123,15 @@ public interface RemoteEndpoint
void sendString(String text, WriteCallback callback);
/**
* @return whether the implementation is allowed to batch messages.
* @return the batch mode with which messages are sent.
* @see #flush()
*/
boolean isBatching();
BatchMode getBatchMode();
/**
* Flushes messages that may have been batched by the implementation.
* @throws IOException if the flush fails
* @see #isBatching()
* @see #getBatchMode()
*/
void flush() throws IOException;
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.api.extensions;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
/**
@ -36,26 +37,8 @@ public interface OutgoingFrames
*
* @param frame the frame to eventually write to the network layer.
* @param callback the callback to notify when the frame is written.
* @param flushMode the flush mode required by the sender.
* @param batchMode the batch mode requested by the sender.
*/
void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode);
void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode);
/**
* The possible flush modes when invoking {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)}.
*/
public enum FlushMode
{
/**
* Implementers of {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)}
* are free to decide whether to flush or not the given frame
* to the network layer.
*/
AUTO,
/**
* Implementers of {@link #outgoingFrame(Frame, WriteCallback, OutgoingFrames.FlushMode)}
* must send the given frame to the network layer.
*/
SEND
}
}

View File

@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -98,13 +99,13 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
* Override to set the masker.
*/
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
if (frame instanceof WebSocketFrame)
{
masker.setMask((WebSocketFrame)frame);
}
super.outgoingFrame(frame,callback, flushMode);
super.outgoingFrame(frame,callback, batchMode);
}
@Override

View File

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
@ -77,7 +78,7 @@ public class ClientWriteThread extends Thread
TimeUnit.MILLISECONDS.sleep(slowness);
}
}
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
// block on write of last message
if (lastMessage != null)

View File

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.common.WebSocketSession;
@ -85,7 +86,7 @@ public class SessionTest
RemoteEndpoint remote = cliSock.getSession().getRemote();
remote.sendStringByFuture("Hello World!");
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server

View File

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
@ -121,7 +122,7 @@ public class WebSocketClientTest
RemoteEndpoint remote = cliSock.getSession().getRemote();
remote.sendStringByFuture("Hello World!");
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
@ -79,14 +80,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private final OutgoingFrames outgoing;
private final AtomicInteger msgState = new AtomicInteger();
private final BlockingWriteCallback blocker = new BlockingWriteCallback();
private volatile boolean batching;
private volatile BatchMode batchMode;
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
{
this(connection, outgoing, true);
this(connection, outgoing, BatchMode.AUTO);
}
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, boolean batching)
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, BatchMode batchMode)
{
if (connection == null)
{
@ -94,7 +95,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
}
this.connection = connection;
this.outgoing = outgoing;
this.batching = batching;
this.batchMode = batchMode;
}
private void blockingWrite(WebSocketFrame frame) throws IOException
@ -292,11 +293,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
try
{
OutgoingFrames.FlushMode flushMode = OutgoingFrames.FlushMode.SEND;
if (frame.isDataFrame() && isBatching())
flushMode = OutgoingFrames.FlushMode.AUTO;
BatchMode batchMode = BatchMode.OFF;
if (frame.isDataFrame())
batchMode = getBatchMode();
connection.getIOState().assertOutputOpen();
outgoing.outgoingFrame(frame, callback, flushMode);
outgoing.outgoingFrame(frame, callback, batchMode);
}
catch (IOException e)
{
@ -426,17 +427,17 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
}
@Override
public boolean isBatching()
public BatchMode getBatchMode()
{
return batching;
return batchMode;
}
// Only the JSR needs to have this method exposed.
// In the Jetty implementation the batching is set
// at the moment of opening the session.
public void setBatching(boolean batching)
public void setBatchMode(BatchMode batchMode)
{
this.batching = batching;
this.batchMode = batchMode;
}
public void flush() throws IOException
@ -456,6 +457,6 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public String toString()
{
return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), isBatching());
return String.format("%s@%x[batching=%b]", getClass().getSimpleName(), hashCode(), getBatchMode());
}
}

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
@ -399,7 +400,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
connection.getIOState().onConnected();
// Connect remote
remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchingDefault());
remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchMode());
// Open WebSocket
websocket.openSession(this);
@ -473,9 +474,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
/**
* @return the default (initial) value for the batching mode.
*/
public boolean getBatchingDefault()
public BatchMode getBatchMode()
{
return true;
return BatchMode.AUTO;
}
@Override

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Extension;
@ -162,10 +163,10 @@ public abstract class AbstractExtension extends ContainerLifeCycle implements Ex
this.nextIncoming.incomingFrame(frame);
}
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
log.debug("nextOutgoingFrame({})",frame);
this.nextOutgoing.outgoingFrame(frame,callback,flushMode);
this.nextOutgoing.outgoingFrame(frame,callback, batchMode);
}
public void setBufferPool(ByteBufferPool bufferPool)

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Extension;
@ -273,9 +274,9 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
FrameEntry entry = new FrameEntry(frame, callback, flushMode);
FrameEntry entry = new FrameEntry(frame, callback, batchMode);
LOG.debug("Queuing {}", entry);
entries.offer(entry);
flusher.iterate();
@ -344,13 +345,13 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
{
private final Frame frame;
private final WriteCallback callback;
private final FlushMode flushMode;
private final BatchMode batchMode;
private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode)
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
{
this.frame = frame;
this.callback = callback;
this.flushMode = flushMode;
this.batchMode = batchMode;
}
@Override
@ -371,7 +372,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
LOG.debug("Processing {}", current);
if (current == null)
return Action.IDLE;
nextOutgoing.outgoingFrame(current.frame, this, current.flushMode);
nextOutgoing.outgoingFrame(current.frame, this, current.batchMode);
return Action.SCHEDULED;
}

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode;
@ -138,7 +139,7 @@ public abstract class CompressExtension extends AbstractExtension
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
// We use a queue and an IteratingCallback to handle concurrency.
// We must compress and write atomically, otherwise the compression
@ -150,7 +151,7 @@ public abstract class CompressExtension extends AbstractExtension
return;
}
FrameEntry entry = new FrameEntry(frame, callback, flushMode);
FrameEntry entry = new FrameEntry(frame, callback, batchMode);
LOG.debug("Queuing {}", entry);
entries.offer(entry);
flusher.iterate();
@ -192,13 +193,13 @@ public abstract class CompressExtension extends AbstractExtension
{
private final Frame frame;
private final WriteCallback callback;
private final FlushMode flushMode;
private final BatchMode batchMode;
private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode)
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
{
this.frame = frame;
this.callback = callback;
this.flushMode = flushMode;
this.batchMode = batchMode;
}
@Override
@ -235,10 +236,10 @@ public abstract class CompressExtension extends AbstractExtension
private void deflate(FrameEntry entry)
{
Frame frame = entry.frame;
FlushMode flushMode = entry.flushMode;
BatchMode batchMode = entry.batchMode;
if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload())
{
nextOutgoingFrame(frame, this, flushMode);
nextOutgoingFrame(frame, this, batchMode);
return;
}
@ -311,7 +312,7 @@ public abstract class CompressExtension extends AbstractExtension
boolean fin = frame.isFin() && finished;
chunk.setFin(fin);
nextOutgoingFrame(chunk, this, entry.flushMode);
nextOutgoingFrame(chunk, this, entry.batchMode);
}
@Override

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -92,14 +93,14 @@ public class PerMessageDeflateExtension extends CompressExtension
}
@Override
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
if (frame.isFin() && !outgoingContextTakeover)
{
LOG.debug("Outgoing Context Reset");
getDeflater().reset();
}
super.nextOutgoingFrame(frame, callback, flushMode);
super.nextOutgoingFrame(frame, callback, batchMode);
}
@Override

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -57,17 +58,17 @@ public class FragmentExtension extends AbstractExtension
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
ByteBuffer payload = frame.getPayload();
int length = payload != null ? payload.remaining() : 0;
if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength)
{
nextOutgoingFrame(frame, callback, flushMode);
nextOutgoingFrame(frame, callback, batchMode);
return;
}
FrameEntry entry = new FrameEntry(frame, callback, flushMode);
FrameEntry entry = new FrameEntry(frame, callback, batchMode);
LOG.debug("Queuing {}", entry);
entries.offer(entry);
flusher.iterate();
@ -84,13 +85,13 @@ public class FragmentExtension extends AbstractExtension
{
private final Frame frame;
private final WriteCallback callback;
private final FlushMode flushMode;
private final BatchMode batchMode;
private FrameEntry(Frame frame, WriteCallback callback, FlushMode flushMode)
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
{
this.frame = frame;
this.callback = callback;
this.flushMode = flushMode;
this.batchMode = batchMode;
}
@Override
@ -145,7 +146,7 @@ public class FragmentExtension extends AbstractExtension
LOG.debug("Fragmented {}->{}", frame, fragment);
payload.position(newLimit);
nextOutgoingFrame(fragment, this, entry.flushMode);
nextOutgoingFrame(fragment, this, entry.batchMode);
}
@Override

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.extensions.identity;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -56,10 +57,10 @@ public class IdentityExtension extends AbstractExtension
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
// pass through
nextOutgoingFrame(frame,callback, flushMode);
nextOutgoingFrame(frame,callback, batchMode);
}
@Override

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -376,7 +377,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
// Fire out a close frame, indicating abnormal shutdown, then disconnect
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(),FlushMode.SEND);
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(), BatchMode.OFF);
}
else
{
@ -387,7 +388,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
case CLOSING:
CloseInfo close = ioState.getCloseInfo();
// append close frame
outgoingFrame(close.asFrame(),new OnDisconnectCallback(),FlushMode.SEND);
outgoingFrame(close.asFrame(),new OnDisconnectCallback(), BatchMode.OFF);
default:
break;
}
@ -460,14 +461,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Frame from API, User, or Internal implementation destined for network.
*/
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
if (LOG.isDebugEnabled())
{
LOG.debug("outgoingFrame({}, {})",frame,callback);
}
flusher.enqueue(frame,callback,flushMode);
flusher.enqueue(frame,callback, batchMode);
}
private int read(ByteBuffer buffer)

View File

@ -33,9 +33,9 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
@ -68,7 +68,7 @@ public class FrameFlusher
this.maxGather = maxGather;
}
public void enqueue(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode)
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
{
if (closed.get())
{
@ -81,7 +81,7 @@ public class FrameFlusher
return;
}
FrameEntry entry = new FrameEntry(frame, callback, flushMode);
FrameEntry entry = new FrameEntry(frame, callback, batchMode);
synchronized (lock)
{
@ -185,34 +185,35 @@ public class FrameFlusher
private final List<ByteBuffer> buffers = new ArrayList<>(maxGather * 2 + 1);
private ByteBuffer aggregate;
private boolean releaseAggregate;
private BatchMode batchMode;
@Override
protected Action process() throws Exception
{
int space = aggregate == null ? bufferSize : aggregate.remaining();
boolean batch = true;
BatchMode currentBatchMode = BatchMode.AUTO;
synchronized (lock)
{
while (entries.size() <= maxGather && !queue.isEmpty())
{
FrameEntry entry = queue.remove(0);
batch &= entry.flushMode == OutgoingFrames.FlushMode.AUTO;
currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode);
// Force flush if we need to.
if (entry.frame == FLUSH_FRAME)
batch = false;
currentBatchMode = BatchMode.OFF;
int payloadLength = BufferUtil.length(entry.frame.getPayload());
int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
// If it is a "big" frame, avoid copying into the aggregate buffer.
if (approxFrameLength > (bufferSize >> 2))
batch = false;
currentBatchMode = BatchMode.OFF;
// If the aggregate buffer overflows, do not batch.
space -= approxFrameLength;
if (space <= 0)
batch = false;
currentBatchMode = BatchMode.OFF;
entries.add(entry);
}
@ -223,6 +224,8 @@ public class FrameFlusher
if (entries.isEmpty())
{
// Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse.
if (releaseAggregate)
{
bufferPool.release(aggregate);
@ -230,19 +233,21 @@ public class FrameFlusher
LOG.debug("{} released aggregate buffer {}", FrameFlusher.this, aggregate);
aggregate = null;
}
return Action.IDLE;
if (batchMode != BatchMode.AUTO)
return Action.IDLE;
LOG.debug("{} auto flushing", FrameFlusher.this);
return flush();
}
if (batch)
batch();
else
flush();
batchMode = currentBatchMode;
return Action.SCHEDULED;
return currentBatchMode == BatchMode.OFF ? flush() : batch();
}
@SuppressWarnings("ForLoopReplaceableByForEach")
private void flush()
private Action flush()
{
if (!BufferUtil.isEmpty(aggregate))
{
@ -268,12 +273,17 @@ public class FrameFlusher
if (LOG.isDebugEnabled())
LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries);
if (buffers.isEmpty())
return Action.IDLE;
endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Action.SCHEDULED;
}
@SuppressWarnings("ForLoopReplaceableByForEach")
private void batch()
private Action batch()
{
if (aggregate == null)
{
@ -299,6 +309,7 @@ public class FrameFlusher
if (LOG.isDebugEnabled())
LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
succeeded();
return Action.SCHEDULED;
}
@SuppressWarnings("ForLoopReplaceableByForEach")
@ -346,14 +357,14 @@ public class FrameFlusher
{
private final Frame frame;
private final WriteCallback callback;
private final OutgoingFrames.FlushMode flushMode;
private final BatchMode batchMode;
private ByteBuffer headerBuffer;
private FrameEntry(Frame frame, WriteCallback callback, OutgoingFrames.FlushMode flushMode)
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
{
this.frame = Objects.requireNonNull(frame);
this.callback = callback;
this.flushMode = flushMode;
this.batchMode = batchMode;
}
private ByteBuffer getHeaderBytes()
@ -372,7 +383,7 @@ public class FrameFlusher
public String toString()
{
return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, flushMode, failure);
return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure);
}
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.io;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -43,7 +44,7 @@ public class FramePipes
@Override
public void incomingFrame(Frame frame)
{
this.outgoing.outgoingFrame(frame,null,OutgoingFrames.FlushMode.SEND);
this.outgoing.outgoingFrame(frame,null, BatchMode.OFF);
}
}
@ -57,7 +58,7 @@ public class FramePipes
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
try
{

View File

@ -59,12 +59,15 @@ public class DeMaskProcessor implements PayloadProcessor
maskOffset = offset;
}
public void reset(byte mask[])
public void reset(byte[] mask)
{
this.maskBytes = mask;
int maskInt = 0;
for (byte maskByte : maskBytes)
maskInt = (maskInt << 8) + (maskByte & 0xFF);
if (mask != null)
{
for (byte maskByte : mask)
maskInt = (maskInt << 8) + (maskByte & 0xFF);
}
this.maskInt = maskInt;
this.maskOffset = 0;
}

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
@ -137,7 +138,7 @@ public class MessageOutputStream extends OutputStream
try
{
outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.SEND);
outgoing.outgoingFrame(frame,blocker, BatchMode.OFF);
// block on write
blocker.block();
// block success

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
@ -118,7 +119,7 @@ public class MessageWriter extends Writer
try
{
outgoing.outgoingFrame(frame,blocker,OutgoingFrames.FlushMode.SEND);
outgoing.outgoingFrame(frame,blocker, BatchMode.OFF);
// block on write
blocker.block();
// write success

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.extensions;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
@ -44,7 +45,7 @@ public class DummyOutgoingFrames implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
LOG.debug("outgoingFrame({},{})",frame,callback);
if (callback != null)

View File

@ -27,10 +27,10 @@ import java.util.List;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension;
@ -165,7 +165,7 @@ public class FragmentExtensionTest
for (String section : quote)
{
Frame frame = new TextFrame().setPayload(section);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(frame, null, BatchMode.OFF);
}
// Expected Frames
@ -237,7 +237,7 @@ public class FragmentExtensionTest
for (String section : quote)
{
Frame frame = new TextFrame().setPayload(section);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(frame, null, BatchMode.OFF);
}
// Expected Frames
@ -294,7 +294,7 @@ public class FragmentExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(ping, null, BatchMode.OFF);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING, 1);

View File

@ -23,9 +23,9 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension;
@ -81,7 +81,7 @@ public class IdentityExtensionTest
ext.setNextOutgoingFrames(capture);
Frame frame = new TextFrame().setPayload("hello");
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(frame, null, BatchMode.OFF);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.TEXT, 1);

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.extensions.compress;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
@ -31,7 +32,7 @@ public class CapturedHexPayloads implements OutgoingFrames
private List<String> captured = new ArrayList<>();
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
String hexPayload = Hex.asHex(frame.getPayload());
captured.add(hexPayload);

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -127,7 +128,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
ext.setNextOutgoingFrames(capture);
Frame frame = new TextFrame().setPayload(text);
ext.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(frame, null, BatchMode.OFF);
capture.assertBytes(0, expectedHex);
}
@ -234,9 +235,9 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
init(ext);
ext.setNextOutgoingFrames(capture);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, BatchMode.OFF);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, BatchMode.OFF);
ext.outgoingFrame(new TextFrame().setPayload("time:"), null, BatchMode.OFF);
List<String> actual = capture.getCaptured();
@ -308,8 +309,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator);
ext.setNextOutgoingFrames(capture);
ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(new TextFrame().setPayload("There"), null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(new TextFrame().setPayload("Hello"), null, BatchMode.OFF);
ext.outgoingFrame(new TextFrame().setPayload("There"), null, BatchMode.OFF);
capture.assertBytes(0, "c107f248cdc9c90700");
}
@ -398,7 +399,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
clientExtension.setNextOutgoingFrames(new OutgoingFrames()
{
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
serverExtension.incomingFrame(frame);
callback.writeSuccess();
@ -430,7 +431,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
BinaryFrame frame = new BinaryFrame();
frame.setPayload(input);
frame.setFin(true);
clientExtension.outgoingFrame(frame, null, OutgoingFrames.FlushMode.SEND);
clientExtension.outgoingFrame(frame, null, BatchMode.OFF);
Assert.assertArrayEquals(input, result.toByteArray());
}

View File

@ -26,10 +26,10 @@ import java.util.List;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtensionTest;
@ -318,7 +318,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.outgoingFrame(ping, null, OutgoingFrames.FlushMode.SEND);
ext.outgoingFrame(ping, null, BatchMode.OFF);
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING, 1);

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -204,7 +205,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
}

View File

@ -46,6 +46,7 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -467,7 +468,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
ByteBuffer headerBuf = generator.generateHeaderBytes(frame);
if (LOG.isDebugEnabled())
@ -712,7 +713,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
{
frame.setMask(clientmask);
}
extensionStack.outgoingFrame(frame,null,FlushMode.SEND);
extensionStack.outgoingFrame(frame,null, BatchMode.OFF);
}
public void writeRaw(ByteBuffer buf) throws IOException

View File

@ -47,6 +47,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -231,7 +232,7 @@ public class BlockheadServer
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
ByteBuffer headerBuf = generator.generateHeaderBytes(frame);
if (LOG.isDebugEnabled())
@ -561,7 +562,7 @@ public class BlockheadServer
public void write(Frame frame) throws IOException
{
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
outgoing.outgoingFrame(frame,null,FlushMode.SEND);
outgoing.outgoingFrame(frame,null, BatchMode.OFF);
}
public void write(int b) throws IOException

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.test;
import java.util.LinkedList;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
@ -85,7 +86,7 @@ public class OutgoingFramesCapture implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
frames.add(WebSocketFrame.copy(frame));
if (callback != null)

View File

@ -25,6 +25,7 @@ import java.util.Locale;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
@ -62,7 +63,7 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, FlushMode flushMode)
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
ByteBuffer buf = ByteBuffer.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
generator.generateWholeFrame(frame,buf);

View File

@ -0,0 +1,103 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.helper.EchoSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BatchModeTest
{
private Server server;
private ServerConnector connector;
private WebSocketClient client;
@Before
public void prepare() throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
WebSocketHandler handler = new WebSocketHandler()
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.register(EchoSocket.class);
}
};
server.setHandler(handler);
client = new WebSocketClient();
server.addBean(client, true);
server.start();
}
@After
public void dispose() throws Exception
{
server.stop();
}
@Test
public void testBatchModeAuto() throws Exception
{
URI uri = URI.create("ws://localhost:" + connector.getLocalPort());
final CountDownLatch latch = new CountDownLatch(1);
WebSocketAdapter adapter = new WebSocketAdapter()
{
@Override
public void onWebSocketText(String message)
{
latch.countDown();
}
};
try (Session session = client.connect(adapter, uri).get())
{
RemoteEndpoint remote = session.getRemote();
Future<Void> future = remote.sendStringByFuture("batch_mode_on");
// The write is aggregated and therefore completes immediately.
future.get(1, TimeUnit.MICROSECONDS);
// Wait for the echo.
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
// TODO: currently not possible to configure the Jetty WebSocket Session with the batch mode.
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
@ -87,7 +88,7 @@ public class WebSocketOverSSLTest
String msg = "this is an echo ... cho ... ho ... o";
RemoteEndpoint remote = session.getRemote();
remote.sendString(msg);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
// Read frame (hopefully text frame)
@ -128,7 +129,7 @@ public class WebSocketOverSSLTest
// Generate text frame
RemoteEndpoint remote = session.getRemote();
remote.sendString("session.isSecure");
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
// Read frame (hopefully text frame)
@ -169,7 +170,7 @@ public class WebSocketOverSSLTest
// Generate text frame
RemoteEndpoint remote = session.getRemote();
remote.sendString("session.upgradeRequest.requestURI");
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
// Read frame (hopefully text frame)

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.server.examples;
import java.io.IOException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@ -42,7 +43,7 @@ public class MyEchoSocket extends WebSocketAdapter
// echo the data back
RemoteEndpoint remote = getRemote();
remote.sendString(message);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
catch (IOException e)

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
@ -46,7 +47,7 @@ public class BigEchoSocket
}
RemoteEndpoint remote = session.getRemote();
remote.sendBytes(ByteBuffer.wrap(buf, offset, length), null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
@ -60,7 +61,7 @@ public class BigEchoSocket
}
RemoteEndpoint remote = session.getRemote();
remote.sendString(message, null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
}

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@ -48,7 +49,7 @@ public class EchoSocket
ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
RemoteEndpoint remote = this.session.getRemote();
remote.sendBytes(data, null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
@ -66,7 +67,7 @@ public class EchoSocket
// echo the message back.
RemoteEndpoint remote = session.getRemote();
remote.sendString(message, null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
}

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@ -45,7 +46,7 @@ public class RFCSocket
ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
RemoteEndpoint remote = session.getRemote();
remote.sendBytes(data, null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
@ -69,7 +70,7 @@ public class RFCSocket
// echo the message back.
RemoteEndpoint remote = session.getRemote();
remote.sendString(message, null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
}

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@ -118,7 +119,7 @@ public class SessionSocket
{
RemoteEndpoint remote = session.getRemote();
remote.sendString(text, null);
if (remote.isBatching())
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
}