Issue #207 - initial pass through for Stream backpressure

This commit is contained in:
Joakim Erdfelt 2017-04-05 17:30:51 -07:00
parent c3bb6ae535
commit fa10576bc6
82 changed files with 837 additions and 777 deletions

View File

@ -29,7 +29,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* Provides a reusable {@link Callback} that can block the thread
@ -168,6 +167,7 @@ public class SharedBlockingCallback
}
else
{
cause.printStackTrace(System.err);
throw new IllegalStateException(_state);
}
}

View File

@ -49,9 +49,11 @@ import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.InvalidSignatureException;
import org.eclipse.jetty.websocket.common.function.CommonEndpointFunctions;
import org.eclipse.jetty.websocket.common.message.MessageSink;
@ -107,9 +109,9 @@ public class JsrEndpointFunctions extends CommonEndpointFunctions<JsrSession>
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
this.delegateSink.accept(payload, fin);
this.delegateSink.accept(frame, callback);
}
@Override

View File

@ -31,7 +31,9 @@ import javax.websocket.DeploymentException;
import javax.websocket.MessageHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
@ -126,8 +128,10 @@ public class JsrSessionTest
session.open();
session.incomingFrame(new TextFrame().setPayload("G'day").setFin(true));
session.incomingFrame(new TextFrame().setPayload("Hello World").setFin(true));
FrameCallback callback = new FrameCallbackAdapter();
session.incomingFrame(new TextFrame().setPayload("G'day").setFin(true), callback);
session.incomingFrame(new TextFrame().setPayload("Hello World").setFin(true), callback);
assertThat("Received msgs", received.size(), is(2));
assertThat("Received Message[0]", received.get(0), is("G'day"));
@ -155,8 +159,10 @@ public class JsrSessionTest
session.open();
session.incomingFrame(new BinaryFrame().setPayload("G'day").setFin(false));
session.incomingFrame(new ContinuationFrame().setPayload(" World").setFin(true));
FrameCallback callback = new FrameCallbackAdapter();
session.incomingFrame(new BinaryFrame().setPayload("G'day").setFin(false), callback);
session.incomingFrame(new ContinuationFrame().setPayload(" World").setFin(true), callback);
assertThat("Received partial", received.size(), is(2));
assertThat("Received Message[0].buffer", BufferUtil.toUTF8String((ByteBuffer) received.get(0)[0]), is("G'day"));

View File

@ -31,9 +31,10 @@ import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
@ -130,7 +131,7 @@ public class JsrEndpointFunctions_OnMessage_BinaryStreamTest
{
TrackingSocket socket = performOnMessageInvocation(new MessageStreamSocket(), (endpoint) ->
{
endpoint.onBinary(BufferUtil.toBuffer("Hello World", StandardCharsets.UTF_8), true);
endpoint.onBinary(new BinaryFrame().setPayload("Hello World").setFin(true), new FrameCallback.Adapter());
return null;
});
socket.assertEvent("onMessage(MessageInputStream) = \"Hello World\"");

View File

@ -34,8 +34,10 @@ import javax.websocket.OnMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.InvalidSignatureException;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.ConfiguredEndpoint;
@ -107,7 +109,7 @@ public class JsrEndpointFunctions_OnMessage_BinaryTest
// This invocation is the same for all tests
ByteBuffer byteBuffer = ByteBuffer.wrap("Hello World".getBytes(StandardCharsets.UTF_8));
expectedBuffer = BufferUtil.toDetailString(byteBuffer);
endpointFunctions.onBinary(byteBuffer, true);
endpointFunctions.onBinary(new BinaryFrame().setPayload(byteBuffer).setFin(true), new FrameCallback.Adapter());
socket.assertEvent(String.format(expectedEventFormat, args));
}

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.jsr356.function;
import java.io.IOException;
import java.io.Reader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@ -31,9 +30,10 @@ import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.OnMessage;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
@ -130,7 +130,7 @@ public class JsrEndpointFunctions_OnMessage_TextStreamTest
{
TrackingSocket socket = performOnMessageInvocation(new MessageStreamSocket(), (endpoint) ->
{
endpoint.onText(BufferUtil.toBuffer("Hello World", StandardCharsets.UTF_8), true);
endpoint.onText(new TextFrame().setPayload("Hello World").setFin(true), new FrameCallback.Adapter());
return null;
});
socket.assertEvent("onMessage(MessageReader) = \"Hello World\"");

View File

@ -33,8 +33,10 @@ import javax.websocket.OnMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.InvalidSignatureException;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
import org.eclipse.jetty.websocket.jsr356.ConfiguredEndpoint;
@ -101,7 +103,7 @@ public class JsrEndpointFunctions_OnMessage_TextTest
endpointFunctions.onOpen(newSession(socket));
ByteBuffer payload = BufferUtil.toBuffer(msg, StandardCharsets.UTF_8);
endpointFunctions.onText(payload, true);
endpointFunctions.onText(new TextFrame().setPayload(payload).setFin(true), new FrameCallback.Adapter());
}
private void assertOnMessageInvocation(TrackingSocket socket, String expectedEventFormat, Object... args) throws Exception

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.jsr356.server;
import java.io.IOException;
import java.io.Reader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@ -32,9 +31,10 @@ import javax.websocket.OnMessage;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
import org.eclipse.jetty.websocket.common.test.DummyConnection;
import org.eclipse.jetty.websocket.jsr356.ClientContainer;
@ -131,7 +131,7 @@ public class JsrServerEndpointFunctions_OnMessage_TextStreamTest
{
TrackingSocket socket = performOnMessageInvocation(new MessageStreamSocket(), (endpoint) ->
{
endpoint.onText(BufferUtil.toBuffer("Hello World", StandardCharsets.UTF_8), true);
endpoint.onText(new TextFrame().setPayload("Hello World").setFin(true), new FrameCallback.Adapter());
return null;
});
socket.assertEvent("onMessage(MessageReader) = \"Hello World\"");
@ -161,7 +161,7 @@ public class JsrServerEndpointFunctions_OnMessage_TextStreamTest
{
TrackingSocket socket = performOnMessageInvocation(new MessageStreamParamSocket(), (endpoint) ->
{
endpoint.onText(BufferUtil.toBuffer("Hello World", StandardCharsets.UTF_8), true);
endpoint.onText(new TextFrame().setPayload("Hello World").setFin(true), new FrameCallback.Adapter());
return null;
});
socket.assertEvent("onMessage(MessageReader,foo) = \"Hello World\"");

View File

@ -0,0 +1,53 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.api;
public interface FrameCallback
{
/**
* <p>
* Callback invoked when the frame fails.
* </p>
*
* @param cause the reason for the frame failure
*/
void fail(Throwable cause);
/**
* <p>
* Callback invoked when the frame read/write completes.
* </p>
*
* @see #fail(Throwable)
*/
void succeed();
class Adapter implements FrameCallback
{
@Override
public void fail(Throwable cause)
{
}
@Override
public void succeed()
{
}
}
}

View File

@ -107,7 +107,7 @@ public interface Session extends Closeable
*
* @return the local side address
*/
public InetSocketAddress getLocalAddress();
InetSocketAddress getLocalAddress();
/**
* Access the (now read-only) {@link WebSocketPolicy} in use for this connection.
@ -136,7 +136,7 @@ public interface Session extends Closeable
*
* @return the remote side address
*/
public InetSocketAddress getRemoteAddress();
InetSocketAddress getRemoteAddress();
/**
* Get the UpgradeRequest used to create this session
@ -157,7 +157,7 @@ public interface Session extends Closeable
*
* @return whether the session is open
*/
abstract boolean isOpen();
boolean isOpen();
/**
* Return true if and only if the underlying socket is using a secure transport.

View File

@ -60,6 +60,9 @@ public class WebSocketPolicy
*/
private int maxTextMessageBufferSize = 32 * KB;
private int maxTextFramePayloadSize; // TODO
private int maxBinaryFramePayloadSize; // TODO
/**
* The maximum size of a binary message during parsing/generating.
* <p>

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.api;
/**
* Callback for Write events.
*/
public interface WriteCallback
public interface WriteCallback extends FrameCallback
{
/*
* NOTE: We don't expose org.eclipse.jetty.util.Callback here as that would complicate matters with the WebAppContext's classloader isolation.
@ -35,7 +35,7 @@ public interface WriteCallback
* @param x
* the reason for the write failure
*/
public void writeFailed(Throwable x);
void writeFailed(Throwable x);
/**
* <p>
@ -44,5 +44,17 @@ public interface WriteCallback
*
* @see #writeFailed(Throwable)
*/
public abstract void writeSuccess();
void writeSuccess();
@Override
default void fail(Throwable cause)
{
writeFailed(cause);
}
@Override
default void succeed()
{
writeSuccess();
}
}

View File

@ -18,12 +18,14 @@
package org.eclipse.jetty.websocket.api.extensions;
import org.eclipse.jetty.websocket.api.FrameCallback;
/**
* Interface for dealing with Incoming Frames.
*/
public interface IncomingFrames
{
public void incomingError(Throwable t);
void incomingError(Throwable t);
/**
* Process the incoming frame.
@ -34,5 +36,29 @@ public interface IncomingFrames
*
* @param frame the frame to process
*/
public void incomingFrame(Frame frame);
// @Deprecated
// void incomingFrame(Frame frame);
/**
* Process the incoming frame.
* <p>
* Note: if you need to hang onto any information from the frame, be sure
* to copy it, as the information contained in the Frame will be released
* and/or reused by the implementation.
*
* @param frame the frame to process
* @param callback the read completion
*/
default void incomingFrame(Frame frame, FrameCallback callback)
{
try
{
//xincomingFrame(frame);
callback.succeed();
}
catch (Throwable e)
{
callback.fail(e);
}
}
}

View File

@ -19,7 +19,7 @@
package org.eclipse.jetty.websocket.api.extensions;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
/**
* Interface for dealing with frames outgoing to (eventually) the network layer.
@ -39,6 +39,5 @@ public interface OutgoingFrames
* @param callback the callback to notify when the frame is written.
* @param batchMode the batch mode requested by the sender.
*/
void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode);
void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode);
}

View File

@ -565,19 +565,6 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
throw new HttpResponseException("Invalid Sec-WebSocket-Accept hash",response);
}
// We can upgrade
EndPoint endp = oldConn.getEndPoint();
WebSocketClientConnection connection = new WebSocketClientConnection(endp,wsClient.getExecutor(),wsClient.getScheduler(),localEndpoint.getPolicy(),
wsClient.getBufferPool());
URI requestURI = this.getURI();
WebSocketSession session = getSessionFactory().createSession(requestURI,localEndpoint,connection);
session.setUpgradeRequest(new ClientUpgradeRequest(this));
session.setUpgradeResponse(new ClientUpgradeResponse(response));
connection.addListener(session);
ExtensionStack extensionStack = new ExtensionStack(getExtensionFactory());
List<ExtensionConfig> extensions = new ArrayList<>();
HttpField extField = response.getHeaders().getField(HttpHeader.SEC_WEBSOCKET_EXTENSIONS);
@ -598,11 +585,20 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
}
extensionStack.negotiate(extensions);
extensionStack.configure(connection.getParser());
extensionStack.configure(connection.getGenerator());
// We can upgrade
EndPoint endp = oldConn.getEndPoint();
WebSocketClientConnection connection = new WebSocketClientConnection(endp,wsClient.getExecutor(),wsClient.getScheduler(),wsClient.getPolicy(),
wsClient.getBufferPool(), extensionStack);
URI requestURI = this.getURI();
WebSocketSession session = getSessionFactory().createSession(requestURI,localEndpoint,connection);
session.setUpgradeRequest(new ClientUpgradeRequest(this));
session.setUpgradeResponse(new ClientUpgradeResponse(response));
connection.addListener(session);
// Setup Incoming Routing
connection.setNextIncomingFrames(extensionStack);
extensionStack.setNextIncoming(session);
// Setup Outgoing Routing

View File

@ -25,13 +25,13 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.RandomMasker;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
/**
@ -41,9 +41,9 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
{
private final Masker masker;
public WebSocketClientConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy websocketPolicy, ByteBufferPool bufferPool)
public WebSocketClientConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy websocketPolicy, ByteBufferPool bufferPool, ExtensionStack extensionStack)
{
super(endp,executor,scheduler,websocketPolicy,bufferPool);
super(endp,executor,scheduler,websocketPolicy,bufferPool, extensionStack);
this.masker = new RandomMasker();
}
@ -63,7 +63,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
* Override to set the masker.
*/
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
if (frame instanceof WebSocketFrame)
{
@ -71,10 +71,4 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
}
super.outgoingFrame(frame,callback, batchMode);
}
@Override
public void setNextIncomingFrames(IncomingFrames incoming)
{
getParser().setIncomingFramesHandler(incoming);
}
}

View File

@ -451,11 +451,16 @@ public class ClientConnectTest
}
catch (ExecutionException e)
{
assertExpectedError(e, wsocket,
anyOf(
instanceOf(UpgradeException.class),
instanceOf(SocketTimeoutException.class),
instanceOf(ConnectException.class)));
if (OS.IS_WINDOWS)
{
// On windows, this is a SocketTimeoutException
assertExpectedError(e, wsocket, SocketTimeoutException.class);
}
else
{
// Expected path - java.net.ConnectException
assertExpectedError(e, wsocket, ConnectException.class);
}
}
}

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WriteCallback;
/**
@ -39,7 +39,7 @@ public class BlockingWriteCallback extends SharedBlockingCallback
return new WriteBlocker(acquire());
}
public static class WriteBlocker implements WriteCallback, Callback, AutoCloseable
public static class WriteBlocker implements FrameCallback, Callback, AutoCloseable
{
private final Blocker blocker;
@ -56,13 +56,13 @@ public class BlockingWriteCallback extends SharedBlockingCallback
}
@Override
public void writeFailed(Throwable x)
public void fail(Throwable cause)
{
blocker.failed(x);
blocker.failed(cause);
}
@Override
public void writeSuccess()
public void succeed()
{
blocker.succeeded();
}

View File

@ -24,7 +24,6 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.io.IOState;
@ -38,7 +37,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
* @see org.eclipse.jetty.websocket.api.StatusCode
* @see #close(int, String)
*/
public void close();
void close();
/**
* Send a websocket Close frame, with status code.
@ -51,7 +50,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
* the (optional) reason. (can be null for no reason)
* @see org.eclipse.jetty.websocket.api.StatusCode
*/
public void close(int statusCode, String reason);
void close(int statusCode, String reason);
/**
* Terminate the connection (no close frame sent)
@ -75,7 +74,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*
* @return the idle timeout in milliseconds
*/
public long getIdleTimeout();
long getIdleTimeout();
/**
* Get the IOState of the connection.
@ -120,7 +119,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*
* @return true if connection is open
*/
public boolean isOpen();
boolean isOpen();
/**
* Tests if the connection is actively reading.
@ -140,16 +139,6 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/
void setMaxIdleTimeout(long ms);
/**
* Set where the connection should send the incoming frames to.
* <p>
* Often this is from the Parser to the start of the extension stack, and eventually on to the session.
*
* @param incoming
* the incoming frames handler
*/
void setNextIncomingFrames(IncomingFrames incoming);
/**
* Suspend a the incoming read events on the connection.
* @return the suspend token
@ -160,5 +149,5 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
* Get Unique ID for the Connection
* @return the unique ID for the connection
*/
public String getId();
String getId();
}

View File

@ -32,7 +32,6 @@ import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.CloseFrame;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
@ -48,6 +47,11 @@ import org.eclipse.jetty.websocket.common.io.payload.PayloadProcessor;
*/
public class Parser
{
public interface Handler
{
void onFrame(Frame frame);
}
private enum State
{
START,
@ -61,6 +65,7 @@ public class Parser
private static final Logger LOG = Log.getLogger(Parser.class);
private final WebSocketPolicy policy;
private final ByteBufferPool bufferPool;
private final Parser.Handler parserHandler;
// State specific
private State state = State.START;
@ -86,12 +91,11 @@ public class Parser
*/
private byte flagsInUse=0x00;
private IncomingFrames incomingFramesHandler;
public Parser(WebSocketPolicy wspolicy, ByteBufferPool bufferPool)
public Parser(WebSocketPolicy wspolicy, ByteBufferPool bufferPool, Parser.Handler parserHandler)
{
this.bufferPool = bufferPool;
this.policy = wspolicy;
this.parserHandler = parserHandler;
}
private void assertSanePayloadLength(long len)
@ -124,9 +128,13 @@ public class Parser
}
break;
case OpCode.TEXT:
// Quick failure for frames that already exceed messages size limits
// TODO: based on buffer limits
policy.assertValidTextMessageSize((int)len);
break;
case OpCode.BINARY:
// Quick failure for frames that already exceed messages size limits
// TODO: based on buffer limits
policy.assertValidBinaryMessageSize((int)len);
break;
}
@ -155,11 +163,6 @@ public class Parser
}
}
public IncomingFrames getIncomingFramesHandler()
{
return incomingFramesHandler;
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -182,8 +185,6 @@ public class Parser
protected void notifyFrame(final Frame f)
{
if (LOG.isDebugEnabled())
LOG.debug("{} Notify {}",policy.getBehavior(),getIncomingFramesHandler());
if (policy.getBehavior() == WebSocketBehavior.SERVER)
{
@ -211,33 +212,7 @@ public class Parser
}
}
if (incomingFramesHandler == null)
{
return;
}
try
{
incomingFramesHandler.incomingFrame(f);
}
catch (WebSocketException e)
{
notifyWebSocketException(e);
}
catch (Throwable t)
{
LOG.warn(t);
notifyWebSocketException(new WebSocketException(t));
}
}
protected void notifyWebSocketException(WebSocketException e)
{
LOG.warn(e);
if (incomingFramesHandler == null)
{
return;
}
incomingFramesHandler.incomingError(e);
this.parserHandler.onFrame(f);
}
public void parse(ByteBuffer buffer) throws WebSocketException
@ -261,24 +236,19 @@ public class Parser
reset();
}
}
catch (WebSocketException e)
{
buffer.position(buffer.limit()); // consume remaining
reset();
// let session know
notifyWebSocketException(e);
// need to throw for proper close behavior in connection
throw e;
}
catch (Throwable t)
{
buffer.position(buffer.limit()); // consume remaining
reset();
// let session know
WebSocketException e = new WebSocketException(t);
notifyWebSocketException(e);
// need to throw for proper close behavior in connection
throw e;
WebSocketException wse;
if(t instanceof WebSocketException)
wse = (WebSocketException) t;
else
wse = new WebSocketException(t);
throw wse;
}
}
@ -293,14 +263,10 @@ public class Parser
/**
* Parse the base framing protocol buffer.
* <p>
* Note the first byte (fin,rsv1,rsv2,rsv3,opcode) are parsed by the {@link Parser#parse(ByteBuffer)} method
* <p>
* Not overridable
*
* @param buffer
* the buffer to parse from.
* @return true if done parsing base framing protocol and ready for parsing of the payload. false if incomplete parsing of base framing protocol.
* @return true if done parsing a whole frame. false if incomplete/partial parsing of frame.
*/
private boolean parseFrame(ByteBuffer buffer)
{
@ -650,30 +616,16 @@ public class Parser
return false;
}
public void setIncomingFramesHandler(IncomingFrames incoming)
{
this.incomingFramesHandler = incoming;
}
@Override
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("Parser@").append(Integer.toHexString(hashCode()));
builder.append("[");
if (incomingFramesHandler == null)
{
builder.append("NO_HANDLER");
}
else
{
builder.append(incomingFramesHandler.getClass().getSimpleName());
}
builder.append("[").append(policy.getBehavior());
builder.append(",s=").append(state);
builder.append(",c=").append(cursor);
builder.append(",len=").append(payloadLength);
builder.append(",f=").append(frame);
// builder.append(",p=").append(policy);
builder.append("]");
return builder.toString();
}

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
@ -299,7 +300,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
}
}
public void uncheckedSendFrame(WebSocketFrame frame, WriteCallback callback)
public void uncheckedSendFrame(WebSocketFrame frame, FrameCallback callback)
{
try
{
@ -311,7 +312,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
}
catch (IOException e)
{
callback.writeFailed(e);
callback.fail(e);
}
}

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.util.thread.ThreadClassLoaderScope;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -385,10 +386,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
}
/**
* Incoming Raw Frames from Parser
* Incoming Raw Frames from Parser (after ExtensionStack)
*/
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
ClassLoader old = Thread.currentThread().getContextClassLoader();
try
@ -396,6 +397,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
Thread.currentThread().setContextClassLoader(classLoader);
if (connection.getIOState().isInputAvailable())
{
// For endpoints that want to see raw frames.
// These are immutable.
endpointFunctions.onFrame(frame);
byte opcode = frame.getOpCode();
@ -409,6 +412,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
// process handshake
getConnection().getIOState().onCloseRemote(close);
callback.succeed();
return;
}
@ -430,6 +434,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
}
endpointFunctions.onPing(frame.getPayload());
callback.succeed();
getRemote().sendPong(pongBuf);
break;
@ -440,23 +445,24 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
LOG.debug("PONG: {}", BufferUtil.toDetailString(frame.getPayload()));
endpointFunctions.onPong(frame.getPayload());
callback.succeed();
break;
}
case OpCode.BINARY:
{
endpointFunctions.onBinary(frame.getPayload(), frame.isFin());
endpointFunctions.onBinary(frame, callback);
return;
}
case OpCode.TEXT:
{
endpointFunctions.onText(frame.getPayload(), frame.isFin());
endpointFunctions.onText(frame, callback);
return;
}
case OpCode.CONTINUATION:
{
endpointFunctions.onContinuation(frame.getPayload(), frame.isFin());
endpointFunctions.onContinuation(frame, callback);
if (activeMessageSink != null)
activeMessageSink.accept(frame.getPayload(), frame.isFin());
activeMessageSink.accept(frame, callback);
return;
}
@ -475,6 +481,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
}
catch (NotUtf8Exception e)
{
callback.fail(e);
notifyError(e);
close(StatusCode.BAD_PAYLOAD, e.getMessage());
}
@ -488,6 +495,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
LOG.warn("Unhandled Error (closing connection)", cause);
callback.fail(cause);
notifyError(cause);
// Unhandled Error, close the connection.

View File

@ -29,8 +29,8 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -176,13 +176,13 @@ public abstract class AbstractExtension extends AbstractLifeCycle implements Dum
this.nextIncoming.incomingError(e);
}
protected void nextIncomingFrame(Frame frame)
protected void nextIncomingFrame(Frame frame, FrameCallback callback)
{
log.debug("nextIncomingFrame({})",frame);
this.nextIncoming.incomingFrame(frame);
this.nextIncoming.incomingFrame(frame, callback);
}
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
protected void nextOutgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
log.debug("nextOutgoingFrame({})",frame);
this.nextOutgoing.outgoingFrame(frame,callback, batchMode);

View File

@ -33,8 +33,8 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
@ -215,9 +215,9 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
nextIncoming.incomingFrame(frame);
nextIncoming.incomingFrame(frame, callback);
}
/**
@ -287,7 +287,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (LOG.isDebugEnabled())
@ -383,10 +383,10 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
private static class FrameEntry
{
private final Frame frame;
private final WriteCallback callback;
private final FrameCallback callback;
private final BatchMode batchMode;
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
private FrameEntry(Frame frame, FrameCallback callback, BatchMode batchMode)
{
this.frame = frame;
this.callback = callback;
@ -400,7 +400,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
}
private class Flusher extends IteratingCallback implements WriteCallback
private class Flusher extends IteratingCallback implements FrameCallback
{
private FrameEntry current;
@ -435,7 +435,7 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
@Override
public void writeSuccess()
public void succeed()
{
// Notify first then call succeeded(), otherwise
// write callbacks may be invoked out of order.
@ -444,23 +444,23 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
@Override
public void writeFailed(Throwable x)
public void fail(Throwable cause)
{
// Notify first, the call succeeded() to drain the queue.
// We don't want to call failed(x) because that will put
// this flusher into a final state that cannot be exited,
// and the failure of a frame may not mean that the whole
// connection is now invalid.
notifyCallbackFailure(current.callback,x);
notifyCallbackFailure(current.callback,cause);
succeeded();
}
private void notifyCallbackSuccess(WriteCallback callback)
private void notifyCallbackSuccess(FrameCallback callback)
{
try
{
if (callback != null)
callback.writeSuccess();
callback.succeed();
}
catch (Throwable x)
{
@ -468,12 +468,12 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
}
}
private void notifyCallbackFailure(WriteCallback callback, Throwable failure)
private void notifyCallbackFailure(FrameCallback callback, Throwable failure)
{
try
{
if (callback != null)
callback.writeFailed(failure);
callback.fail(failure);
}
catch (Throwable x)
{

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -65,12 +66,12 @@ public class FrameCaptureExtension extends AbstractExtension
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
saveFrame(frame,false);
try
{
nextIncomingFrame(frame);
nextIncomingFrame(frame, callback);
}
catch (Throwable t)
{
@ -81,7 +82,7 @@ public class FrameCaptureExtension extends AbstractExtension
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
saveFrame(frame,true);
try

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
@ -127,7 +127,7 @@ public abstract class CompressExtension extends AbstractExtension
*/
abstract int getRsvUseMode();
protected void forwardIncoming(Frame frame, ByteAccumulator accumulator)
protected void forwardIncoming(Frame frame, FrameCallback callback, ByteAccumulator accumulator)
{
DataFrame newFrame = new DataFrame(frame);
// Unset RSV1 since it's not compressed anymore.
@ -139,7 +139,7 @@ public abstract class CompressExtension extends AbstractExtension
BufferUtil.flipToFill(buffer);
accumulator.transferTo(buffer);
newFrame.setPayload(buffer);
nextIncomingFrame(newFrame);
nextIncomingFrame(newFrame, callback);
}
finally
{
@ -199,7 +199,7 @@ public abstract class CompressExtension extends AbstractExtension
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
// We use a queue and an IteratingCallback to handle concurrency.
// We must compress and write atomically, otherwise the compression
@ -234,12 +234,12 @@ public abstract class CompressExtension extends AbstractExtension
}
}
protected void notifyCallbackSuccess(WriteCallback callback)
protected void notifyCallbackSuccess(FrameCallback callback)
{
try
{
if (callback != null)
callback.writeSuccess();
callback.succeed();
}
catch (Throwable x)
{
@ -248,12 +248,12 @@ public abstract class CompressExtension extends AbstractExtension
}
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
protected void notifyCallbackFailure(FrameCallback callback, Throwable failure)
{
try
{
if (callback != null)
callback.writeFailed(failure);
callback.fail(failure);
}
catch (Throwable x)
{
@ -390,10 +390,10 @@ public abstract class CompressExtension extends AbstractExtension
private static class FrameEntry
{
private final Frame frame;
private final WriteCallback callback;
private final FrameCallback callback;
private final BatchMode batchMode;
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
private FrameEntry(Frame frame, FrameCallback callback, BatchMode batchMode)
{
this.frame = frame;
this.callback = callback;
@ -407,7 +407,7 @@ public abstract class CompressExtension extends AbstractExtension
}
}
private class Flusher extends IteratingCallback implements WriteCallback
private class Flusher extends IteratingCallback implements FrameCallback
{
private FrameEntry current;
private boolean finished = true;
@ -566,7 +566,7 @@ public abstract class CompressExtension extends AbstractExtension
}
@Override
public void writeSuccess()
public void succeed()
{
if (finished)
notifyCallbackSuccess(current.callback);
@ -574,12 +574,12 @@ public abstract class CompressExtension extends AbstractExtension
}
@Override
public void writeFailed(Throwable x)
public void fail(Throwable cause)
{
notifyCallbackFailure(current.callback,x);
notifyCallbackFailure(current.callback,cause);
// If something went wrong, very likely the compression context
// will be invalid, so we need to fail this IteratingCallback.
failed(x);
failed(cause);
}
}
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.extensions.compress;
import java.util.zip.DataFormatException;
import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
/**
@ -49,7 +50,7 @@ public class DeflateFrameExtension extends CompressExtension
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
// Incoming frames are always non concurrent because
// they are read and parsed with a single thread, and
@ -57,7 +58,7 @@ public class DeflateFrameExtension extends CompressExtension
if ( frame.getType().isControl() || !frame.isRsv1() || !frame.hasPayload() )
{
nextIncomingFrame(frame);
nextIncomingFrame(frame, callback);
return;
}
@ -66,7 +67,7 @@ public class DeflateFrameExtension extends CompressExtension
ByteAccumulator accumulator = newByteAccumulator();
decompress(accumulator, frame.getPayload());
decompress(accumulator, TAIL_BYTES_BUF.slice());
forwardIncoming(frame, accumulator);
forwardIncoming(frame, callback, accumulator);
}
catch (DataFormatException e)
{

View File

@ -25,7 +25,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode;
@ -52,7 +52,7 @@ public class PerMessageDeflateExtension extends CompressExtension
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
// Incoming frames are always non concurrent because
// they are read and parsed with a single thread, and
@ -67,7 +67,7 @@ public class PerMessageDeflateExtension extends CompressExtension
if (OpCode.isControlFrame(frame.getOpCode()) || !incomingCompressed)
{
nextIncomingFrame(frame);
nextIncomingFrame(frame, callback);
return;
}
@ -82,7 +82,7 @@ public class PerMessageDeflateExtension extends CompressExtension
decompress(accumulator, TAIL_BYTES_BUF.slice());
}
forwardIncoming(frame, accumulator);
forwardIncoming(frame, callback, accumulator);
}
catch (DataFormatException e)
{
@ -94,7 +94,7 @@ public class PerMessageDeflateExtension extends CompressExtension
}
@Override
protected void nextIncomingFrame(Frame frame)
protected void nextIncomingFrame(Frame frame, FrameCallback callback)
{
if (frame.isFin() && !incomingContextTakeover)
{
@ -102,11 +102,11 @@ public class PerMessageDeflateExtension extends CompressExtension
decompressCount.set(0);
getInflater().reset();
}
super.nextIncomingFrame(frame);
super.nextIncomingFrame(frame, callback);
}
@Override
protected void nextOutgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
protected void nextOutgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
if (frame.isFin() && !outgoingContextTakeover)
{

View File

@ -27,7 +27,7 @@ import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode;
@ -52,13 +52,13 @@ public class FragmentExtension extends AbstractExtension
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
nextIncomingFrame(frame);
nextIncomingFrame(frame, callback);
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
ByteBuffer payload = frame.getPayload();
int length = payload != null ? payload.remaining() : 0;
@ -101,10 +101,10 @@ public class FragmentExtension extends AbstractExtension
private static class FrameEntry
{
private final Frame frame;
private final WriteCallback callback;
private final FrameCallback callback;
private final BatchMode batchMode;
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
private FrameEntry(Frame frame, FrameCallback callback, BatchMode batchMode)
{
this.frame = frame;
this.callback = callback;
@ -118,7 +118,7 @@ public class FragmentExtension extends AbstractExtension
}
}
private class Flusher extends IteratingCallback implements WriteCallback
private class Flusher extends IteratingCallback implements FrameCallback
{
private FrameEntry current;
private boolean finished = true;
@ -182,7 +182,7 @@ public class FragmentExtension extends AbstractExtension
}
@Override
public void writeSuccess()
public void succeed()
{
// Notify first then call succeeded(), otherwise
// write callbacks may be invoked out of order.
@ -191,23 +191,23 @@ public class FragmentExtension extends AbstractExtension
}
@Override
public void writeFailed(Throwable x)
public void fail(Throwable cause)
{
// Notify first, the call succeeded() to drain the queue.
// We don't want to call failed(x) because that will put
// this flusher into a final state that cannot be exited,
// and the failure of a frame may not mean that the whole
// connection is now invalid.
notifyCallbackFailure(current.callback, x);
notifyCallbackFailure(current.callback, cause);
succeeded();
}
private void notifyCallbackSuccess(WriteCallback callback)
private void notifyCallbackSuccess(FrameCallback callback)
{
try
{
if (callback != null)
callback.writeSuccess();
callback.succeed();
}
catch (Throwable x)
{
@ -216,12 +216,12 @@ public class FragmentExtension extends AbstractExtension
}
}
private void notifyCallbackFailure(WriteCallback callback, Throwable failure)
private void notifyCallbackFailure(FrameCallback callback, Throwable failure)
{
try
{
if (callback != null)
callback.writeFailed(failure);
callback.fail(failure);
}
catch (Throwable x)
{

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.common.extensions.identity;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
@ -50,14 +50,14 @@ public class IdentityExtension extends AbstractExtension
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
// pass through
nextIncomingFrame(frame);
nextIncomingFrame(frame, callback);
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
// pass through
nextOutgoingFrame(frame,callback, batchMode);

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
@ -702,42 +703,42 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
}
@Override
public void onText(ByteBuffer payload, boolean fin)
public void onText(Frame frame, FrameCallback callback)
{
assertIsStarted();
if (activeMessageSink == null)
activeMessageSink = onTextSink;
acceptMessage(payload, fin);
acceptMessage(frame, callback);
}
@Override
public void onBinary(ByteBuffer payload, boolean fin)
public void onBinary(Frame frame, FrameCallback callback)
{
assertIsStarted();
if (activeMessageSink == null)
activeMessageSink = onBinarySink;
acceptMessage(payload, fin);
acceptMessage(frame, callback);
}
@Override
public void onContinuation(ByteBuffer payload, boolean fin)
public void onContinuation(Frame frame, FrameCallback callback)
{
acceptMessage(payload, fin);
acceptMessage(frame, callback);
}
private void acceptMessage(ByteBuffer payload, boolean fin)
private void acceptMessage(Frame frame, FrameCallback callback)
{
// No message sink is active
if (activeMessageSink == null)
return;
// Accept the payload into the message sink
activeMessageSink.accept(payload, fin);
if (fin)
activeMessageSink.accept(frame, callback);
if (frame.isFin())
activeMessageSink = null;
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.function;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
@ -39,11 +40,11 @@ public interface EndpointFunctions<T> extends LifeCycle
void onError(Throwable cause);
void onText(ByteBuffer payload, boolean fin);
void onText(Frame frame, FrameCallback callback);
void onBinary(ByteBuffer payload, boolean fin);
void onBinary(Frame frame, FrameCallback callback);
void onContinuation(ByteBuffer payload, boolean fin);
void onContinuation(Frame frame, FrameCallback callback);
void onPing(ByteBuffer payload);

View File

@ -42,6 +42,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
@ -54,12 +55,13 @@ import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
* Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}.
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable, Parser.Handler
{
private final AtomicBoolean closed = new AtomicBoolean();
@ -219,6 +221,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final AtomicBoolean suspendToken;
private final FrameFlusher flusher;
private final String id;
private final ExtensionStack extensionStack;
private List<ExtensionConfig> extensions;
private boolean isFilling;
private ByteBuffer prefillBuffer;
@ -226,7 +229,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private IOState ioState;
private Stats stats = new Stats();
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack)
{
super(endp,executor);
this.id = String.format("%s:%d->%s:%d",
@ -237,8 +240,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.policy = policy;
this.behavior = policy.getBehavior();
this.bufferPool = bufferPool;
this.extensionStack = extensionStack;
this.generator = new Generator(policy,bufferPool);
this.parser = new Parser(policy,bufferPool);
this.parser = new Parser(policy,bufferPool,this);
this.scheduler = scheduler;
this.extensions = new ArrayList<>();
this.suspendToken = new AtomicBoolean(false);
@ -247,6 +252,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.flusher = new Flusher(bufferPool,policy.getMaxBinaryMessageBufferSize(),generator,endp);
this.setInputBufferSize(policy.getInputBufferSize());
this.setMaxIdleTimeout(policy.getIdleTimeout());
this.extensionStack.setPolicy(this.policy);
this.extensionStack.configure(this.parser);
this.extensionStack.configure(this.generator);
}
@Override
@ -495,6 +504,25 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
public void onFrame(Frame frame)
{
extensionStack.incomingFrame(frame, new FrameCallback()
{
@Override
public void fail(Throwable cause)
{
// TODO: suspend
}
@Override
public void succeed()
{
// TODO: resume
}
});
}
@Override
public void onFillable()
{
@ -557,7 +585,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private void notifyError(Throwable t)
{
getParser().getIncomingFramesHandler().incomingError(t);
extensionStack.incomingError(t);
}
@Override
@ -606,7 +634,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Frame from API, User, or Internal implementation destined for network.
*/
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
if (LOG.isDebugEnabled())
{
@ -678,6 +706,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
parser.parse(buffer);
}
}
@ -697,7 +726,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.warn(t);
close(StatusCode.ABNORMAL,t.getMessage());
// TODO: should probably only switch to discard if a non-ws-endpoint error
// TODO: should ws only switch to discard if a non-ws-endpoint error?
return ReadMode.DISCARD;
}
}

View File

@ -34,7 +34,7 @@ import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
@ -249,11 +249,11 @@ public class FrameFlusher
private class FrameEntry
{
private final Frame frame;
private final WriteCallback callback;
private final FrameCallback callback;
private final BatchMode batchMode;
private ByteBuffer headerBuffer;
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
private FrameEntry(Frame frame, FrameCallback callback, BatchMode batchMode)
{
this.frame = Objects.requireNonNull(frame);
this.callback = callback;
@ -332,7 +332,7 @@ public class FrameFlusher
}
}
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
public void enqueue(Frame frame, FrameCallback callback, BatchMode batchMode)
{
if (closed.get())
{
@ -382,13 +382,13 @@ public class FrameFlusher
flusher.iterate();
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
protected void notifyCallbackFailure(FrameCallback callback, Throwable failure)
{
try
{
if (callback != null)
{
callback.writeFailed(failure);
callback.fail(failure);
}
}
catch (Throwable x)
@ -398,13 +398,13 @@ public class FrameFlusher
}
}
protected void notifyCallbackSuccess(WriteCallback callback)
protected void notifyCallbackSuccess(FrameCallback callback)
{
try
{
if (callback != null)
{
callback.writeSuccess();
callback.succeed();
}
}
catch (Throwable x)

View File

@ -19,7 +19,7 @@
package org.eclipse.jetty.websocket.common.io;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
@ -42,9 +42,9 @@ public class FramePipes
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
this.outgoing.outgoingFrame(frame,null, BatchMode.OFF);
this.outgoing.outgoingFrame(frame, callback, BatchMode.OFF);
}
}
@ -58,17 +58,9 @@ public class FramePipes
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
try
{
this.incoming.incomingFrame(frame);
callback.writeSuccess();
}
catch (Throwable t)
{
callback.writeFailed(t);
}
this.incoming.incomingFrame(frame, callback);
}
}

View File

@ -19,12 +19,13 @@
package org.eclipse.jetty.websocket.common.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
public class ByteArrayMessageSink implements MessageSink
{
@ -42,12 +43,13 @@ public class ByteArrayMessageSink implements MessageSink
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
try
{
if (payload != null)
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
policy.assertValidBinaryMessageSize(size + payload.remaining());
size += payload.remaining();
@ -56,19 +58,25 @@ public class ByteArrayMessageSink implements MessageSink
BufferUtil.writeTo(payload, out);
}
}
catch (IOException e)
{
throw new RuntimeException("Unable to append Binary Message", e);
}
finally
{
if (fin)
if (frame.isFin())
{
if (out != null)
notifyOnMessage(out.toByteArray());
else
notifyOnMessage(EMPTY_BUFFER);
}
callback.succeed();
}
catch (Throwable t)
{
callback.fail(t);
}
finally
{
if (frame.isFin())
{
// reset
out = null;
size = 0;
@ -76,7 +84,7 @@ public class ByteArrayMessageSink implements MessageSink
}
}
protected Object notifyOnMessage(byte buf[])
private Object notifyOnMessage(byte buf[])
{
return onMessageFunction.apply(buf);
}

View File

@ -0,0 +1,35 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.message;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.api.FrameCallback;
public class FrameCallbackBuffer
{
public ByteBuffer buffer;
public FrameCallback callback;
public FrameCallbackBuffer(FrameCallback callback, ByteBuffer buffer)
{
this.callback = callback;
this.buffer = buffer;
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.common.message;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Function;
@ -27,6 +26,8 @@ import java.util.function.Function;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
public class InputStreamMessageSink implements MessageSink
{
@ -43,7 +44,7 @@ public class InputStreamMessageSink implements MessageSink
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
try
{
@ -55,7 +56,7 @@ public class InputStreamMessageSink implements MessageSink
first = true;
}
stream.accept(payload,fin);
stream.accept(frame, callback);
if (first)
{
dispatchCompleted = new CountDownLatch(1);
@ -83,7 +84,7 @@ public class InputStreamMessageSink implements MessageSink
finally
{
//noinspection Duplicates
if (fin)
if (frame.isFin())
{
if (LOG.isDebugEnabled())
LOG.debug("dispatch complete await() - {}", stream);

View File

@ -27,12 +27,13 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
/**
* Support class for reading a (single) WebSocket BINARY message via a InputStream.
* Support class for reading a WebSocket BINARY message via a InputStream.
* <p>
* An InputStream that can access a queue of ByteBuffer payloads, along with expected InputStream blocking behavior.
* </p>
@ -40,15 +41,15 @@ import org.eclipse.jetty.util.log.Logger;
public class MessageInputStream extends InputStream implements MessageSink
{
private static final Logger LOG = Log.getLogger(MessageInputStream.class);
private static final ByteBuffer EOF = ByteBuffer.allocate(0).asReadOnlyBuffer();
private static final FrameCallbackBuffer EOF = new FrameCallbackBuffer(new FrameCallback.Adapter(), ByteBuffer.allocate(0).asReadOnlyBuffer());
private final BlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
private final BlockingDeque<FrameCallbackBuffer> buffers = new LinkedBlockingDeque<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final long timeoutMs;
private final CountDownLatch closedLatch = new CountDownLatch(1);
private ByteBuffer activeBuffer = null;
private FrameCallbackBuffer activeBuffer = null;
public MessageInputStream()
{
@ -61,16 +62,17 @@ public class MessageInputStream extends InputStream implements MessageSink
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Appending {} chunk: {}", fin ? "final" : "non-final", BufferUtil.toDetailString(payload));
LOG.debug("Appending {}", frame);
}
// If closed, we should just toss incoming payloads into the bit bucket.
if (closed.get())
{
callback.fail(new IOException("Already Closed"));
return;
}
@ -79,22 +81,29 @@ public class MessageInputStream extends InputStream implements MessageSink
// be processed after this method returns.
try
{
if (payload == null)
if (!frame.hasPayload())
{
// skip if no payload
callback.succeed();
return;
}
ByteBuffer payload = frame.getPayload();
int capacity = payload.remaining();
if (capacity <= 0)
{
// skip if no payload data to copy
callback.succeed();
return;
}
// TODO: the copy buffer should be pooled too, but no buffer pool available from here.
ByteBuffer copy = payload.isDirect() ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
copy.put(payload).flip();
buffers.put(copy);
buffers.put(new FrameCallbackBuffer(callback,copy));
// TODO: backpressure
}
catch (InterruptedException e)
{
@ -102,7 +111,7 @@ public class MessageInputStream extends InputStream implements MessageSink
}
finally
{
if (fin)
if (frame.isFin())
{
buffers.offer(EOF);
}
@ -145,13 +154,14 @@ public class MessageInputStream extends InputStream implements MessageSink
}
// grab a fresh buffer
while (activeBuffer == null || !activeBuffer.hasRemaining())
while (activeBuffer == null || !activeBuffer.buffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("Waiting {} ms to read", timeoutMs);
if (timeoutMs < 0)
{
// Wait forever until a buffer is available.
// TODO: notify connection to resume (if paused)
activeBuffer = buffers.take();
}
else
@ -177,7 +187,7 @@ public class MessageInputStream extends InputStream implements MessageSink
}
}
return activeBuffer.get() & 0xFF;
return activeBuffer.buffer.get() & 0xFF;
}
catch (InterruptedException x)
{
@ -195,6 +205,8 @@ public class MessageInputStream extends InputStream implements MessageSink
throw new IOException("reset() not supported");
}
// TODO: remove await!
@Deprecated
public void awaitClose()
{
try

View File

@ -19,9 +19,11 @@
package org.eclipse.jetty.websocket.common.message;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
/**
* Support class for reading a (single) WebSocket TEXT message via a Reader.
* <p>
@ -38,11 +40,13 @@ public class MessageReader extends InputStreamReader implements MessageSink
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
this.stream.accept(payload, fin);
this.stream.accept(frame, callback);
}
// TODO: remove await!
@Deprecated
public void awaitClose()
{
stream.awaitClose();

View File

@ -18,23 +18,22 @@
package org.eclipse.jetty.websocket.common.message;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
/**
* Sink consumer for messages (used for multiple frames with continuations,
* and also to allow for streaming APIs)
*/
public interface MessageSink extends BiConsumer<ByteBuffer, Boolean>
public interface MessageSink
{
/**
* Consume the frame payload to the message.
*
* @param payload
* the frame payload to append.
* @param fin
* flag indicating if this is the final part of the message or not.
* @param frame
* the frame, its payload (and fin state) to append
* @param callback
* the callback for how the frame was consumed
*/
@Override
void accept(ByteBuffer payload, Boolean fin);
void accept(Frame frame, FrameCallback callback);
}

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
@ -50,7 +51,7 @@ public class MessageWriter extends Writer
private TextFrame frame;
private ByteBuffer buffer;
private Utf8CharBuffer utf;
private WriteCallback callback;
private FrameCallback callback;
private boolean closed;
public MessageWriter(WebSocketSession session)
@ -199,27 +200,27 @@ public class MessageWriter extends Writer
private void notifySuccess()
{
WriteCallback callback;
FrameCallback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.writeSuccess();
callback.succeed();
}
}
private void notifyFailure(Throwable failure)
{
WriteCallback callback;
FrameCallback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.writeFailed(failure);
callback.fail(failure);
}
}
}

View File

@ -18,9 +18,11 @@
package org.eclipse.jetty.websocket.common.message;
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
public class PartialBinaryMessageSink implements MessageSink
{
private final Function<PartialBinaryMessage, Void> onBinaryFunction;
@ -31,8 +33,16 @@ public class PartialBinaryMessageSink implements MessageSink
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
onBinaryFunction.apply(new PartialBinaryMessage(payload,fin));
try
{
onBinaryFunction.apply(new PartialBinaryMessage(frame.getPayload(), frame.isFin()));
callback.succeed();
}
catch(Throwable t)
{
callback.fail(t);
}
}
}

View File

@ -18,9 +18,10 @@
package org.eclipse.jetty.websocket.common.message;
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.util.Utf8PartialBuilder;
public class PartialTextMessageSink implements MessageSink
@ -35,16 +36,21 @@ public class PartialTextMessageSink implements MessageSink
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
String partialText = utf8Partial.toPartialString(payload);
String partialText = utf8Partial.toPartialString(frame.getPayload());
try
{
onTextFunction.apply(new PartialTextMessage(partialText,fin));
onTextFunction.apply(new PartialTextMessage(partialText,frame.isFin()));
callback.succeed();
}
catch(Throwable t)
{
callback.fail(t);
}
finally
{
if (fin)
if (frame.isFin())
utf8Partial.reset();
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.common.message;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Function;
@ -27,6 +26,8 @@ import java.util.function.Function;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
public class ReaderMessageSink implements MessageSink
{
@ -43,7 +44,7 @@ public class ReaderMessageSink implements MessageSink
}
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
try
{
@ -55,7 +56,7 @@ public class ReaderMessageSink implements MessageSink
first = true;
}
stream.accept(payload, fin);
stream.accept(frame, callback);
if (first)
{
dispatchCompleted = new CountDownLatch(1);
@ -84,7 +85,7 @@ public class ReaderMessageSink implements MessageSink
finally
{
//noinspection Duplicates
if (fin)
if (frame.isFin())
{
if (LOG.isDebugEnabled())
LOG.debug("dispatch complete await() - {}", stream);

View File

@ -25,7 +25,9 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
public class StringMessageSink implements MessageSink
{
@ -45,10 +47,11 @@ public class StringMessageSink implements MessageSink
@SuppressWarnings("Duplicates")
@Override
public void accept(ByteBuffer payload, Boolean fin)
public void accept(Frame frame, FrameCallback callback)
{
if (payload != null)
if (frame.hasPayload())
{
ByteBuffer payload = frame.getPayload();
policy.assertValidTextMessageSize(size + payload.remaining());
size += payload.remaining();
@ -62,13 +65,15 @@ public class StringMessageSink implements MessageSink
utf.append(payload);
}
if (fin)
if (frame.isFin())
{
// notify event
if (utf != null)
onMessageFunction.apply(utf.toString());
else
onMessageFunction.apply("");
callback.succeed();
// reset
size = 0;
utf = null;

View File

@ -53,9 +53,8 @@ public class ClosePayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -0,0 +1,25 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.websocket.api.FrameCallback;
public class FrameCallbackAdapter extends FrameCallback.Adapter
{
}

View File

@ -42,9 +42,8 @@ public class GeneratorParserRoundtripTest
{
WebSocketPolicy policy = WebSocketPolicy.newClientPolicy();
Generator gen = new Generator(policy,bufferPool);
Parser parser = new Parser(policy,bufferPool);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new Parser(policy,bufferPool,capture);
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
@ -80,9 +79,8 @@ public class GeneratorParserRoundtripTest
public void testParserAndGeneratorMasked() throws Exception
{
Generator gen = new Generator(WebSocketPolicy.newClientPolicy(),bufferPool);
Parser parser = new Parser(WebSocketPolicy.newServerPolicy(),bufferPool);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new Parser(WebSocketPolicy.newServerPolicy(),bufferPool,capture);
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";

View File

@ -297,9 +297,8 @@ public class GeneratorTest
// Parse complete buffer.
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(completeBuffer);

View File

@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -39,10 +40,15 @@ import org.eclipse.jetty.websocket.common.test.UnitGenerator;
import org.eclipse.jetty.websocket.common.test.UnitParser;
import org.eclipse.jetty.websocket.common.util.Hex;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class ParserTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
/**
* Similar to the server side 5.15 testcase. A normal 2 fragment text text message, followed by another continuation.
*/
@ -57,13 +63,12 @@ public class ParserTest
send.add(new CloseInfo(StatusCode.NORMAL).asFrame());
ByteBuffer completeBuf = UnitGenerator.generate(send);
UnitParser parser = new UnitParser();
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(WebSocketPolicy.newServerPolicy(),capture);
parser.parseQuietly(completeBuf);
expectedException.expect(ProtocolException.class);
parser.parse(completeBuf);
capture.assertErrorCount(1);
capture.assertHasFrame(OpCode.TEXT,1);
capture.assertHasFrame(OpCode.CONTINUATION,1);
}
@ -80,12 +85,12 @@ public class ParserTest
send.add(new CloseInfo(StatusCode.NORMAL).asFrame());
ByteBuffer completeBuf = UnitGenerator.generate(send);
UnitParser parser = new UnitParser();
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parseQuietly(completeBuf);
UnitParser parser = new UnitParser(WebSocketPolicy.newServerPolicy(),capture);
expectedException.expect(ProtocolException.class);
parser.parse(completeBuf);
capture.assertErrorCount(1);
capture.assertHasFrame(OpCode.TEXT,1); // fragment 1
}
@ -106,12 +111,10 @@ public class ParserTest
send.add(new CloseInfo(StatusCode.NORMAL).asFrame());
ByteBuffer completeBuf = UnitGenerator.generate(send);
UnitParser parser = new UnitParser();
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parseQuietly(completeBuf);
UnitParser parser = new UnitParser(WebSocketPolicy.newServerPolicy(),capture);
parser.parse(completeBuf);
capture.assertErrorCount(0);
capture.assertHasFrame(OpCode.TEXT,1);
capture.assertHasFrame(OpCode.CONTINUATION,4);
capture.assertHasFrame(OpCode.CLOSE,1);
@ -130,12 +133,10 @@ public class ParserTest
send.add(new CloseInfo(StatusCode.NORMAL).asFrame());
ByteBuffer completeBuf = UnitGenerator.generate(send);
UnitParser parser = new UnitParser();
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(WebSocketPolicy.newServerPolicy(),capture);
parser.parse(completeBuf);
capture.assertErrorCount(0);
capture.assertHasFrame(OpCode.TEXT,1);
capture.assertHasFrame(OpCode.CLOSE,1);
capture.assertHasFrame(OpCode.PONG,1);
@ -158,7 +159,7 @@ public class ParserTest
byte mini[];
for (int i = 0; i < len; i++)
{
DataFrame frame = null;
DataFrame frame;
if (continuation)
{
frame = new ContinuationFrame();
@ -180,12 +181,10 @@ public class ParserTest
send.add(new CloseInfo(StatusCode.NORMAL).asFrame());
ByteBuffer completeBuf = UnitGenerator.generate(send);
UnitParser parser = new UnitParser();
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(WebSocketPolicy.newServerPolicy(),capture);
parser.parse(completeBuf);
capture.assertErrorCount(0);
capture.assertHasFrame(OpCode.TEXT,textCount);
capture.assertHasFrame(OpCode.CONTINUATION,continuationCount);
capture.assertHasFrame(OpCode.CLOSE,1);
@ -199,9 +198,9 @@ public class ParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
@ -227,9 +226,8 @@ public class ParserTest
// Parse, in 4096 sized windows
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
while (networkBytes.remaining() > 0)
{

View File

@ -43,9 +43,8 @@ public class PingPayloadParserTest
BufferUtil.flipToFlush(buf,0);
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -40,9 +40,8 @@ public class RFC6455ExamplesParserTest
public void testFragmentedUnmaskedTextMessage()
{
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
ByteBuffer buf = ByteBuffer.allocate(16);
BufferUtil.clearToFill(buf);
@ -88,9 +87,8 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
@ -112,9 +110,8 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
@ -143,9 +140,8 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
@ -182,9 +178,8 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
@ -213,9 +208,8 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
@ -237,9 +231,8 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
UnitParser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();

View File

@ -35,10 +35,15 @@ import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.test.UnitParser;
import org.eclipse.jetty.websocket.common.util.MaskedByteBuffer;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TextPayloadParserTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testFrameTooLargeDueToPolicy() throws Exception
{
@ -60,12 +65,12 @@ public class TextPayloadParserTest
MaskedByteBuffer.putPayload(buf,utf);
buf.flip();
UnitParser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parseQuietly(buf);
UnitParser parser = new UnitParser(policy,capture);
expectedException.expect(MessageTooLargeException.class);
parser.parse(buf);
capture.assertHasErrors(MessageTooLargeException.class,1);
capture.assertHasNoFrames();
MessageTooLargeException err = (MessageTooLargeException)capture.getErrors().poll();
@ -75,7 +80,7 @@ public class TextPayloadParserTest
@Test
public void testLongMaskedText() throws Exception
{
StringBuffer sb = new StringBuffer(); ;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3500; i++)
{
sb.append("Hell\uFF4f Big W\uFF4Frld ");
@ -97,12 +102,10 @@ public class TextPayloadParserTest
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
policy.setMaxTextMessageSize(100000);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.TEXT,1);
WebSocketFrame txt = capture.getFrames().poll();
Assert.assertThat("TextFrame.data",txt.getPayloadAsUTF8(),is(expectedText));
@ -111,7 +114,7 @@ public class TextPayloadParserTest
@Test
public void testMediumMaskedText() throws Exception
{
StringBuffer sb = new StringBuffer(); ;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 14; i++)
{
sb.append("Hell\uFF4f Medium W\uFF4Frld ");
@ -132,12 +135,10 @@ public class TextPayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.TEXT,1);
WebSocketFrame txt = capture.getFrames().poll();
Assert.assertThat("TextFrame.data",txt.getPayloadAsUTF8(),is(expectedText));
@ -169,12 +170,10 @@ public class TextPayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.TEXT,1);
capture.assertHasFrame(OpCode.CONTINUATION,1);
WebSocketFrame txt = capture.getFrames().poll();
@ -197,12 +196,10 @@ public class TextPayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.TEXT,1);
WebSocketFrame txt = capture.getFrames().poll();
Assert.assertThat("TextFrame.data",txt.getPayloadAsUTF8(),is(expectedText));
@ -223,12 +220,10 @@ public class TextPayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(buf);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.TEXT,1);
WebSocketFrame txt = capture.getFrames().poll();
Assert.assertThat("TextFrame.data",txt.getPayloadAsUTF8(),is(expectedText));

View File

@ -305,9 +305,8 @@ public class TestABCase1_1
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -339,9 +338,8 @@ public class TestABCase1_1
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -373,9 +371,8 @@ public class TestABCase1_1
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -407,9 +404,8 @@ public class TestABCase1_1
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -443,9 +439,8 @@ public class TestABCase1_1
expected.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
policy.setMaxTextMessageSize(length);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -480,9 +475,8 @@ public class TestABCase1_1
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
policy.setMaxTextMessageSize(length);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -504,9 +498,8 @@ public class TestABCase1_1
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();

View File

@ -324,9 +324,8 @@ public class TestABCase1_2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -358,9 +357,8 @@ public class TestABCase1_2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -392,9 +390,8 @@ public class TestABCase1_2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -426,9 +423,8 @@ public class TestABCase1_2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -461,9 +457,8 @@ public class TestABCase1_2
expected.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
policy.setMaxBinaryMessageSize(length);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -498,9 +493,8 @@ public class TestABCase1_2
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
policy.setMaxBinaryMessageSize(length);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
@ -522,9 +516,8 @@ public class TestABCase1_2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();

View File

@ -39,11 +39,16 @@ import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.test.UnitGenerator;
import org.eclipse.jetty.websocket.common.test.UnitParser;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestABCase2
{
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
@Rule
public ExpectedException expectedException = ExpectedException.none();
private WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
@Test
public void testGenerate125OctetPingCase2_4()
@ -185,12 +190,10 @@ public class TestABCase2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.PING,1);
Frame pActual = capture.getFrames().poll();
@ -215,12 +218,10 @@ public class TestABCase2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.PING,1);
Frame pActual = capture.getFrames().poll();
@ -238,12 +239,10 @@ public class TestABCase2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.PING,1);
Frame pActual = capture.getFrames().poll();
@ -269,12 +268,10 @@ public class TestABCase2
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.PING,1);
Frame pActual = capture.getFrames().poll();
@ -312,12 +309,11 @@ public class TestABCase2
expected.flip();
UnitParser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parseQuietly(expected);
UnitParser parser = new UnitParser(policy,capture);
Assert.assertEquals("error should be returned for too large of ping payload",1,capture.getErrorCount(ProtocolException.class));
expectedException.expect(ProtocolException.class);
parser.parse(expected);
}
}

View File

@ -18,21 +18,25 @@
package org.eclipse.jetty.websocket.common.ab;
import static org.hamcrest.CoreMatchers.containsString;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.test.UnitParser;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestABCase4
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
@Test
@ -45,26 +49,11 @@ public class TestABCase4
expected.flip();
IncomingFramesCapture capture = new IncomingFramesCapture();
Parser parser = new UnitParser(policy,capture);
try (StacklessLogging logging = new StacklessLogging(Parser.class))
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
try
{
parser.parse(expected);
}
catch (ProtocolException ignore)
{
// ignore
}
}
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 11"));
expectedException.expect(ProtocolException.class);
expectedException.expectMessage(containsString("Unknown opcode: 11"));
parser.parse(expected);
}
@Test
@ -77,26 +66,11 @@ public class TestABCase4
expected.flip();
IncomingFramesCapture capture = new IncomingFramesCapture();
Parser parser = new UnitParser(policy, capture);
try (StacklessLogging logging = new StacklessLogging(Parser.class))
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
try
{
parser.parse(expected);
}
catch (ProtocolException ignore)
{
// ignore
}
}
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 12"));
expectedException.expect(ProtocolException.class);
expectedException.expectMessage(containsString("Unknown opcode: 12"));
parser.parse(expected);
}
@Test
@ -109,26 +83,11 @@ public class TestABCase4
expected.flip();
IncomingFramesCapture capture = new IncomingFramesCapture();
Parser parser = new UnitParser(policy, capture);
try (StacklessLogging logging = new StacklessLogging(Parser.class))
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
try
{
parser.parse(expected);
}
catch (ProtocolException ignore)
{
// ignore
}
}
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 3"));
expectedException.expect(ProtocolException.class);
expectedException.expectMessage(containsString("Unknown opcode: 3"));
parser.parse(expected);
}
@Test
@ -141,25 +100,10 @@ public class TestABCase4
expected.flip();
IncomingFramesCapture capture = new IncomingFramesCapture();
Parser parser = new UnitParser(policy,capture);
try (StacklessLogging logging = new StacklessLogging(Parser.class))
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
try
{
parser.parse(expected);
}
catch (ProtocolException ignore)
{
// ignore
}
}
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 4"));
expectedException.expect(ProtocolException.class);
expectedException.expectMessage(containsString("Unknown opcode: 4"));
parser.parse(expected);
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.websocket.common.ab;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import java.nio.ByteBuffer;
@ -39,12 +38,18 @@ import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.test.UnitGenerator;
import org.eclipse.jetty.websocket.common.test.UnitParser;
import org.eclipse.jetty.websocket.common.util.Hex;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestABCase7_3
{
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
@Rule
public ExpectedException expectedException = ExpectedException.none();
private WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
@Test
public void testCase7_3_1GenerateEmptyClose()
@ -73,20 +78,16 @@ public class TestABCase7_3
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.CLOSE,1);
Frame pActual = capture.getFrames().poll();
Assert.assertThat("CloseFrame.payloadLength",pActual.getPayloadLength(),is(0));
}
@Test(expected = ProtocolException.class)
public void testCase7_3_2Generate1BytePayloadClose()
{
@ -101,16 +102,12 @@ public class TestABCase7_3
{
ByteBuffer expected = Hex.asByteBuffer("880100");
UnitParser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parseQuietly(expected);
UnitParser parser = new UnitParser(policy,capture);
Assert.assertEquals("error on invalid close payload",1,capture.getErrorCount(ProtocolException.class));
ProtocolException known = (ProtocolException)capture.getErrors().poll();
Assert.assertThat("Payload.message",known.getMessage(),containsString("Invalid close frame payload length"));
expectedException.expect(ProtocolException.class);
expectedException.expectMessage(CoreMatchers.containsString("Invalid close frame payload length"));
parser.parse(expected);
}
@Test
@ -140,20 +137,16 @@ public class TestABCase7_3
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.CLOSE,1);
Frame pActual = capture.getFrames().poll();
Assert.assertThat("CloseFrame.payloadLength",pActual.getPayloadLength(),is(2));
}
@Test
public void testCase7_3_4GenerateCloseWithStatusReason()
{
@ -197,17 +190,14 @@ public class TestABCase7_3
expected.put(messageBytes);
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.CLOSE,1);
Frame pActual = capture.getFrames().poll();
Assert.assertThat("CloseFrame.payloadLength",pActual.getPayloadLength(),is(messageBytes.length + 2));
}
@ -266,17 +256,14 @@ public class TestABCase7_3
expected.put(messageBytes);
expected.flip();
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
Parser parser = new UnitParser(policy,capture);
parser.parse(expected);
capture.assertNoErrors();
capture.assertHasFrame(OpCode.CLOSE,1);
Frame pActual = capture.getFrames().poll();
Assert.assertThat("CloseFrame.payloadLength",pActual.getPayloadLength(),is(125));
}
@Test(expected = ProtocolException.class)
@ -335,15 +322,11 @@ public class TestABCase7_3
expected.flip();
UnitParser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parseQuietly(expected);
UnitParser parser = new UnitParser(policy,capture);
Assert.assertEquals("error on invalid close payload",1,capture.getErrorCount(ProtocolException.class));
ProtocolException known = (ProtocolException)capture.getErrors().poll();
Assert.assertThat("Payload.message",known.getMessage(),containsString("Invalid control frame payload length"));
expectedException.expect(ProtocolException.class);
expectedException.expectMessage(CoreMatchers.containsString("Invalid control frame payload length"));
parser.parse(expected);
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.extensions;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -43,7 +44,7 @@ public class DummyIncomingFrames implements IncomingFrames
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
LOG.debug("incomingFrame({})",frame);
}

View File

@ -21,7 +21,7 @@ package org.eclipse.jetty.websocket.common.extensions;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.junit.rules.TestName;
@ -45,12 +45,12 @@ public class DummyOutgoingFrames implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
LOG.debug("outgoingFrame({},{})",frame,callback);
if (callback != null)
{
callback.writeSuccess();
callback.succeed();
}
}

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
@ -58,7 +59,8 @@ public class ExtensionTool
Class<?> extClass = factory.getExtension(extConfig.getName());
Assert.assertThat("extClass", extClass, notNullValue());
this.parser = new UnitParser(policy);
this.capture = new IncomingFramesCapture();
this.parser = new UnitParser(policy,frame -> ext.incomingFrame(frame, new FrameCallbackAdapter()));
}
public String getRequestedExtParams()
@ -68,13 +70,10 @@ public class ExtensionTool
public void assertNegotiated(String expectedNegotiation)
{
this.ext = (Extension)factory.newInstance(extConfig);
this.capture = new IncomingFramesCapture();
this.ext = factory.newInstance(extConfig);
this.ext.setNextIncomingFrames(capture);
this.parser.configureFromExtensions(Collections.singletonList(ext));
this.parser.setIncomingFramesHandler(ext);
}
public void parseIncomingHex(String... rawhex)

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension;
@ -77,7 +78,7 @@ public class FragmentExtensionTest
for (String q : quote)
{
Frame frame = new TextFrame().setPayload(q);
ext.incomingFrame(frame);
ext.incomingFrame(frame, new FrameCallbackAdapter());
}
int len = quote.size();
@ -121,7 +122,7 @@ public class FragmentExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.incomingFrame(ping);
ext.incomingFrame(ping, new FrameCallbackAdapter());
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING, 1);

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension;
@ -52,7 +53,7 @@ public class IdentityExtensionTest
ext.setNextIncomingFrames(capture);
Frame frame = new TextFrame().setPayload("hello");
ext.incomingFrame(frame);
ext.incomingFrame(frame, new FrameCallbackAdapter());
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.TEXT, 1);

View File

@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.util.Hex;
@ -32,13 +32,13 @@ public class CapturedHexPayloads implements OutgoingFrames
private List<String> captured = new ArrayList<>();
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
String hexPayload = Hex.asHex(frame.getPayload());
captured.add(hexPayload);
if (callback != null)
{
callback.writeSuccess();
callback.succeed();
}
}

View File

@ -39,12 +39,13 @@ import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
@ -86,10 +87,8 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
// Wire up stack
ext.setNextIncomingFrames(capture);
Parser parser = new UnitParser(policy);
Parser parser = new UnitParser(policy, frame -> ext.incomingFrame(frame, new FrameCallbackAdapter()));
parser.configureFromExtensions(Collections.singletonList(ext));
parser.setIncomingFramesHandler(ext);
parser.parse(ByteBuffer.wrap(raw));
int len = expectedTextDatas.length;
@ -410,11 +409,11 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
clientExtension.setNextOutgoingFrames(new OutgoingFrames()
{
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
LOG.debug("outgoingFrame({})", frame);
serverExtension.incomingFrame(frame);
callback.writeSuccess();
serverExtension.incomingFrame(frame, callback);
callback.succeed();
}
});
@ -422,7 +421,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
serverExtension.setNextIncomingFrames(new IncomingFrames()
{
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
LOG.debug("incomingFrame({})", frame);
try

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtensionTest;
@ -245,7 +246,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.incomingFrame(ping);
ext.incomingFrame(ping, new FrameCallbackAdapter());
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING, 1);
@ -291,7 +292,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
{
TextFrame frame = new TextFrame().setPayload(q);
frame.setRsv1(false); // indication to extension that frame is not compressed (ie: a normal frame)
ext.incomingFrame(frame);
ext.incomingFrame(frame, new FrameCallbackAdapter());
}
int len = quote.size();

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
@ -41,6 +42,8 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
@ -108,7 +111,7 @@ public class CommonEndpointFunctionsTest
{
// Trigger Events
endpointFunctions.onOpen(session);
endpointFunctions.onText(BufferUtil.toBuffer("Hello?", UTF8), true);
endpointFunctions.onText(new TextFrame().setPayload("Hello?").setFin(true), new FrameCallback.Adapter());
endpointFunctions.onClose(new CloseInfo(StatusCode.NORMAL, "Normal"));
}
@ -143,7 +146,7 @@ public class CommonEndpointFunctionsTest
{
// Trigger Events
endpointFunctions.onOpen(session);
endpointFunctions.onText(BufferUtil.toBuffer("Hello Text", UTF8), true);
endpointFunctions.onText(new TextFrame().setPayload("Hello Text").setFin(true), new FrameCallback.Adapter());
endpointFunctions.onClose(new CloseInfo(StatusCode.NORMAL, "Normal"));
}
@ -188,7 +191,7 @@ public class CommonEndpointFunctionsTest
{
// Trigger Events
endpointFunctions.onOpen(session);
endpointFunctions.onText(BufferUtil.toBuffer("Hello Text Stream", UTF8), true);
endpointFunctions.onText(new TextFrame().setPayload("Hello Text Stream").setFin(true), new FrameCallback.Adapter());
endpointFunctions.onClose(new CloseInfo(StatusCode.NORMAL, "Normal"));
}
@ -209,10 +212,10 @@ public class CommonEndpointFunctionsTest
{
// Trigger Events
endpointFunctions.onOpen(session);
endpointFunctions.onText(BufferUtil.toBuffer("Hel"), false);
endpointFunctions.onText(BufferUtil.toBuffer("lo "), false);
endpointFunctions.onText(BufferUtil.toBuffer("Wor"), false);
endpointFunctions.onText(BufferUtil.toBuffer("ld"), true);
endpointFunctions.onText(new TextFrame().setPayload("Hel").setFin(false), new FrameCallback.Adapter());
endpointFunctions.onText(new ContinuationFrame().setPayload("lo ").setFin(false), new FrameCallback.Adapter());
endpointFunctions.onText(new ContinuationFrame().setPayload("Wor").setFin(false), new FrameCallback.Adapter());
endpointFunctions.onText(new ContinuationFrame().setPayload("ld").setFin(true), new FrameCallback.Adapter());
endpointFunctions.onClose(new CloseInfo(StatusCode.NORMAL, "Normal"));
}
@ -248,9 +251,9 @@ public class CommonEndpointFunctionsTest
{
// Trigger Events
endpointFunctions.onOpen(session);
endpointFunctions.onText(BufferUtil.toBuffer("Hello"), false);
endpointFunctions.onText(BufferUtil.toBuffer(" "), false);
endpointFunctions.onText(BufferUtil.toBuffer("World"), true);
endpointFunctions.onText(new TextFrame().setPayload("Hello").setFin(false), new FrameCallback.Adapter());
endpointFunctions.onText(new ContinuationFrame().setPayload(" ").setFin(false), new FrameCallback.Adapter());
endpointFunctions.onText(new ContinuationFrame().setPayload("World").setFin(true), new FrameCallback.Adapter());
endpointFunctions.onClose(new CloseInfo(StatusCode.NORMAL, "Normal"));
}

View File

@ -26,12 +26,11 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.LogicalConnection;
@ -39,14 +38,13 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
import org.junit.rules.TestName;
public class LocalWebSocketConnection implements LogicalConnection, IncomingFrames, ConnectionStateListener
public class LocalWebSocketConnection implements LogicalConnection, ConnectionStateListener
{
private static final Logger LOG = Log.getLogger(LocalWebSocketConnection.class);
private final String id;
private final ByteBufferPool bufferPool;
private final Executor executor;
private WebSocketPolicy policy;
private IncomingFrames incoming;
private IOState ioState = new IOState();
public LocalWebSocketConnection(ByteBufferPool bufferPool)
@ -127,11 +125,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
return 0;
}
public IncomingFrames getIncoming()
{
return incoming;
}
@Override
public IOState getIOState()
{
@ -162,18 +155,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
return null;
}
@Override
public void incomingError(Throwable e)
{
incoming.incomingError(e);
}
@Override
public void incomingFrame(Frame frame)
{
incoming.incomingFrame(frame);
}
@Override
public boolean isOpen()
{
@ -217,7 +198,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
}
@ -231,12 +212,6 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
{
}
@Override
public void setNextIncomingFrames(IncomingFrames incoming)
{
this.incoming = incoming;
}
public void setPolicy(WebSocketPolicy policy)
{
this.policy = policy;

View File

@ -27,7 +27,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule;
import org.junit.Assert;
import org.junit.Rule;
@ -48,9 +52,10 @@ public class MessageInputStreamTest
try (MessageInputStream stream = new MessageInputStream())
{
// Append a single message (simple, short)
ByteBuffer payload = BufferUtil.toBuffer("Hello World",StandardCharsets.UTF_8);
boolean fin = true;
stream.accept(payload,fin);
TextFrame frame = new TextFrame();
frame.setPayload("Hello World");
frame.setFin(true);
stream.accept(frame, new FrameCallback.Adapter());
// Read entire message it from the stream.
byte buf[] = new byte[32];
@ -80,14 +85,12 @@ public class MessageInputStreamTest
try
{
startLatch.countDown();
boolean fin = false;
TimeUnit.MILLISECONDS.sleep(200);
stream.accept(BufferUtil.toBuffer("Saved",StandardCharsets.UTF_8),fin);
stream.accept(new BinaryFrame().setPayload("Saved").setFin(false), new FrameCallback.Adapter());
TimeUnit.MILLISECONDS.sleep(200);
stream.accept(BufferUtil.toBuffer(" by ",StandardCharsets.UTF_8),fin);
fin = true;
stream.accept(new ContinuationFrame().setPayload(" by ").setFin(false), new FrameCallback.Adapter());
TimeUnit.MILLISECONDS.sleep(200);
stream.accept(BufferUtil.toBuffer("Zero",StandardCharsets.UTF_8),fin);
stream.accept(new ContinuationFrame().setPayload("Zero").setFin(true), new FrameCallback.Adapter());
}
catch (InterruptedException e)
{
@ -125,10 +128,9 @@ public class MessageInputStreamTest
{
try
{
boolean fin = true;
// wait for a little bit before populating buffers
TimeUnit.MILLISECONDS.sleep(400);
stream.accept(BufferUtil.toBuffer("I will conquer",StandardCharsets.UTF_8),fin);
stream.accept(new BinaryFrame().setPayload("I will conquer").setFin(true), new FrameCallback.Adapter());
}
catch (InterruptedException e)
{
@ -189,13 +191,14 @@ public class MessageInputStreamTest
try (MessageInputStream stream = new MessageInputStream())
{
// Append parts of message
ByteBuffer msg1 = BufferUtil.toBuffer("Hello ",StandardCharsets.UTF_8);
ByteBuffer msg2 = ByteBuffer.allocate(0); // what is being tested
ByteBuffer msg3 = BufferUtil.toBuffer("World",StandardCharsets.UTF_8);
WebSocketFrame msg1 = new BinaryFrame().setPayload("Hello ").setFin(false);
// what is being tested (an empty payload)
WebSocketFrame msg2 = new ContinuationFrame().setPayload(new byte[0]).setFin(false);
WebSocketFrame msg3 = new ContinuationFrame().setPayload("World").setFin(true);
stream.accept(msg1,false);
stream.accept(msg2,false);
stream.accept(msg3,true);
stream.accept(msg1, new FrameCallback.Adapter());
stream.accept(msg2, new FrameCallback.Adapter());
stream.accept(msg3, new FrameCallback.Adapter());
// Read entire message it from the stream.
byte buf[] = new byte[32];
@ -213,13 +216,15 @@ public class MessageInputStreamTest
try (MessageInputStream stream = new MessageInputStream())
{
// Append parts of message
ByteBuffer msg1 = BufferUtil.toBuffer("Hello ",StandardCharsets.UTF_8);
ByteBuffer msg2 = null; // what is being tested
ByteBuffer msg3 = BufferUtil.toBuffer("World",StandardCharsets.UTF_8);
WebSocketFrame msg1 = new BinaryFrame().setPayload("Hello ").setFin(false);
// what is being tested (a null payload)
ByteBuffer nilPayload = null;
WebSocketFrame msg2 = new ContinuationFrame().setPayload(nilPayload).setFin(false);
WebSocketFrame msg3 = new ContinuationFrame().setPayload("World").setFin(true);
stream.accept(msg1,false);
stream.accept(msg2,false);
stream.accept(msg3,true);
stream.accept(msg1, new FrameCallback.Adapter());
stream.accept(msg2, new FrameCallback.Adapter());
stream.accept(msg3, new FrameCallback.Adapter());
// Read entire message it from the stream.
byte buf[] = new byte[32];

View File

@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import java.net.URI;
import java.util.Arrays;
import org.eclipse.jetty.toolchain.test.TestTracker;
@ -31,7 +32,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.FramePipes;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession;
import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule;
@ -57,7 +58,7 @@ public class MessageOutputStreamTest
private WebSocketPolicy policy;
private TrackingSocket remoteSocket;
private LocalWebSocketSession session;
private WebSocketSession session;
private WebSocketSession remoteSession;
@After
@ -81,14 +82,18 @@ public class MessageOutputStreamTest
// remote socket
remoteSocket = new TrackingSocket("remote");
remoteSession = new LocalWebSocketSession(containerScope,testname,remoteSocket);
URI remoteURI = new URI("ws://localhost/remote");
LocalWebSocketConnection remoteConnection = new LocalWebSocketConnection(bufferPool);
remoteSession = new WebSocketSession(containerScope,remoteURI,remoteSocket,remoteConnection);
OutgoingFrames socketPipe = FramePipes.to(remoteSession);
remoteSession.start();
remoteSession.open();
// Local Session
TrackingSocket localSocket = new TrackingSocket("local");
session = new LocalWebSocketSession(containerScope,testname,localSocket);
URI localURI = new URI("ws://localhost/local");
LocalWebSocketConnection localConnection = new LocalWebSocketConnection(bufferPool);
session = new WebSocketSession(containerScope,localURI,localSocket,localConnection);
// talk to our remote socket
session.setOutgoingHandler(socketPipe);
@ -98,7 +103,7 @@ public class MessageOutputStreamTest
session.open();
}
@Test
@Test(timeout = 2000)
public void testMultipleWrites() throws Exception
{
try (MessageOutputStream stream = new MessageOutputStream(session))
@ -113,7 +118,7 @@ public class MessageOutputStreamTest
Assert.assertThat("Message",msg,allOf(containsString("byte[11]"),containsString("Hello World")));
}
@Test
@Test(timeout = 2000)
public void testSingleWrite() throws Exception
{
try (MessageOutputStream stream = new MessageOutputStream(session))
@ -126,7 +131,7 @@ public class MessageOutputStreamTest
Assert.assertThat("Message",msg,allOf(containsString("byte[11]"),containsString("Hello World")));
}
@Test
@Test(timeout = 2000)
public void testWriteMultipleBuffers() throws Exception
{
int bufsize = (int)(policy.getMaxBinaryMessageBufferSize() * 2.5);

View File

@ -101,7 +101,7 @@ public class MessageWriterTest
session.open();
}
@Test
@Test(timeout = 2000)
public void testMultipleWrites() throws Exception
{
try (MessageWriter stream = new MessageWriter(session))
@ -116,7 +116,7 @@ public class MessageWriterTest
Assert.assertThat("Message",msg,is("Hello World"));
}
@Test
@Test(timeout = 20000)
public void testSingleWrite() throws Exception
{
try (MessageWriter stream = new MessageWriter(session))
@ -129,7 +129,7 @@ public class MessageWriterTest
Assert.assertThat("Message",msg,is("Hello World"));
}
@Test
@Test(timeout = 2000)
public void testWriteMultipleBuffers() throws Exception
{
int bufsize = (int)(policy.getMaxTextMessageBufferSize() * 2.5);

View File

@ -54,8 +54,8 @@ import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -88,7 +88,7 @@ import org.junit.Assert;
* with regards to basic IO behavior, a write should work as expected, a read should work as expected, but <u>what</u> byte it sends or reads is not within its
* scope.
*/
public class BlockheadClient implements OutgoingFrames, ConnectionStateListener, AutoCloseable, IBlockheadClient
public class BlockheadClient implements OutgoingFrames, ConnectionStateListener, AutoCloseable, IBlockheadClient, Parser.Handler
{
private class FrameReadingThread extends Thread implements Runnable, IncomingFrames
{
@ -167,7 +167,7 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
}
@Override
public synchronized void incomingFrame(Frame frame)
public synchronized void incomingFrame(Frame frame, FrameCallback callback)
{
this.frames.add(WebSocketFrame.copy(frame));
}
@ -231,7 +231,7 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
// This is a blockhead client, no point tracking leaks on this object.
this.bufferPool = new MappedByteBufferPool(8192);
this.generator = new Generator(policy,bufferPool);
this.parser = new Parser(policy,bufferPool);
this.parser = new Parser(policy,bufferPool,this);
this.extensionFactory = new WebSocketExtensionFactory(new SimpleContainerScope(policy,bufferPool));
this.ioState = new IOState();
@ -435,7 +435,6 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
}
// configure parser
parser.setIncomingFramesHandler(extensionStack);
ioState.onOpened();
LOG.debug("outgoing = {}",outgoing);
@ -591,7 +590,7 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
ByteBuffer headerBuf = generator.generateHeaderBytes(frame);
if (LOG.isDebugEnabled())
@ -605,14 +604,14 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
out.flush();
if (callback != null)
{
callback.writeSuccess();
callback.succeed();
}
}
catch (IOException e)
{
if (callback != null)
{
callback.writeFailed(e);
callback.fail(e);
}
}
finally
@ -626,6 +625,12 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
}
}
@Override
public void onFrame(Frame frame)
{
// TODO
}
public EventQueue<WebSocketFrame> readFrames(int expectedFrameCount, int timeoutDuration, TimeUnit timeoutUnit) throws Exception
{
frameReader.frames.awaitEventCount(expectedFrameCount,timeoutDuration,timeoutUnit);

View File

@ -18,7 +18,8 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import java.io.BufferedReader;
import java.io.IOException;
@ -46,15 +47,16 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.Frame.Type;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.api.extensions.Frame.Type;
import org.eclipse.jetty.websocket.common.AcceptHash;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
@ -88,6 +90,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
private Map<String, String> extraResponseHeaders = new HashMap<>();
private OutgoingFrames outgoing = this;
private ExtensionStack extensionStack;
public BlockheadServerConnection(Socket socket)
{
@ -98,7 +101,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
this.policy.setMaxTextMessageSize(100000);
// This is a blockhead server connection, no point tracking leaks on this object.
this.bufferPool = new MappedByteBufferPool(BUFFER_SIZE);
this.parser = new Parser(policy,bufferPool);
this.parser = new Parser(policy,bufferPool,frame -> extensionStack.incomingFrame(frame, new FrameCallbackAdapter()));
this.parseCount = new AtomicInteger(0);
this.generator = new Generator(policy,bufferPool,false);
this.extensionRegistry = new WebSocketExtensionFactory(new SimpleContainerScope(policy,bufferPool));
@ -214,7 +217,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
LOG.debug("incoming({})",frame);
int count = parseCount.incrementAndGet();
@ -222,7 +225,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
{
LOG.info("Server parsed {} frames",count);
}
incomingFrames.incomingFrame(WebSocketFrame.copy(frame));
incomingFrames.incomingFrame(WebSocketFrame.copy(frame), callback);
if (frame.getOpCode() == OpCode.CLOSE)
{
@ -245,7 +248,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
ByteBuffer headerBuf = generator.generateHeaderBytes(frame);
if (LOG.isDebugEnabled())
@ -261,7 +264,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
out.flush();
if (callback != null)
{
callback.writeSuccess();
callback.succeed();
}
if (frame.getOpCode() == OpCode.CLOSE)
@ -273,7 +276,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
{
if (callback != null)
{
callback.writeFailed(t);
callback.fail(t);
}
}
}
@ -514,7 +517,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
Assert.assertThat("Request: Sec-WebSocket-Key",key,notNullValue());
// collect extensions configured in response header
ExtensionStack extensionStack = new ExtensionStack(extensionRegistry);
extensionStack = new ExtensionStack(extensionRegistry);
extensionStack.negotiate(extensionConfigs);
// Start with default routing
@ -535,9 +538,6 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
throw new IOException("Unable to start Extension Stack");
}
// Configure Parser
parser.setIncomingFramesHandler(extensionStack);
// Setup Response
StringBuilder resp = new StringBuilder();
resp.append("HTTP/1.1 101 Upgrade\r\n");

View File

@ -25,11 +25,10 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.io.IOState;
@ -134,9 +133,9 @@ public class DummyConnection implements LogicalConnection
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
callback.writeSuccess();
callback.succeed();
}
@Override
@ -149,13 +148,6 @@ public class DummyConnection implements LogicalConnection
{
}
@Override
public void setNextIncomingFrames(IncomingFrames incoming)
{
if (LOG.isDebugEnabled())
LOG.debug("setNextIncomingFrames({})", incoming);
}
@Override
public SuspendToken suspend()
{

View File

@ -27,19 +27,22 @@ import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
public class IncomingFramesCapture implements IncomingFrames
public class IncomingFramesCapture implements Parser.Handler, IncomingFrames
{
private static final Logger LOG = Log.getLogger(IncomingFramesCapture.class);
private EventQueue<WebSocketFrame> frames = new EventQueue<>();
private EventQueue<Throwable> errors = new EventQueue<>();
@Deprecated
public void assertErrorCount(int expectedCount)
{
Assert.assertThat("Captured error count",errors.size(),is(expectedCount));
@ -63,6 +66,7 @@ public class IncomingFramesCapture implements IncomingFrames
Assert.assertThat("Captured frame count",frames.size(),is(expectedCount));
}
@Deprecated
public void assertHasErrors(Class<? extends WebSocketException> errorType, int expectedCount)
{
Assert.assertThat(errorType.getSimpleName(),getErrorCount(errorType),is(expectedCount));
@ -84,6 +88,7 @@ public class IncomingFramesCapture implements IncomingFrames
Assert.assertThat("Frame count",frames.size(),is(0));
}
@Deprecated
public void assertNoErrors()
{
Assert.assertThat("Error count",errors.size(),is(0));
@ -105,6 +110,7 @@ public class IncomingFramesCapture implements IncomingFrames
}
}
@Deprecated
public int getErrorCount(Class<? extends Throwable> errorType)
{
int count = 0;
@ -118,6 +124,7 @@ public class IncomingFramesCapture implements IncomingFrames
return count;
}
@Deprecated
public Queue<Throwable> getErrors()
{
return errors;
@ -142,18 +149,21 @@ public class IncomingFramesCapture implements IncomingFrames
}
@Override
public void incomingError(Throwable e)
public void incomingError(Throwable t)
{
LOG.debug(e);
errors.add(e);
errors.add(t);
}
@Override
public void incomingFrame(Frame frame)
public void incomingFrame(Frame frame, FrameCallback callback)
{
onFrame(frame);
}
@Override
public void onFrame(Frame frame)
{
WebSocketFrame copy = WebSocketFrame.copy(frame);
// TODO: might need to make this optional (depending on use by client vs server tests)
// Assert.assertThat("frame.masking must be set",frame.isMasked(),is(true));
frames.add(copy);
}

View File

@ -18,21 +18,21 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
public class OutgoingFramesCapture implements OutgoingFrames
{
private LinkedList<WebSocketFrame> frames = new LinkedList<>();
@ -87,7 +87,7 @@ public class OutgoingFramesCapture implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
frames.add(WebSocketFrame.copy(frame));
// Consume bytes
@ -96,7 +96,7 @@ public class OutgoingFramesCapture implements OutgoingFrames
// notify callback
if (callback != null)
{
callback.writeSuccess();
callback.succeed();
}
}
}

View File

@ -29,7 +29,7 @@ import java.util.Locale;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.Generator;
@ -63,7 +63,7 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
{
ByteBuffer buf = ByteBuffer.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
generator.generateWholeFrame(frame,buf);
@ -71,7 +71,7 @@ public class OutgoingNetworkBytesCapture implements OutgoingFrames
captured.add(buf);
if (callback != null)
{
callback.writeSuccess();
callback.succeed();
}
}
}

View File

@ -28,14 +28,9 @@ import org.eclipse.jetty.websocket.common.Parser;
public class UnitParser extends Parser
{
public UnitParser()
public UnitParser(WebSocketPolicy policy, Parser.Handler handler)
{
this(WebSocketPolicy.newServerPolicy());
}
public UnitParser(WebSocketPolicy policy)
{
super(policy,new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
super(policy,new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()),handler);
}
private void parsePartial(ByteBuffer buf, int numBytes)
@ -52,6 +47,7 @@ public class UnitParser extends Parser
* Use if you know the parse will cause an exception and just don't wnat to make the test console all noisy.
* @param buf the buffer to parse
*/
@Deprecated
public void parseQuietly(ByteBuffer buf)
{
try (StacklessLogging suppress = new StacklessLogging(Parser.class))

View File

@ -6,4 +6,4 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.io.payload.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.extensions.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.message.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.function.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.function.LEVEL=DEBUG

View File

@ -26,14 +26,14 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
public class WebSocketServerConnection extends AbstractWebSocketConnection implements Connection.UpgradeTo
{
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack)
{
super(endp,executor,scheduler,policy,bufferPool);
super(endp,executor,scheduler,policy,bufferPool,extensionStack);
}
@Override
@ -47,10 +47,4 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection imple
{
return getEndPoint().getRemoteAddress();
}
@Override
public void setNextIncomingFrames(IncomingFrames incoming)
{
getParser().setIncomingFramesHandler(incoming);
}
}

View File

@ -560,7 +560,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
ByteBufferPool bufferPool = connector.getByteBufferPool();
// Setup websocket connection
AbstractWebSocketConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, getPolicy().clonePolicy(), bufferPool);
AbstractWebSocketConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, getPolicy().clonePolicy(), bufferPool, extensionStack);
extensionStack.setPolicy(wsConnection.getPolicy());
extensionStack.configure(wsConnection.getParser());
@ -581,7 +581,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
wsConnection.addListener(session);
// Setup Incoming Routing
wsConnection.setNextIncomingFrames(extensionStack);
extensionStack.setNextIncoming(session);
// Setup Outgoing Routing