WebSocket core autobahn and MessageHandler refactor (#3802)

* reworked WebSocket autobahn test code and the core MessageHandler

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* PR #3802 - changes from review

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan 2019-06-26 13:47:46 +10:00 committed by GitHub
parent 390033edfc
commit cc4304a0f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 351 additions and 1373 deletions

View File

@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
@ -343,6 +344,8 @@ public class MessageReceivingTest
public static class ServerMessageNegotiator extends CoreServer.BaseNegotiator
{
private static final int MAX_MESSAGE_SIZE = (1024 * 1024) + 2;
public ServerMessageNegotiator()
{
super();
@ -363,7 +366,6 @@ public class MessageReceivingTest
{
negotiation.setSubprotocol("partial-binary");
SendPartialBinaryFrameHandler frameHandler = new SendPartialBinaryFrameHandler();
frameHandler.setMaxBinaryMessageSize((1024 * 1024) + 2);
return frameHandler;
}
@ -371,12 +373,18 @@ public class MessageReceivingTest
{
negotiation.setSubprotocol("echo");
EchoWholeMessageFrameHandler frameHandler = new EchoWholeMessageFrameHandler();
frameHandler.setMaxTextMessageSize((1024 * 1024) + 2);
return frameHandler;
}
return null;
}
@Override
public void customize(FrameHandler.Configuration configurable)
{
configurable.setMaxBinaryMessageSize(MAX_MESSAGE_SIZE);
configurable.setMaxTextMessageSize(MAX_MESSAGE_SIZE);
}
}
public static class TestEndpoint extends Endpoint

View File

@ -81,11 +81,11 @@ public class LargeAnnotatedTest
client.start();
FrameHandlerTracker clientSocket = new FrameHandlerTracker();
clientSocket.setMaxTextMessageSize(128 * 1024);
Future<FrameHandler.CoreSession> clientConnectFuture = client.connect(clientSocket, uri.resolve("/app/echo/large"));
// wait for connect
FrameHandler.CoreSession coreSession = clientConnectFuture.get(1, TimeUnit.SECONDS);
coreSession.setMaxTextMessageSize(128 * 1024);
try
{

View File

@ -103,11 +103,11 @@ public class LargeContainerTest
client.start();
FrameHandlerTracker clientSocket = new FrameHandlerTracker();
clientSocket.setMaxTextMessageSize(128 * 1024);
Future<FrameHandler.CoreSession> clientConnectFuture = client.connect(clientSocket, uri.resolve("/app/echo/large"));
// wait for connect
FrameHandler.CoreSession coreSession = clientConnectFuture.get(5, TimeUnit.SECONDS);
coreSession.setMaxTextMessageSize(128 * 1024);
try
{
// The message size should be bigger than default, but smaller than the limit that LargeEchoSocket specifies

View File

@ -4,9 +4,7 @@
},
"url": "ws://127.0.0.1:9001",
"outdir": "./target/reports/clients",
"cases": [
"*"
],
"cases": ["*"],
"exclude-cases": [],
"exclude-agent-cases": {}
}

View File

@ -324,7 +324,7 @@ public interface FrameHandler extends IncomingFrames
*/
void demand(long n);
class Empty implements CoreSession
class Empty extends ConfigurationCustomizer implements CoreSession
{
@Override
public String getNegotiatedSubProtocol()
@ -397,28 +397,6 @@ public interface FrameHandler extends IncomingFrames
return false;
}
@Override
public Duration getIdleTimeout()
{
return Duration.ZERO;
}
@Override
public Duration getWriteTimeout()
{
return Duration.ZERO;
}
@Override
public void setIdleTimeout(Duration timeout)
{
}
@Override
public void setWriteTimeout(Duration timeout)
{
}
@Override
public void flush(Callback callback)
{
@ -439,76 +417,10 @@ public interface FrameHandler extends IncomingFrames
{
}
@Override
public boolean isAutoFragment()
{
return false;
}
@Override
public void setAutoFragment(boolean autoFragment)
{
}
@Override
public long getMaxFrameSize()
{
return 0;
}
@Override
public void setMaxFrameSize(long maxFrameSize)
{
}
@Override
public int getOutputBufferSize()
{
return 0;
}
@Override
public void setOutputBufferSize(int outputBufferSize)
{
}
@Override
public int getInputBufferSize()
{
return 0;
}
@Override
public void setInputBufferSize(int inputBufferSize)
{
}
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
}
@Override
public long getMaxBinaryMessageSize()
{
return 0;
}
@Override
public void setMaxBinaryMessageSize(long maxSize)
{
}
@Override
public long getMaxTextMessageSize()
{
return 0;
}
@Override
public void setMaxTextMessageSize(long maxSize)
{
}
}
}

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.websocket.core;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
@ -39,7 +39,6 @@ import org.eclipse.jetty.util.log.Logger;
*/
public class MessageHandler implements FrameHandler
{
public static MessageHandler from(Consumer<String> onText, Consumer<ByteBuffer> onBinary)
{
return new MessageHandler()
@ -86,56 +85,38 @@ public class MessageHandler implements FrameHandler
};
}
private static final Logger LOG = Log.getLogger(MessageHandler.class);
private final int factor;
protected static final Logger LOG = Log.getLogger(MessageHandler.class);
private CoreSession coreSession;
private Utf8StringBuilder utf8StringBuilder = null;
private ByteBuffer binaryMessage = null;
private Utf8StringBuilder textMessageBuffer;
private ByteArrayOutputStream binaryMessageBuffer;
private byte dataType = OpCode.UNDEFINED;
private int maxTextMessageSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
private int maxBinaryMessageSize = WebSocketConstants.DEFAULT_MAX_BINARY_MESSAGE_SIZE;
public MessageHandler()
{
this(3);
}
public MessageHandler(int factor)
{
this.factor = factor;
}
public int getMaxTextMessageSize()
{
return maxTextMessageSize;
}
public void setMaxTextMessageSize(int maxTextMessageSize)
{
this.maxTextMessageSize = maxTextMessageSize;
}
public int getMaxBinaryMessageSize()
{
return maxBinaryMessageSize;
}
public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
{
this.maxBinaryMessageSize = maxBinaryMessageSize;
}
public CoreSession getCoreSession()
{
return coreSession;
}
private Utf8StringBuilder getTextMessageBuffer()
{
if (textMessageBuffer == null)
textMessageBuffer = new Utf8StringBuilder();
return textMessageBuffer;
}
private ByteArrayOutputStream getBinaryMessageBuffer()
{
if (binaryMessageBuffer == null)
binaryMessageBuffer = new ByteArrayOutputStream();
return binaryMessageBuffer;
}
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("onOpen {}", coreSession);
this.coreSession = coreSession;
callback.succeeded();
}
@ -143,90 +124,33 @@ public class MessageHandler implements FrameHandler
@Override
public void onFrame(Frame frame, Callback callback)
{
try
if (LOG.isDebugEnabled())
LOG.debug("onFrame {}", frame);
switch (frame.getOpCode())
{
byte opcode = frame.getOpCode();
if (LOG.isDebugEnabled())
LOG.debug("{}: {}", OpCode.name(opcode), BufferUtil.toDetailString(frame.getPayload()));
switch (opcode)
{
case OpCode.PING:
case OpCode.PONG:
case OpCode.CLOSE:
if (isDemanding())
getCoreSession().demand(1);
callback.succeeded();
break;
case OpCode.BINARY:
if (frame.isFin())
{
final int maxSize = getMaxBinaryMessageSize();
if (frame.hasPayload() && frame.getPayload().remaining() > maxSize)
throw new MessageTooLargeException("Message larger than " + maxSize + " bytes");
onBinary(frame.getPayload(), callback); //bypass buffer aggregation
}
else
{
dataType = OpCode.BINARY;
binaryMessage = getCoreSession().getByteBufferPool().acquire(frame.getPayloadLength() * factor, false);
onBinaryFrame(frame, callback);
}
break;
case OpCode.TEXT:
dataType = OpCode.TEXT;
if (utf8StringBuilder == null)
{
final int maxSize = getMaxTextMessageSize();
utf8StringBuilder = (maxSize < 0) ? new Utf8StringBuilder() : new Utf8StringBuilder()
{
@Override
protected void appendByte(byte b) throws IOException
{
// TODO can we avoid byte by byte length check?
if (length() >= maxSize)
throw new MessageTooLargeException("Message larger than " + maxSize + " characters");
super.appendByte(b);
}
};
}
onTextFrame(frame, callback);
break;
case OpCode.CONTINUATION:
switch (dataType)
{
case OpCode.BINARY:
onBinaryFrame(frame, callback);
break;
case OpCode.TEXT:
onTextFrame(frame, callback);
break;
default:
throw new IllegalStateException();
}
break;
default:
throw new IllegalStateException();
}
}
catch (Utf8Appendable.NotUtf8Exception bple)
{
utf8StringBuilder.reset();
callback.failed(new BadPayloadException(bple));
}
catch (Throwable th)
{
if (utf8StringBuilder != null)
utf8StringBuilder.reset();
callback.failed(th);
case OpCode.CLOSE:
onCloseFrame(frame, callback);
break;
case OpCode.PING:
onPingFrame(frame, callback);
break;
case OpCode.PONG:
onPongFrame(frame, callback);
break;
case OpCode.TEXT:
dataType = OpCode.TEXT;
onTextFrame(frame, callback);
break;
case OpCode.BINARY:
dataType = OpCode.BINARY;
onBinaryFrame(frame, callback);
break;
case OpCode.CONTINUATION:
onContinuationFrame(frame, callback);
if (frame.isFin())
dataType = OpCode.UNDEFINED;
break;
}
}
@ -234,7 +158,8 @@ public class MessageHandler implements FrameHandler
public void onError(Throwable cause, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug(this + " onError ", cause);
LOG.debug("onError ", cause);
callback.succeeded();
}
@ -242,98 +167,128 @@ public class MessageHandler implements FrameHandler
public void onClosed(CloseStatus closeStatus, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onClosed {}", this, closeStatus);
if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal())
LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length());
LOG.debug("onClosed {}", closeStatus);
if (binaryMessage != null)
if (textMessageBuffer != null)
{
if (BufferUtil.hasContent(binaryMessage))
LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining());
getCoreSession().getByteBufferPool().release(binaryMessage);
binaryMessage = null;
textMessageBuffer.reset();
textMessageBuffer = null;
}
if (utf8StringBuilder != null)
if (binaryMessageBuffer != null)
{
utf8StringBuilder.reset();
utf8StringBuilder = null;
binaryMessageBuffer.reset();
binaryMessageBuffer = null;
}
coreSession = null;
callback.succeeded();
}
private void onTextFrame(Frame frame, Callback callback)
protected void onTextFrame(Frame frame, Callback callback)
{
if (frame.hasPayload())
utf8StringBuilder.append(frame.getPayload());
if (frame.isFin())
try
{
dataType = OpCode.UNDEFINED;
Utf8StringBuilder textBuffer = getTextMessageBuffer();
String message = utf8StringBuilder.toString();
utf8StringBuilder.reset();
onText(message, callback);
if (frame.hasPayload())
{
long maxSize = coreSession.getMaxTextMessageSize();
long currentSize = frame.getPayload().remaining() + textBuffer.length();
if (currentSize > maxSize)
throw new MessageTooLargeException("Message larger than " + maxSize + " bytes");
textBuffer.append(frame.getPayload());
}
if (frame.isFin())
{
onText(textBuffer.toString(), callback);
textBuffer.reset();
}
else
{
callback.succeeded();
}
}
else
catch (Utf8Appendable.NotUtf8Exception e)
{
if (isDemanding())
getCoreSession().demand(1);
callback.succeeded();
callback.failed(new BadPayloadException(e));
}
catch (Throwable t)
{
callback.failed(t);
}
}
protected void onBinaryFrame(Frame frame, Callback callback)
{
try
{
ByteArrayOutputStream binaryBuffer = getBinaryMessageBuffer();
if (frame.hasPayload())
{
long maxSize = coreSession.getMaxBinaryMessageSize();
long currentSize = frame.getPayload().remaining() + binaryBuffer.size();
if (currentSize > maxSize)
throw new MessageTooLargeException("Message larger than " + maxSize + " bytes");
BufferUtil.writeTo(frame.getPayload(), binaryBuffer);
}
if (frame.isFin())
{
onBinary(BufferUtil.toBuffer(binaryBuffer.toByteArray()), callback);
binaryBuffer.reset();
}
else
{
callback.succeeded();
}
}
catch (Throwable t)
{
callback.failed(t);
}
}
private void onBinaryFrame(Frame frame, Callback callback)
protected void onContinuationFrame(Frame frame, Callback callback)
{
if (frame.hasPayload())
switch (dataType)
{
if (BufferUtil.space(binaryMessage) < frame.getPayloadLength())
binaryMessage = BufferUtil
.ensureCapacity(binaryMessage, binaryMessage.capacity() + Math.max(binaryMessage.capacity(), frame.getPayloadLength() * factor));
case OpCode.BINARY:
onBinaryFrame(frame, callback);
break;
BufferUtil.append(binaryMessage, frame.getPayload());
case OpCode.TEXT:
onTextFrame(frame, callback);
break;
default:
throw new IllegalStateException();
}
}
final int maxSize = getMaxBinaryMessageSize();
if (binaryMessage.remaining() > maxSize)
{
getCoreSession().getByteBufferPool().release(binaryMessage);
binaryMessage = null;
throw new MessageTooLargeException("Message larger than " + maxSize + " bytes");
}
protected void onPingFrame(Frame frame, Callback callback)
{
coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), callback, false);
}
if (frame.isFin())
{
dataType = OpCode.UNDEFINED;
protected void onPongFrame(Frame frame, Callback callback)
{
callback.succeeded();
}
final ByteBuffer completeMessage = binaryMessage;
binaryMessage = null;
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
getCoreSession().getByteBufferPool().release(completeMessage);
}
};
onBinary(completeMessage, callback);
}
else
{
if (isDemanding())
getCoreSession().demand(1);
callback.succeeded();
}
protected void onCloseFrame(Frame frame, Callback callback)
{
callback.succeeded();
}
/**
* Method called when a complete text message is received.
*
* @param message the received text payload
* @param message the received text payload
* @param callback The callback to signal completion of handling.
*/
protected void onText(String message, Callback callback)
@ -344,7 +299,7 @@ public class MessageHandler implements FrameHandler
/**
* Method called when a complete binary message is received.
*
* @param message The binary payload
* @param message The binary payload
* @param callback The callback to signal completion of handling.
*/
protected void onBinary(ByteBuffer message, Callback callback)
@ -355,13 +310,12 @@ public class MessageHandler implements FrameHandler
/**
* Send a String as a single text frame.
*
* @param message The message to send
* @param message The message to send
* @param callback The callback to call when the send is complete
* @param batch The batch mode to send the frames in.
* @param batch The batch mode to send the frames in.
*/
public void sendText(String message, Callback callback, boolean batch)
{
// TODO if autofragment is on, enforce max buffer size
getCoreSession().sendFrame(new Frame(OpCode.TEXT, true, message), callback, batch);
}
@ -371,8 +325,8 @@ public class MessageHandler implements FrameHandler
* single fragment need be converted to bytes
*
* @param callback The callback to call when the send is complete
* @param batch The batch mode to send the frames in.
* @param parts The parts of the message.
* @param batch The batch mode to send the frames in.
* @param parts The parts of the message.
*/
public void sendText(Callback callback, boolean batch, final String... parts)
{
@ -400,8 +354,8 @@ public class MessageHandler implements FrameHandler
String part = parts[i++];
getCoreSession().sendFrame(new Frame(
i == 1 ? OpCode.TEXT : OpCode.CONTINUATION,
i == parts.length, part), this, batch);
i == 1 ? OpCode.TEXT : OpCode.CONTINUATION,
i == parts.length, part), this, batch);
return Action.SCHEDULED;
}
}.iterate();
@ -410,9 +364,9 @@ public class MessageHandler implements FrameHandler
/**
* Send a ByteBuffer as a single binary frame.
*
* @param message The message to send
* @param message The message to send
* @param callback The callback to call when the send is complete
* @param batch The batch mode to send the frames in.
* @param batch The batch mode to send the frames in.
*/
public void sendBinary(ByteBuffer message, Callback callback, boolean batch)
{
@ -423,8 +377,8 @@ public class MessageHandler implements FrameHandler
* Send a sequence of ByteBuffers as a sequences for fragmented text frame.
*
* @param callback The callback to call when the send is complete
* @param batch The batch mode to send the frames in.
* @param parts The parts of the message.
* @param batch The batch mode to send the frames in.
* @param parts The parts of the message.
*/
public void sendBinary(Callback callback, boolean batch, final ByteBuffer... parts)
{
@ -452,8 +406,8 @@ public class MessageHandler implements FrameHandler
ByteBuffer part = parts[i++];
getCoreSession().sendFrame(new Frame(
i == 1 ? OpCode.BINARY : OpCode.CONTINUATION,
i == parts.length, part), this, batch);
i == 1 ? OpCode.BINARY : OpCode.CONTINUATION,
i == parts.length, part), this, batch);
return Action.SCHEDULED;
}
}.iterate();

View File

@ -1,349 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import static org.eclipse.jetty.websocket.core.OpCode.PONG;
/**
* Base level implementation of local WebSocket Endpoint Frame handling.
* <p>
* This implementation assumes RFC6455 behavior with HTTP/1.1.
* NOTE: The introduction of WebSocket over HTTP/2 might change the behavior and implementation some.
* </p>
*/
public class AbstractTestFrameHandler implements SynchronousFrameHandler
{
private Logger LOG = Log.getLogger(AbstractTestFrameHandler.class);
private byte partial = OpCode.UNDEFINED;
private Utf8StringBuilder utf8;
private ByteBuffer byteBuffer;
private FrameHandler.CoreSession coreSession;
public FrameHandler.CoreSession getCoreSession()
{
return coreSession;
}
@Override
public void onOpen(CoreSession coreSession)
{
this.coreSession = coreSession;
onOpen();
}
public void onOpen()
{
}
@Override
public void onFrame(Frame frame, Callback callback)
{
byte opcode = frame.getOpCode();
if (LOG.isDebugEnabled())
LOG.debug("{}: {}", OpCode.name(opcode), BufferUtil.toDetailString(frame.getPayload()));
switch (opcode)
{
case OpCode.PING:
onPingFrame(frame, callback);
break;
case OpCode.PONG:
onPongFrame(frame, callback);
break;
case OpCode.TEXT:
onTextFrame(frame, callback);
break;
case OpCode.BINARY:
onBinaryFrame(frame, callback);
break;
case OpCode.CONTINUATION:
onContinuationFrame(frame, callback);
break;
case OpCode.CLOSE:
onCloseFrame(frame, callback);
break;
}
}
@Override
public void onError(Throwable cause)
{
}
/**
* Notification method for when a Ping frame is received.
* The default implementation sends a Pong frame using the passed callback for completion
*
* @param frame The received frame
* @param callback The callback to indicate completion of frame handling.
*/
protected void onPingFrame(Frame frame, Callback callback)
{
ByteBuffer pongBuf;
if (frame.hasPayload())
{
pongBuf = ByteBuffer.allocate(frame.getPayload().remaining());
BufferUtil.put(frame.getPayload().slice(), pongBuf);
BufferUtil.flipToFlush(pongBuf, 0);
}
else
{
pongBuf = ByteBuffer.allocate(0);
}
try
{
coreSession.sendFrame(new Frame(PONG).setPayload(pongBuf), callback, false);
}
catch (Throwable t)
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to send pong", t);
callback.failed(t);
}
}
/**
* Notification method for when a Pong frame is received.
* The default implementation just succeeds the callback
*
* @param frame The received frame
* @param callback The callback to indicate completion of frame handling.
*/
protected void onPongFrame(Frame frame, Callback callback)
{
callback.succeeded();
}
/**
* Notification method for when a Text frame is received.
* The default implementation accumulates the payload in a Utf8StringBuilder
* and calls the {@link #onText(Utf8StringBuilder, Callback, boolean)} method.
* For partial textMessages (fin == false), the {@link #onText(Utf8StringBuilder, Callback, boolean)}
* may either leave the contents in the Utf8StringBuilder to accumulate with following Continuation
* frames, or it may be consumed.
*
* @param frame The received frame
* @param callback The callback to indicate completion of frame handling.
* @see #onText(Utf8StringBuilder, Callback, boolean)
*/
protected void onTextFrame(Frame frame, Callback callback)
{
if (utf8 == null)
utf8 = new Utf8StringBuilder(Math.max(1024, frame.getPayloadLength() * 2));
else
utf8.reset();
if (frame.hasPayload())
utf8.append(frame
.getPayload()); // TODO: this should trigger a bad UTF8 exception if sequence is bad which we wrap in a ProtocolException (but not on unfinished sequences)
if (frame.isFin())
utf8.checkState(); // TODO: this should not be necessary, checkState() shouldn't be necessary to use (the utf8.toString() should trigger on bad utf8 in final octets)
else
partial = OpCode.TEXT;
onText(utf8, callback, frame.isFin());
}
/**
* Notification method for when UTF8 text is received. This method is
* called by {@link #onTextFrame(Frame, Callback)} and
* {@link #onContinuationFrame(Frame, Callback)}. Implementations
* may consume partial content with {@link Utf8StringBuilder#takePartialString()}
* or leave it to accumulate over multiple calls.
* The default implementation just succeeds the callback.
*
* @param utf8 The received text
* @param callback The callback to indicate completion of frame handling.
* @param fin True if the current message is completed by this call.
*/
protected void onText(Utf8StringBuilder utf8, Callback callback, boolean fin)
{
callback.succeeded();
}
/**
* Notification method for when a Binary frame is received.
* The default implementation accumulates the payload in a ByteBuffer
* and calls the {@link #onBinary(ByteBuffer, Callback, boolean)} method.
* For partial textMessages (fin == false), the {@link #onBinary(ByteBuffer, Callback, boolean)}
* may either leave the contents in the ByteBuffer to accumulate with following Continuation
* frames, or it may be consumed.
*
* @param frame The received frame
* @param callback The callback to indicate completion of frame handling.
* @see #onBinary(ByteBuffer, Callback, boolean)
*/
protected void onBinaryFrame(Frame frame, Callback callback)
{
if (frame.isFin())
{
onBinary(frame.getPayload(), callback, true);
}
else
{
partial = OpCode.BINARY;
// TODO use the pool?
if (byteBuffer == null)
byteBuffer = BufferUtil.allocate(Math.max(1024, frame.getPayloadLength() * 2));
else
BufferUtil.clear(byteBuffer);
if (frame.hasPayload())
BufferUtil.append(byteBuffer, frame.getPayload());
onBinary(byteBuffer, callback, false);
}
}
/**
* Notification method for when binary data is received. This method is
* called by {@link #onBinaryFrame(Frame, Callback)} and
* {@link #onContinuationFrame(Frame, Callback)}. Implementations
* may consume partial content from the {@link ByteBuffer}
* or leave it to accumulate over multiple calls.
* The default implementation just succeeds the callback.
*
* @param payload The received data
* @param callback The callback to indicate completion of frame handling.
* @param fin True if the current message is completed by this call.
*/
protected void onBinary(ByteBuffer payload, Callback callback, boolean fin)
{
callback.succeeded();
}
/**
* Notification method for when a Continuation frame is received.
* The default implementation will call either {@link #onText(Utf8StringBuilder, Callback, boolean)}
* or {@link #onBinary(ByteBuffer, Callback, boolean)} as appropriate, accumulating
* payload as necessary.
*
* @param frame The received frame
* @param callback The callback to indicate completion of frame handling.
*/
protected void onContinuationFrame(Frame frame, Callback callback)
{
if (partial == OpCode.UNDEFINED)
{
callback.failed(new IllegalStateException());
return;
}
switch (partial)
{
case OpCode.TEXT:
if (frame.hasPayload())
utf8.append(frame.getPayload());
if (frame.isFin())
utf8.checkState();
onText(utf8, callback, frame.isFin());
break;
case OpCode.BINARY:
if (frame.hasPayload())
{
int factor = frame.isFin() ? 1 : 3;
BufferUtil.compact(byteBuffer);
if (BufferUtil.space(byteBuffer) < frame.getPayloadLength())
byteBuffer = BufferUtil
.ensureCapacity(byteBuffer, byteBuffer.capacity() + Math.max(byteBuffer.capacity(), frame.getPayloadLength() * factor));
BufferUtil.append(byteBuffer, frame.getPayload());
}
onBinary(byteBuffer, callback, frame.isFin());
break;
default:
callback.failed(new IllegalStateException());
}
}
/**
* Notification method for when a Close frame is received.
* The default implementation responds with a close frame when necessary.
*
* @param frame The received frame
* @param callback The callback to indicate completion of frame handling.
*/
protected void onCloseFrame(Frame frame, Callback callback)
{
int respond;
String reason = null;
int code = frame.hasPayload() ? new CloseStatus(frame.getPayload()).getCode() : -1;
switch (code)
{
case -1:
respond = CloseStatus.NORMAL;
break;
case CloseStatus.NORMAL:
case CloseStatus.SHUTDOWN:
case CloseStatus.PROTOCOL:
case CloseStatus.BAD_DATA:
case CloseStatus.BAD_PAYLOAD:
case CloseStatus.POLICY_VIOLATION:
case CloseStatus.MESSAGE_TOO_LARGE:
case CloseStatus.EXTENSION_ERROR:
case CloseStatus.SERVER_ERROR:
respond = 0;
break;
default:
if (code >= 3000 && code <= 4999)
{
respond = code;
}
else
{
respond = CloseStatus.PROTOCOL;
reason = "invalid " + code + " close received";
}
break;
}
if (respond > 0)
coreSession.close(respond, reason, callback);
else
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus)
{
}
}

View File

@ -65,7 +65,7 @@ public class GeneratorFrameFlagsTest
{
ExtensionStack exStack = new ExtensionStack(new WebSocketExtensionRegistry(), Behavior.SERVER);
exStack.negotiate(new DecoratedObjectFactory(), bufferPool, new LinkedList<>(), new LinkedList<>());
this.coreSession = new WebSocketCoreSession(new AbstractTestFrameHandler(), Behavior.CLIENT, Negotiated.from(exStack));
this.coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.CLIENT, Negotiated.from(exStack));
}
@ParameterizedTest

View File

@ -56,7 +56,7 @@ public class GeneratorTest
ByteBufferPool bufferPool = new MappedByteBufferPool();
ExtensionStack exStack = new ExtensionStack(new WebSocketExtensionRegistry(), Behavior.SERVER);
exStack.negotiate(new DecoratedObjectFactory(), bufferPool, new LinkedList<>(), new LinkedList<>());
return new WebSocketCoreSession(new AbstractTestFrameHandler(), behavior, Negotiated.from(exStack));
return new WebSocketCoreSession(new TestMessageHandler(), behavior, Negotiated.from(exStack));
}
/**

View File

@ -49,7 +49,6 @@ public class MessageHandlerTest
static byte[] nonUtf8Bytes = {0x7F, (byte)0xFF, (byte)0xFF};
boolean demanding;
int demand;
CoreSession coreSession;
List<String> textMessages = new ArrayList<>();
List<ByteBuffer> binaryMessages = new ArrayList<>();
@ -61,7 +60,6 @@ public class MessageHandlerTest
public void beforeEach() throws Exception
{
demanding = false;
demand = 0;
coreSession = new CoreSession.Empty()
{
@ -79,12 +77,6 @@ public class MessageHandlerTest
{
return byteBufferPool;
}
@Override
public void demand(long n)
{
demand += n;
}
};
handler = new MessageHandler()
@ -127,7 +119,7 @@ public class MessageHandlerTest
assertThat(callback.isDone(), is(true));
assertThat(textMessages.size(), is(0));
assertThat(frames.size(), is(0));
assertThat(frames.size(), is(1));
}
@Test
@ -276,61 +268,12 @@ public class MessageHandlerTest
assertThat(frames.size(), is(0));
}
@Test
public void testDemanding()
{
demanding = true;
FutureCallback callback;
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.PING, true, "test"), callback);
assertThat(callback.isDone(), is(true));
assertThat(demand, is(1));
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, true, "test"), callback);
assertThat(callback.isDone(), is(false));
assertThat(textMessages.size(), is(1));
assertThat(textMessages.get(0), is("test"));
assertThat(callbacks.size(), is(1));
callbacks.get(0).succeeded();
assertThat(demand, is(1));
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, false, "Hello"), callback);
assertThat(callback.isDone(), is(true));
assertThat(textMessages.size(), is(1));
assertThat(callbacks.size(), is(1));
assertThat(demand, is(2));
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.CONTINUATION, false, " "), callback);
assertThat(callback.isDone(), is(true));
assertThat(textMessages.size(), is(1));
assertThat(callbacks.size(), is(1));
assertThat(demand, is(3));
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.CONTINUATION, true, "World"), callback);
assertThat(callback.isDone(), is(false));
assertThat(textMessages.size(), is(2));
assertThat(textMessages.get(1), is("Hello World"));
assertThat(callbacks.size(), is(2));
callbacks.get(1).succeeded();
assertThat(callback.isDone(), is(true));
FutureCallback finalCallback = callback;
assertDoesNotThrow(() -> finalCallback.get());
assertThat(demand, is(3));
assertThat(frames.size(), is(0));
}
@Test
public void testTextNotTooLarge()
{
FutureCallback callback;
handler.setMaxTextMessageSize(4);
coreSession.setMaxTextMessageSize(4);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, true, "test"), callback);
@ -348,7 +291,7 @@ public class MessageHandlerTest
{
FutureCallback callback;
handler.setMaxTextMessageSize(4);
coreSession.setMaxTextMessageSize(4);
handler.onOpen(coreSession, NOOP);
callback = new FutureCallback();
@ -367,7 +310,7 @@ public class MessageHandlerTest
{
FutureCallback callback;
handler.setMaxTextMessageSize(4);
coreSession.setMaxTextMessageSize(4);
handler.onOpen(coreSession, NOOP);
callback = new FutureCallback();
@ -392,28 +335,21 @@ public class MessageHandlerTest
@Test
public void testLargeBytesSmallCharsTooLarge()
{
FutureCallback callback;
coreSession.setMaxTextMessageSize(4);
handler.setMaxTextMessageSize(4);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, false, BufferUtil.toBuffer(fourByteUtf8Bytes)), callback);
assertThat(callback.isDone(), is(true));
FutureCallback callback1 = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, false, BufferUtil.toBuffer(fourByteUtf8Bytes)), callback1);
assertThat(callback1.isDone(), is(true));
assertThat(textMessages.size(), is(0));
assertThat(callbacks.size(), is(0));
FutureCallback finalCallback = callback;
assertDoesNotThrow(() -> finalCallback.get());
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, true, BufferUtil.toBuffer(fourByteUtf8Bytes)), callback);
assertThat(callback.isDone(), is(false));
assertThat(textMessages.size(), is(1));
assertThat(textMessages.get(0), is(fourByteUtf8String + fourByteUtf8String));
assertThat(callbacks.size(), is(1));
callbacks.get(0).succeeded();
assertThat(callback.isDone(), is(true));
FutureCallback finalCallback1 = callback;
assertDoesNotThrow(() -> finalCallback1.get());
assertDoesNotThrow(() -> callback1.get());
FutureCallback callback2 = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, true, BufferUtil.toBuffer(fourByteUtf8Bytes)), callback2);
assertThat(callback2.isDone(), is(true));
assertThat(textMessages.size(), is(0));
assertThat(callbacks.size(), is(0));
ExecutionException e = assertThrows(ExecutionException.class, () -> callback2.get());
assertThat(e.getCause(), instanceOf(MessageTooLargeException.class));
}
@Test
@ -495,61 +431,12 @@ public class MessageHandlerTest
assertThat(frames.size(), is(0));
}
@Test
public void testDemandingBinary()
{
demanding = true;
FutureCallback callback;
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.PING, true, "test"), callback);
assertThat(callback.isDone(), is(true));
assertThat(demand, is(1));
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, true, "test"), callback);
assertThat(callback.isDone(), is(false));
assertThat(binaryMessages.size(), is(1));
assertThat(BufferUtil.toString(binaryMessages.get(0)), is("test"));
assertThat(callbacks.size(), is(1));
callbacks.get(0).succeeded();
assertThat(demand, is(1));
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, false, "Hello"), callback);
assertThat(callback.isDone(), is(true));
assertThat(binaryMessages.size(), is(1));
assertThat(callbacks.size(), is(1));
assertThat(demand, is(2));
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.CONTINUATION, false, " "), callback);
assertThat(callback.isDone(), is(true));
assertThat(binaryMessages.size(), is(1));
assertThat(callbacks.size(), is(1));
assertThat(demand, is(3));
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.CONTINUATION, true, "World"), callback);
assertThat(callback.isDone(), is(false));
assertThat(binaryMessages.size(), is(2));
assertThat(BufferUtil.toString(binaryMessages.get(1)), is("Hello World"));
assertThat(callbacks.size(), is(2));
callbacks.get(1).succeeded();
assertThat(callback.isDone(), is(true));
FutureCallback finalCallback = callback;
assertDoesNotThrow(() -> finalCallback.get());
assertThat(demand, is(3));
assertThat(frames.size(), is(0));
}
@Test
public void testBinaryNotTooLarge()
{
FutureCallback callback;
handler.setMaxBinaryMessageSize(4);
coreSession.setMaxBinaryMessageSize(4);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, true, "test"), callback);
@ -567,7 +454,7 @@ public class MessageHandlerTest
{
FutureCallback callback;
handler.setMaxBinaryMessageSize(4);
coreSession.setMaxBinaryMessageSize(4);
handler.onOpen(coreSession, NOOP);
callback = new FutureCallback();
@ -586,7 +473,7 @@ public class MessageHandlerTest
{
FutureCallback callback;
handler.setMaxBinaryMessageSize(4);
coreSession.setMaxBinaryMessageSize(4);
handler.onOpen(coreSession, NOOP);
callback = new FutureCallback();

View File

@ -59,7 +59,7 @@ public class ParserCapture
ByteBufferPool bufferPool = new MappedByteBufferPool();
ExtensionStack exStack = new ExtensionStack(new WebSocketExtensionRegistry(), Behavior.SERVER);
exStack.negotiate(new DecoratedObjectFactory(), bufferPool, new LinkedList<>(), new LinkedList<>());
this.coreSession = new WebSocketCoreSession(new AbstractTestFrameHandler(), behavior, Negotiated.from(exStack));
this.coreSession = new WebSocketCoreSession(new TestMessageHandler(), behavior, Negotiated.from(exStack));
this.parser = new Parser(bufferPool, coreSession);
}

View File

@ -0,0 +1,94 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class TestMessageHandler extends MessageHandler
{
protected final Logger LOG = Log.getLogger(TestMessageHandler.class);
public CoreSession coreSession;
public BlockingQueue<String> textMessages = new BlockingArrayQueue<>();
public BlockingQueue<ByteBuffer> binaryMessages = new BlockingArrayQueue<>();
public volatile Throwable error;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("onOpen {}", coreSession);
this.coreSession = coreSession;
super.onOpen(coreSession, callback);
openLatch.countDown();
}
@Override
public void onFrame(Frame frame, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("onFrame {}", frame);
super.onFrame(frame, callback);
}
@Override
public void onError(Throwable cause, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("onError {}", cause);
super.onError(cause, callback);
error = cause;
errorLatch.countDown();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("onClosed {}", closeStatus);
super.onClosed(closeStatus, callback);
closeLatch.countDown();
}
@Override
protected void onText(String message, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("onText {}", message);
textMessages.offer(message);
}
@Override
protected void onBinary(ByteBuffer message, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("onBinary {}", message);
binaryMessages.offer(message);
}
}

View File

@ -14,89 +14,38 @@
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
//
package org.eclipse.jetty.websocket.core.autobahn;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.time.Duration;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.AbstractTestFrameHandler;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.TestMessageHandler;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import static org.eclipse.jetty.websocket.core.OpCode.TEXT;
class AutobahnFrameHandler extends AbstractTestFrameHandler
public class AutobahnFrameHandler extends TestMessageHandler
{
private static Logger LOG = Log.getLogger(AutobahnFrameHandler.class);
private AtomicBoolean open = new AtomicBoolean(false);
@Override
public void onOpen()
public void onOpen(CoreSession coreSession, Callback callback)
{
LOG.info("onOpen {}", getCoreSession());
if (!open.compareAndSet(false, true))
throw new IllegalStateException();
}
int count;
@Override
public void onText(Utf8StringBuilder utf8, Callback callback, boolean fin)
{
LOG.debug("onText {} {} {} {}", count++, utf8.length(), fin, getCoreSession());
if (fin)
{
getCoreSession().sendFrame(new Frame(TEXT).setPayload(utf8.toString()),
callback,
false);
}
else
{
callback.succeeded();
}
coreSession.setIdleTimeout(Duration.ofSeconds(5));
coreSession.setMaxTextMessageSize(Integer.MAX_VALUE);
coreSession.setMaxBinaryMessageSize(Integer.MAX_VALUE);
coreSession.setMaxFrameSize(WebSocketConstants.DEFAULT_MAX_FRAME_SIZE * 2);
super.onOpen(coreSession, callback);
}
@Override
public void onBinary(ByteBuffer payload, Callback callback, boolean fin)
public void onBinary(ByteBuffer wholeMessage, Callback callback)
{
LOG.debug("onBinary {} {} {}", payload == null ? -1 : payload.remaining(), fin, getCoreSession());
if (fin)
{
Frame echo = new Frame(OpCode.BINARY);
if (payload != null)
echo.setPayload(payload);
getCoreSession().sendFrame(echo, callback, false);
}
else
{
callback.succeeded();
}
sendBinary(wholeMessage, callback, false);
}
@Override
public void onClosed(CloseStatus closeStatus)
public void onText(String wholeMessage, Callback callback)
{
LOG.info("onClosed {}", closeStatus);
if (!open.compareAndSet(true, false))
LOG.warn("Already closed or not open {}", closeStatus);
}
@Override
public void onError(Throwable cause)
{
if (cause instanceof WebSocketTimeoutException && open.get())
LOG.info("timeout!");
else
LOG.warn("onError", cause);
sendText(wholeMessage, callback, false);
}
}

View File

@ -16,24 +16,26 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.core.autobahn.client;
package org.eclipse.jetty.websocket.core.autobahn;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.UrlEncoded;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.TestMessageHandler;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* WebSocket Client for use with <a href="https://github.com/crossbario/autobahn-testsuite">autobahn websocket testsuite</a> (wstest).
* <p>
@ -70,52 +72,19 @@ import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
*/
public class AutobahnWebSocketClient
{
private static final int MBYTE = 1024 * 1024;
private static String getJettyVersion() throws IOException
{
String resource = "META-INF/maven/org.eclipse.jetty.websocket/websocket-core/pom.properties";
URL url = Thread.currentThread().getContextClassLoader().getResource(resource);
if (url == null)
{
if (AutobahnWebSocketClient.class.getPackage() != null)
{
Package pkg = AutobahnWebSocketClient.class.getPackage();
if (pkg.getImplementationVersion() != null)
{
return pkg.getImplementationVersion();
}
}
return "GitMaster";
}
try (InputStream in = url.openStream())
{
Properties props = new Properties();
props.load(in);
return props.getProperty("version");
}
}
public static void main(String[] args)
{
String hostname = "localhost";
int port = 9001;
if (args.length > 0)
{
hostname = args[0];
}
if (args.length > 1)
{
port = Integer.parseInt(args[1]);
}
int caseNumbers[] = null;
// Optional case numbers
// NOTE: these are url query parameter case numbers (whole integers, eg "6"), not the case ids (eg "7.3.1")
int[] caseNumbers = null;
if (args.length > 2)
{
int offset = 2;
@ -129,82 +98,71 @@ public class AutobahnWebSocketClient
AutobahnWebSocketClient client = null;
try
{
String userAgent = "JettyWebsocketClient/" + getJettyVersion();
String userAgent = "JettyWebsocketClient/" + Jetty.VERSION;
client = new AutobahnWebSocketClient(hostname, port, userAgent);
client.updateStatus("Running test suite...");
client.updateStatus("Using Fuzzing Server: %s:%d", hostname, port);
client.updateStatus("User Agent: %s", userAgent);
LOG.info("Running test suite...");
LOG.info("Using Fuzzing Server: {}:{}", hostname, port);
LOG.info("User Agent: {}", userAgent);
if (caseNumbers == null)
{
int caseCount = client.getCaseCount();
client.updateStatus("Will run all %d cases ...", caseCount);
LOG.info("Will run all {} cases ...", caseCount);
for (int caseNum = 1; caseNum <= caseCount; caseNum++)
{
client.updateStatus("Running case %d (of %d) ...", caseNum, caseCount);
LOG.info("Running case {} (of {}) ...", caseNum, caseCount);
client.runCaseByNumber(caseNum);
}
}
else
{
client.updateStatus("Will run %d cases ...", caseNumbers.length);
LOG.info("Will run %d cases ...", caseNumbers.length);
for (int caseNum : caseNumbers)
{
client.runCaseByNumber(caseNum);
}
}
client.updateStatus("All test cases executed.");
LOG.info("All test cases executed.");
client.updateReports();
}
catch (Throwable t)
{
t.printStackTrace(System.err);
LOG.warn("Test Failed", t);
}
finally
{
if (client != null)
{
client.shutdown();
}
}
System.exit(0);
}
private Logger log;
private static final Logger LOG = Log.getLogger(AutobahnWebSocketClient.class);
private URI baseWebsocketUri;
private WebSocketCoreClient client;
private String hostname;
private int port;
private String userAgent;
public AutobahnWebSocketClient(String hostname, int port, String userAgent) throws Exception
{
this.log = Log.getLogger(this.getClass());
this.hostname = hostname;
this.port = port;
this.userAgent = userAgent;
this.baseWebsocketUri = new URI("ws://" + hostname + ":" + port);
this.client = new WebSocketCoreClient();
// TODO: this should be enabled by default
// this.client.getExtensionFactory().register("permessage-deflate",PerMessageDeflateExtension.class);
this.client.start();
}
public int getCaseCount() throws IOException, InterruptedException, ExecutionException, TimeoutException
public int getCaseCount() throws IOException, InterruptedException
{
URI wsUri = baseWebsocketUri.resolve("/getCaseCount");
GetCaseCountHandler onCaseCount = new GetCaseCountHandler();
TestMessageHandler onCaseCount = new TestMessageHandler();
Future<FrameHandler.CoreSession> response = client.connect(onCaseCount, wsUri);
if (waitForUpgrade(wsUri, response))
{
onCaseCount.awaitMessage();
if (onCaseCount.hasCaseCount())
{
return onCaseCount.getCaseCount();
}
String msg = onCaseCount.textMessages.poll(10, TimeUnit.SECONDS);
onCaseCount.getCoreSession().abort(); // Don't expect normal close
assertTrue(onCaseCount.closeLatch.await(2, TimeUnit.SECONDS));
assertNotNull(msg);
return Integer.decode(msg);
}
throw new IllegalStateException("Unable to get Case Count");
}
@ -212,14 +170,18 @@ public class AutobahnWebSocketClient
public void runCaseByNumber(int caseNumber) throws IOException, InterruptedException
{
URI wsUri = baseWebsocketUri.resolve("/runCase?case=" + caseNumber + "&agent=" + UrlEncoded.encodeString(userAgent));
log.info("test uri: {}", wsUri);
EchoHandler onEchoMessage = new EchoHandler(caseNumber);
Future<FrameHandler.CoreSession> response = client.connect(onEchoMessage, wsUri);
LOG.info("test uri: {}", wsUri);
AutobahnFrameHandler echoHandler = new AutobahnFrameHandler();
Future<FrameHandler.CoreSession> response = client.connect(echoHandler, wsUri);
if (waitForUpgrade(wsUri, response))
{
onEchoMessage.awaitClose();
// Wait up to 5 min as some of the tests can take a while
if (!echoHandler.closeLatch.await(5, TimeUnit.MINUTES))
{
LOG.warn("could not close {}, aborting session", echoHandler);
echoHandler.coreSession.abort();
}
}
}
@ -227,26 +189,23 @@ public class AutobahnWebSocketClient
{
try
{
this.client.stop();
client.stop();
}
catch (Exception e)
{
log.warn("Unable to stop WebSocketClient", e);
LOG.warn("Unable to stop WebSocketClient", e);
}
}
public void updateReports() throws IOException, InterruptedException, ExecutionException, TimeoutException
{
URI wsUri = baseWebsocketUri.resolve("/updateReports?agent=" + UrlEncoded.encodeString(userAgent));
UpdateReportsHandler onUpdateReports = new UpdateReportsHandler();
TestMessageHandler onUpdateReports = new TestMessageHandler();
Future<FrameHandler.CoreSession> response = client.connect(onUpdateReports, wsUri);
response.get(5, TimeUnit.SECONDS);
onUpdateReports.awaitClose();
}
public void updateStatus(String format, Object... args)
{
log.info(String.format(format, args));
assertTrue(onUpdateReports.closeLatch.await(15, TimeUnit.SECONDS));
LOG.info("Reports updated.");
LOG.info("Test suite finished!");
}
private boolean waitForUpgrade(URI wsUri, Future<FrameHandler.CoreSession> response) throws InterruptedException
@ -256,14 +215,9 @@ public class AutobahnWebSocketClient
response.get(10, TimeUnit.SECONDS);
return true;
}
catch (ExecutionException e)
catch (Throwable t)
{
log.warn("Unable to connect to: " + wsUri, e);
return false;
}
catch (TimeoutException e)
{
log.warn("Unable to connect to: " + wsUri, e);
LOG.warn("Unable to connect to: " + wsUri, t);
return false;
}
}

View File

@ -1,110 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.autobahn;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
class AutobahnWebSocketNegotiator implements WebSocketNegotiator
{
final DecoratedObjectFactory objectFactory;
final WebSocketExtensionRegistry extensionRegistry;
final ByteBufferPool bufferPool;
public AutobahnWebSocketNegotiator(DecoratedObjectFactory objectFactory, WebSocketExtensionRegistry extensionRegistry, ByteBufferPool bufferPool)
{
this.objectFactory = objectFactory;
this.extensionRegistry = extensionRegistry;
this.bufferPool = bufferPool;
}
@Override
public FrameHandler negotiate(Negotiation negotiation) throws IOException
{
// Finalize negotiations in API layer involves:
// TODO need access to real request/response????
// + MAY read request and set response headers
// + MAY reject with sendError semantics
// + MAY change/add/remove offered extensions
// + MUST pick subprotocol
// + MUST return the FrameHandler or null or exception?
// Examples of those steps are below:
// + MAY read request and set response headers
String special = negotiation.getRequest().getHeader("MySpecialHeader");
if (special != null)
negotiation.getResponse().setHeader("MySpecialHeader", "OK:" + special);
// + MAY reject with sendError semantics
if ("abort".equals(special))
{
negotiation.getResponse().sendError(401, "Some Auth reason");
return null;
}
// + MAY change extensions by mutating response headers
List<ExtensionConfig> offeredExtensions = negotiation.getOfferedExtensions();
// negotiateResponse.addHeader(HttpHeader.SEC_WEBSOCKET_EXTENSIONS.asString(),"@identity");
// + MUST pick subprotocol
List<String> subprotocols = negotiation.getOfferedSubprotocols();
String subprotocol = (subprotocols == null || subprotocols.isEmpty()) ? null : subprotocols.get(0);
negotiation.setSubprotocol(subprotocol);
// + MUST return the FrameHandler or null or exception?
return new AutobahnFrameHandler();
}
@Override
public void customize(FrameHandler.Configuration configurable)
{
configurable.setIdleTimeout(Duration.ofMillis(10000));
configurable.setMaxTextMessageSize(Integer.MAX_VALUE);
configurable.setMaxBinaryMessageSize(Integer.MAX_VALUE);
configurable.setMaxFrameSize(65536 * 2);
}
@Override
public WebSocketExtensionRegistry getExtensionRegistry()
{
return extensionRegistry;
}
@Override
public DecoratedObjectFactory getObjectFactory()
{
return objectFactory;
}
@Override
public ByteBufferPool getByteBufferPool()
{
return bufferPool;
}
}

View File

@ -18,14 +18,9 @@
package org.eclipse.jetty.websocket.core.autobahn;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.websocket.core.TestUpgradeHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
/**
@ -68,25 +63,16 @@ public class AutobahnWebSocketServer
{
int port = 9001; // same port as found in fuzzing-client.json
if (args != null && args.length > 0)
{
port = Integer.parseInt(args[0]);
}
Server server = new Server(port);
ServerConnector connector = new ServerConnector(
server,
new HttpConnectionFactory()
);
ServerConnector connector = new ServerConnector(server);
connector.setIdleTimeout(10000);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
server.setHandler(context);
WebSocketNegotiator negotiator =
new AutobahnWebSocketNegotiator(new DecoratedObjectFactory(), new WebSocketExtensionRegistry(), connector.getByteBufferPool());
WebSocketUpgradeHandler handler = new TestUpgradeHandler(negotiator);
WebSocketUpgradeHandler handler = new WebSocketUpgradeHandler((neg) -> new AutobahnFrameHandler());
context.setHandler(handler);
server.start();

View File

@ -1,96 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.autobahn.client;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.AbstractTestFrameHandler;
import org.eclipse.jetty.websocket.core.CloseStatus;
public abstract class AbstractClientFrameHandler extends AbstractTestFrameHandler
{
protected final Logger LOG;
public AbstractClientFrameHandler()
{
LOG = Log.getLogger(this.getClass());
}
@Override
public void onOpen()
{
LOG.info("onOpen({})", getCoreSession());
}
@Override
public void onClosed(CloseStatus closeStatus)
{
LOG.debug("onClosed({})", closeStatus);
}
@Override
public void onText(Utf8StringBuilder utf8, Callback callback, boolean fin)
{
LOG.debug("onText len={} fin={}", utf8.length(), fin);
if (fin)
onWholeText(utf8.toString());
callback.succeeded();
}
protected void onWholeText(String message)
{
}
@Override
public void onBinary(ByteBuffer payload, Callback callback, boolean fin)
{
if (fin)
onWholeBinary(payload);
callback.succeeded();
}
protected void onWholeBinary(ByteBuffer payload)
{
}
/**
* Make a copy of a byte buffer.
* <p>
* This is important in some tests, as the underlying byte buffer contained in a Frame can be modified through
* masking and make it difficult to compare the results in the fuzzer.
*
* @param payload the payload to copy
* @return a new byte array of the payload contents
*/
@SuppressWarnings("Duplicates")
public ByteBuffer copyOf(ByteBuffer payload)
{
if (payload == null)
return null;
ByteBuffer copy = ByteBuffer.allocate(payload.remaining());
copy.put(payload.slice());
copy.flip();
return copy;
}
}

View File

@ -1,105 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.autobahn.client;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
public class EchoHandler extends AbstractClientFrameHandler
{
private final int currentCaseId;
private CountDownLatch latch = new CountDownLatch(1);
public EchoHandler(int currentCaseId)
{
this.currentCaseId = currentCaseId;
}
public void awaitClose() throws InterruptedException
{
latch.await(5, TimeUnit.SECONDS);
}
@Override
public void onOpen()
{
super.onOpen();
LOG.info("Executing test case {}", currentCaseId);
}
int count;
@Override
public void onText(Utf8StringBuilder utf8, Callback callback, boolean fin)
{
LOG.debug("onText {} {} {} {}", count++, utf8.length(), fin, getCoreSession());
if (fin)
{
Frame echo = new Frame(OpCode.TEXT).setPayload(utf8.toString());
LOG.debug("onText echo {}", echo);
getCoreSession().sendFrame(echo, callback, false);
}
else
{
callback.succeeded();
}
}
@Override
public void onBinary(ByteBuffer payload, Callback callback, boolean fin)
{
LOG.debug("onBinary {} {} {}", payload == null ? -1 : payload.remaining(), fin, getCoreSession());
if (fin)
{
Frame echo = new Frame(OpCode.BINARY);
if (payload != null)
echo.setPayload(payload);
getCoreSession().sendFrame(echo, callback, false);
}
else
{
callback.succeeded();
}
}
@Override
public void onError(Throwable cause)
{
if (cause instanceof WebSocketTimeoutException)
LOG.debug("timeout!");
else
LOG.warn("onError", cause);
}
@Override
public void onClosed(CloseStatus closeStatus)
{
LOG.info("onClosed {}", closeStatus);
super.onClosed(closeStatus);
latch.countDown();
}
}

View File

@ -1,54 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.autobahn.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Get the autobahn case count
*/
public class GetCaseCountHandler extends AbstractClientFrameHandler
{
private Integer casecount = null;
private CountDownLatch latch = new CountDownLatch(1);
public void awaitMessage() throws InterruptedException
{
latch.await(1, TimeUnit.SECONDS);
}
public int getCaseCount()
{
return casecount.intValue();
}
public boolean hasCaseCount()
{
return (casecount != null);
}
@Override
protected void onWholeText(String message)
{
casecount = Integer.decode(message);
latch.countDown();
LOG.info("Running {} Autobahn testcase", casecount);
}
}

View File

@ -1,51 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core.autobahn.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.core.CloseStatus;
public class UpdateReportsHandler extends AbstractClientFrameHandler
{
private CountDownLatch latch = new CountDownLatch(1);
public void awaitClose() throws InterruptedException
{
latch.await(15, TimeUnit.SECONDS);
}
@Override
public void onOpen()
{
super.onOpen();
LOG.info("Updating reports ...");
}
@Override
public void onClosed(CloseStatus closeStatus)
{
super.onClosed(closeStatus);
LOG.debug("onClose({})", closeStatus);
LOG.info("Reports updated.");
LOG.info("Test suite finished!");
latch.countDown();
}
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -71,7 +72,7 @@ public class ChatWebSocketServer
public void onOpen(CoreSession coreSession, Callback callback)
{
LOG.debug("onOpen {}", coreSession);
setMaxTextMessageSize(2 * 1024);
coreSession.setMaxTextMessageSize(2 * 1024);
super.onOpen(coreSession, Callback.from(() ->
{
members.add(this);
@ -123,7 +124,7 @@ public class ChatWebSocketServer
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
throws IOException, ServletException
{
response.setStatus(200);
response.setContentType("text/plain");

View File

@ -39,7 +39,6 @@ import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.AbstractTestFrameHandler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CapturedHexPayloads;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
@ -47,6 +46,7 @@ import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingNetworkBytesCapture;
import org.eclipse.jetty.websocket.core.TestMessageHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Generator;
@ -421,7 +421,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
ExtensionStack exStack = new ExtensionStack(new WebSocketExtensionRegistry(), Behavior.SERVER);
exStack.negotiate(new DecoratedObjectFactory(), bufferPool, new LinkedList<>(), new LinkedList<>());
WebSocketCoreSession coreSession = new WebSocketCoreSession(new AbstractTestFrameHandler(), Behavior.SERVER, Negotiated.from(exStack));
WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack));
coreSession.setMaxFrameSize(maxMessageSize);
return coreSession;
}

View File

@ -28,13 +28,13 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.core.AbstractTestFrameHandler;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.TestMessageHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
@ -157,7 +157,7 @@ public class ExtensionTool
ByteBufferPool bufferPool = new MappedByteBufferPool();
ExtensionStack exStack = new ExtensionStack(new WebSocketExtensionRegistry(), Behavior.SERVER);
exStack.negotiate(new DecoratedObjectFactory(), bufferPool, new LinkedList<>(), new LinkedList<>());
WebSocketCoreSession coreSession = new WebSocketCoreSession(new AbstractTestFrameHandler(), Behavior.SERVER, Negotiated.from(exStack));
WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack));
return coreSession;
}
}