Issue #7351 - large WebSocket payloads with permessage-deflate hang (#7360)

PerMessageDeflateExtension and FragmentExtensions now intercept demand for incoming frames. These extensions may fragment a single frame into many frames, so they must wait until a new frame has been demanded before forwarding the next synthetic frame to the application.
This commit is contained in:
Lachlan 2022-01-19 10:47:45 +11:00 committed by GitHub
parent 113cafafef
commit 2b41e4d231
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 941 additions and 140 deletions

View File

@ -21,14 +21,16 @@ import org.eclipse.jetty.util.Callback;
public interface IncomingFrames
{
/**
* Process the incoming frame.
* <p>
* Note: if you need to hang onto any information from the frame, be sure
* to copy it, as the information contained in the Frame will be released
* and/or reused by the implementation.
* <p>Process the incoming frame.</p>
*
* @param frame the frame to process
* @param callback the read completion
* <p>Note: if you need to hang onto any information from the frame, be sure
* to copy it, as the information contained in the Frame will be released
* and/or reused by the implementation.</p>
*
* <p>Failure of the callback will propagate the failure back to the {@link CoreSession}
* to fail the connection and attempt to send a close {@link Frame} if one has not been sent.</p>
* @param frame the frame to process.
* @param callback the read completion.
*/
void onFrame(Frame frame, Callback callback);
}

View File

@ -0,0 +1,32 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core.internal;
import java.util.function.LongConsumer;
import org.eclipse.jetty.websocket.core.Extension;
/**
* This is extended by an {@link Extension} so it can intercept demand calls.
* Demand is called by the application and the call is forwarded through the {@link ExtensionStack}
* for every {@link Extension} which implements this interface.
*/
public interface DemandChain
{
void demand(long n);
default void setNextDemand(LongConsumer nextDemand)
{
}
}

View File

@ -0,0 +1,181 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core.internal;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFrames;
/**
* <p>This flusher can be used to mutated and fragment {@link Frame}s and forwarded them on towards the application using the
* {@link IncomingFrames} provided in the constructor. This can split a single incoming frame into n {@link Frame}s which are
* passed on to the {@link IncomingFrames} one at a time.</p>
*
* <p>The asynchronous operation performed by this {@link IteratingCallback} is demanding from upper layer after which
* {@link #onFrame(Frame, Callback)} will called with the new content.</p>
*
* <p>This flusher relies on the interception of demand, and because of this it can only be used in an {@link Extension} which
* implements the {@link DemandChain} interface. The methods of {@link DemandChain} from the {@link Extension}
* must be forwarded to this flusher.</p>
*/
public abstract class DemandingFlusher extends IteratingCallback implements DemandChain
{
private final IncomingFrames _emitFrame;
private final AtomicLong _demand = new AtomicLong();
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
private LongConsumer _nextDemand;
private Frame _frame;
private Callback _callback;
private boolean _needContent = true;
private boolean _first = true;
/**
* @param emitFrame where frames generated by {@link #handle(Frame, Callback, boolean)} are forwarded.
*/
public DemandingFlusher(IncomingFrames emitFrame)
{
_emitFrame = emitFrame;
}
/**
* <p>Called when there is demand for a single frame to be produced. During this method a single call can be made
* to {@link #emitFrame(Frame, Callback)} which will forward this frame towards the application. Returning true
* from this method signals that you are done processing the current Frame, and the next invocation of this
* method will have the next frame.</p>
*
* <p>Note that the callback supplied here is specially wrapped so that you can call
* it multiple times and it will not be completed more than once. This simplifies the
* handling of failure cases.</p>
* @param frame the original frame.
* @param callback to succeed to release the frame payload.
* @param first if this is the first time this method has been called for this frame.
* @return false to continue processing this frame, true to complete processing and get a new frame.
*/
protected abstract boolean handle(Frame frame, Callback callback, boolean first);
@Override
public void demand(long n)
{
_demand.getAndUpdate(d -> Math.addExact(d, n));
iterate();
}
@Override
public void setNextDemand(LongConsumer nextDemand)
{
_nextDemand = nextDemand;
}
/**
* Used to supply the flusher with a new frame. This frame should only arrive if demanded
* through the {@link LongConsumer} provided by {@link #setNextDemand(LongConsumer)}.
* @param frame the WebSocket frame.
* @param callback to release frame payload.
*/
public void onFrame(Frame frame, Callback callback)
{
if (_frame != null || _callback != null)
throw new IllegalStateException("Not expecting onFrame");
_frame = frame;
_callback = new CountingCallback(callback, 1);
succeeded();
}
/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
*/
public void failFlusher(Throwable t)
{
if (_failure.compareAndSet(null, t))
{
failed(t);
iterate();
}
}
/**
* <p>This is used within an implementation of {@link #handle(Frame, Callback, boolean)}
* to forward a frame onto the next layer of processing.</p>
*
* <p>This method should only be called ONCE within each invocation of {@link #handle(Frame, Callback, boolean)}
* otherwise</p>
* @param frame the WebSocket frame.
* @param callback to release frame payload.
*/
public void emitFrame(Frame frame, Callback callback)
{
if (_demand.decrementAndGet() < 0)
throw new IllegalStateException("Negative Demand");
_emitFrame.onFrame(frame, callback);
}
@Override
protected Action process() throws Throwable
{
while (true)
{
Throwable failure = _failure.get();
if (failure != null)
throw failure;
if (_demand.get() <= 0)
break;
if (_needContent)
{
_needContent = false;
_nextDemand.accept(1);
return Action.SCHEDULED;
}
boolean first = _first;
_first = false;
boolean needContent = handle(_frame, _callback, first);
if (needContent)
{
_needContent = true;
_first = true;
_frame = null;
_callback = null;
}
}
return Action.IDLE;
}
@Override
protected void onCompleteFailure(Throwable cause)
{
Throwable suppressed = _failure.getAndSet(cause);
if (suppressed != null && suppressed != cause)
cause.addSuppressed(suppressed);
// This is wrapped with CountingCallback so protects against double succeed/failed.
if (_callback != null)
_callback.failed(cause);
_frame = null;
_callback = null;
}
}

View File

@ -18,6 +18,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import org.eclipse.jetty.http.BadMessageException;
@ -50,6 +51,8 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
private IncomingFrames incoming;
private OutgoingFrames outgoing;
private final Extension[] rsvClaims = new Extension[3];
private LongConsumer lastDemand;
private DemandChain demandChain = n -> lastDemand.accept(n);
public ExtensionStack(WebSocketComponents components, Behavior behavior)
{
@ -199,7 +202,7 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
rsvClaims[2] = ext;
}
// Wire up Extensions
// Wire up Extensions and DemandChain.
if ((extensions != null) && (extensions.size() > 0))
{
ListIterator<Extension> exts = extensions.listIterator();
@ -210,6 +213,13 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
Extension ext = exts.next();
ext.setNextOutgoingFrames(outgoing);
outgoing = ext;
if (ext instanceof DemandChain)
{
DemandChain demandingExtension = (DemandChain)ext;
demandingExtension.setNextDemand(demandChain::demand);
demandChain = demandingExtension;
}
}
// Connect incomingFrames
@ -253,6 +263,16 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
}
}
public void demand(long n)
{
demandChain.demand(n);
}
public void setLastDemand(LongConsumer lastDemand)
{
this.lastDemand = lastDemand;
}
public Extension getRsv1User()
{
return rsvClaims[0];

View File

@ -13,11 +13,15 @@
package org.eclipse.jetty.websocket.core.internal;
import java.nio.ByteBuffer;
import java.util.function.LongConsumer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.AbstractExtension;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,16 +29,17 @@ import org.slf4j.LoggerFactory;
/**
* Fragment Extension
*/
public class FragmentExtension extends AbstractExtension
public class FragmentExtension extends AbstractExtension implements DemandChain
{
private static final Logger LOG = LoggerFactory.getLogger(FragmentExtension.class);
private final FragmentingFlusher flusher;
private final FragmentingFlusher outgoingFlusher;
private final DemandingFlusher incomingFlusher;
private final Configuration configuration = new Configuration.ConfigurationCustomizer();
public FragmentExtension()
{
flusher = new FragmentingFlusher(configuration)
outgoingFlusher = new FragmentingFlusher(configuration)
{
@Override
void forwardFrame(Frame frame, Callback callback, boolean batch)
@ -42,6 +47,20 @@ public class FragmentExtension extends AbstractExtension
nextOutgoingFrame(frame, callback, batch);
}
};
incomingFlusher = new FragmentingDemandingFlusher();
}
@Override
public void demand(long n)
{
incomingFlusher.demand(n);
}
@Override
public void setNextDemand(LongConsumer nextDemand)
{
incomingFlusher.setNextDemand(nextDemand);
}
@Override
@ -53,13 +72,13 @@ public class FragmentExtension extends AbstractExtension
@Override
public void onFrame(Frame frame, Callback callback)
{
nextIncomingFrame(frame, callback);
incomingFlusher.onFrame(frame, callback);
}
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
flusher.sendFrame(frame, callback, batch);
outgoingFlusher.sendFrame(frame, callback, batch);
}
@Override
@ -69,4 +88,68 @@ public class FragmentExtension extends AbstractExtension
int maxLength = config.getParameter("maxLength", -1);
configuration.setMaxFrameSize(maxLength);
}
public class FragmentingDemandingFlusher extends DemandingFlusher
{
public FragmentingDemandingFlusher()
{
super(FragmentExtension.this::nextIncomingFrame);
}
@Override
protected boolean handle(Frame frame, Callback callback, boolean first)
{
if (first)
{
if (OpCode.isControlFrame(frame.getOpCode()))
{
emitFrame(frame, callback);
return true;
}
}
ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
long maxFrameSize = configuration.getMaxFrameSize();
int fragmentSize = (int)Math.min(remaining, maxFrameSize);
boolean continuation = (frame.getOpCode() == OpCode.CONTINUATION) || !first;
Frame fragment = new Frame(continuation ? OpCode.CONTINUATION : frame.getOpCode());
boolean finished = (maxFrameSize <= 0 || remaining <= maxFrameSize);
fragment.setFin(frame.isFin() && finished);
if (finished)
{
// If finished we don't need to fragment, forward original payload.
fragment.setPayload(payload);
}
else
{
// Slice the fragmented payload from the buffer.
int limit = payload.limit();
int newLimit = payload.position() + fragmentSize;
payload.limit(newLimit);
ByteBuffer payloadFragment = payload.slice();
payload.limit(limit);
fragment.setPayload(payloadFragment);
payload.position(newLimit);
if (LOG.isDebugEnabled())
LOG.debug("Fragmented {}->{}", frame, fragment);
}
Callback payloadCallback = Callback.from(() ->
{
if (finished)
callback.succeeded();
}, t ->
{
// This is wrapped with CountingCallback so will only be failed once.
callback.failed(t);
failFlusher(t);
});
emitFrame(fragment, payloadCallback);
return finished;
}
}
}

View File

@ -16,6 +16,8 @@ package org.eclipse.jetty.websocket.core.internal;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
@ -40,15 +42,15 @@ import org.slf4j.LoggerFactory;
* <p>
* Attempts to follow <a href="https://tools.ietf.org/html/rfc7692">Compression Extensions for WebSocket</a>
*/
public class PerMessageDeflateExtension extends AbstractExtension
public class PerMessageDeflateExtension extends AbstractExtension implements DemandChain
{
private static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF};
private static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES);
private static final Logger LOG = LoggerFactory.getLogger(PerMessageDeflateExtension.class);
private static final int DEFAULT_BUF_SIZE = 8 * 1024;
private final TransformingFlusher outgoingFlusher;
private final TransformingFlusher incomingFlusher;
private final OutgoingFlusher outgoingFlusher;
private final IncomingFlusher incomingFlusher;
private DeflaterPool.Entry deflaterHolder;
private InflaterPool.Entry inflaterHolder;
private boolean incomingCompressed;
@ -88,7 +90,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
@Override
public void onFrame(Frame frame, Callback callback)
{
incomingFlusher.sendFrame(frame, callback, false);
incomingFlusher.onFrame(frame, callback);
}
@Override
@ -236,6 +238,18 @@ public class PerMessageDeflateExtension extends AbstractExtension
super.nextOutgoingFrame(frame, callback, batch);
}
@Override
public void setNextDemand(LongConsumer nextDemand)
{
incomingFlusher.setNextDemand(nextDemand);
}
@Override
public void demand(long n)
{
incomingFlusher.demand(n);
}
private class OutgoingFlusher extends TransformingFlusher
{
private boolean _first;
@ -291,7 +305,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
if (buffer.limit() == bufferSize)
{
// We need to fragment. TODO: what if there was only bufferSize of content?
// We need to fragment.
if (!getConfiguration().isAutoFragment())
throw new MessageTooLargeException("Deflated payload exceeded the compress buffer size");
break;
@ -335,66 +349,59 @@ public class PerMessageDeflateExtension extends AbstractExtension
}
}
private class IncomingFlusher extends TransformingFlusher
private class IncomingFlusher extends DemandingFlusher
{
private boolean _first;
private Frame _frame;
private boolean _tailBytes;
private AtomicReference<ByteBuffer> _payloadRef = new AtomicReference<>();
@Override
protected boolean onFrame(Frame frame, Callback callback, boolean batch)
public IncomingFlusher()
{
_tailBytes = false;
_first = true;
_frame = frame;
if (OpCode.isControlFrame(_frame.getOpCode()))
{
nextIncomingFrame(_frame, callback);
return true;
}
// This extension requires the RSV1 bit set only in the first frame.
// Subsequent continuation frames don't have RSV1 set, but are compressed.
switch (_frame.getOpCode())
{
case OpCode.TEXT:
case OpCode.BINARY:
incomingCompressed = _frame.isRsv1();
break;
case OpCode.CONTINUATION:
if (_frame.isRsv1())
throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame");
break;
default:
break;
}
if (_first && !incomingCompressed)
{
nextIncomingFrame(_frame, callback);
return true;
}
if (_frame.isFin())
incomingCompressed = false;
// Provide the frames payload as input to the Inflater.
getInflater().setInput(_frame.getPayload().slice());
callback.succeeded();
return false;
super(PerMessageDeflateExtension.this::nextIncomingFrame);
}
@Override
protected boolean transform(Callback callback)
protected boolean handle(Frame frame, Callback callback, boolean first)
{
if (first)
{
if (OpCode.isControlFrame(frame.getOpCode()))
{
emitFrame(frame, callback);
return true;
}
// This extension requires the RSV1 bit set only in the first frame.
// Subsequent continuation frames don't have RSV1 set, but are compressed.
switch (frame.getOpCode())
{
case OpCode.TEXT:
case OpCode.BINARY:
incomingCompressed = frame.isRsv1();
break;
case OpCode.CONTINUATION:
if (frame.isRsv1())
throw new ProtocolException("Invalid RSV1 set on permessage-deflate CONTINUATION frame");
break;
default:
break;
}
if (!incomingCompressed)
{
emitFrame(frame, callback);
return true;
}
// Provide the frames payload as input to the Inflater.
_tailBytes = false;
getInflater().setInput(frame.getPayload().slice());
}
try
{
boolean finished = inflate(callback);
_first = false;
return finished;
return inflate(frame, callback, first);
}
catch (DataFormatException e)
{
@ -402,18 +409,18 @@ public class PerMessageDeflateExtension extends AbstractExtension
}
}
private boolean inflate(Callback callback) throws DataFormatException
private boolean inflate(Frame frame, Callback callback, boolean first) throws DataFormatException
{
// Get a buffer for the inflated payload.
long maxFrameSize = getConfiguration().getMaxFrameSize();
int bufferSize = (maxFrameSize <= 0) ? inflateBufferSize : (int)Math.min(maxFrameSize, inflateBufferSize);
final ByteBuffer payload = getBufferPool().acquire(bufferSize, false);
callback = Callback.from(callback, () -> getBufferPool().release(payload));
ByteBuffer payload = getBufferPool().acquire(bufferSize, false);
_payloadRef = new AtomicReference<>(payload);
BufferUtil.clear(payload);
// Fill up the ByteBuffer with a max length of bufferSize;
boolean finished = false;
Inflater inflater = getInflater();
boolean complete = false;
while (true)
{
int decompressed = inflater.inflate(payload.array(), payload.arrayOffset() + payload.position(), bufferSize - payload.position());
@ -423,7 +430,7 @@ public class PerMessageDeflateExtension extends AbstractExtension
if (payload.limit() == bufferSize)
{
// We need to fragment. TODO: what if there was only bufferSize of content?
// We need to fragment.
if (!getConfiguration().isAutoFragment())
throw new MessageTooLargeException("Inflated payload exceeded the decompress buffer size");
break;
@ -431,29 +438,47 @@ public class PerMessageDeflateExtension extends AbstractExtension
if (decompressed == 0)
{
if (!_tailBytes && _frame.isFin())
if (!_tailBytes && frame.isFin())
{
inflater.setInput(TAIL_BYTES_BUF.slice());
_tailBytes = true;
continue;
}
finished = true;
complete = true;
break;
}
}
Frame chunk = new Frame(_first ? _frame.getOpCode() : OpCode.CONTINUATION);
Frame chunk = new Frame(first ? frame.getOpCode() : OpCode.CONTINUATION);
chunk.setRsv1(false);
chunk.setPayload(payload);
chunk.setFin(_frame.isFin() && finished);
chunk.setFin(frame.isFin() && complete);
nextIncomingFrame(chunk, callback);
boolean succeedCallback = complete;
AtomicReference<ByteBuffer> payloadRef = _payloadRef;
Callback payloadCallback = Callback.from(() ->
{
getBufferPool().release(payloadRef.getAndSet(null));
if (succeedCallback)
callback.succeeded();
}, t ->
{
getBufferPool().release(payloadRef.getAndSet(null));
failFlusher(t);
});
emitFrame(chunk, payloadCallback);
if (LOG.isDebugEnabled())
LOG.debug("Decompress finished: {} {}", finished, chunk);
LOG.debug("Decompress finished: {} {}", complete, chunk);
return complete;
}
return finished;
@Override
protected void onCompleteFailure(Throwable cause)
{
getBufferPool().release(_payloadRef.getAndSet(null));
super.onCompleteFailure(cause);
}
}
}

View File

@ -270,9 +270,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
frame.close();
if (referenced != null)
referenced.release();
if (!coreSession.isDemanding())
demand(1);
}
@Override

View File

@ -67,6 +67,7 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
private final Negotiated negotiated;
private final boolean demanding;
private final Flusher flusher = new Flusher(this);
private final ExtensionStack extensionStack;
private int maxOutgoingFrames = -1;
private final AtomicInteger numOutgoingFrames = new AtomicInteger();
@ -90,7 +91,8 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
this.behavior = behavior;
this.negotiated = negotiated;
this.demanding = handler.isDemanding();
negotiated.getExtensions().initialize(new IncomingAdaptor(), new OutgoingAdaptor(), this);
extensionStack = negotiated.getExtensions();
extensionStack.initialize(new IncomingAdaptor(), new OutgoingAdaptor(), this);
}
public ClassLoader getClassLoader()
@ -204,6 +206,7 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
{
connection.getEndPoint().setIdleTimeout(idleTimeout.toMillis());
connection.getFrameFlusher().setIdleTimeout(writeTimeout.toMillis());
extensionStack.setLastDemand(connection::demand);
this.connection = connection;
}
@ -392,7 +395,7 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to OPEN");
if (!demanding)
connection.demand(1);
autoDemand();
},
x ->
{
@ -422,7 +425,12 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
{
if (!demanding)
throw new IllegalStateException("FrameHandler is not demanding: " + this);
connection.demand(n);
getExtensionStack().demand(n);
}
public void autoDemand()
{
getExtensionStack().demand(1);
}
@Override
@ -640,7 +648,7 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
private class IncomingAdaptor implements IncomingFrames
{
@Override
public void onFrame(Frame frame, final Callback callback)
public void onFrame(Frame frame, Callback callback)
{
Callback closeCallback = null;
try
@ -653,7 +661,13 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
// Handle inbound frame
if (frame.getOpCode() != OpCode.CLOSE)
{
handle(() -> handler.onFrame(frame, callback));
Callback handlerCallback = isDemanding() ? callback : Callback.from(() ->
{
callback.succeeded();
autoDemand();
}, callback::failed);
handle(() -> handler.onFrame(frame, handlerCallback));
return;
}

View File

@ -0,0 +1,41 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
public class DemandingIncomingFramesCapture extends IncomingFramesCapture
{
private final WebSocketCoreSession _coreSession;
public DemandingIncomingFramesCapture(WebSocketCoreSession coreSession)
{
_coreSession = coreSession;
}
@Override
public void onFrame(Frame frame, Callback callback)
{
try
{
super.onFrame(frame, callback);
}
finally
{
if (!_coreSession.isDemanding())
_coreSession.autoDemand();
}
}
}

View File

@ -28,8 +28,8 @@ public class TestFrameHandler implements SynchronousFrameHandler
protected CoreSession coreSession;
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
protected CloseStatus closeStatus;
protected Throwable failure;
public CloseStatus closeStatus;
public Throwable failure;
public CountDownLatch open = new CountDownLatch(1);
public CountDownLatch error = new CountDownLatch(1);

View File

@ -14,7 +14,8 @@
package org.eclipse.jetty.websocket.core.extensions;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.io.ByteBufferPool;
@ -39,8 +40,8 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class ExtensionTool
{
@ -51,13 +52,15 @@ public class ExtensionTool
private Extension ext;
private final Parser parser;
private final IncomingFramesCapture capture;
private final WebSocketCoreSession coreSession;
private Tester(String parameterizedExtension)
{
this.requestedExtParams = parameterizedExtension;
this.extConfig = ExtensionConfig.parse(parameterizedExtension);
Class<?> extClass = components.getExtensionRegistry().getExtension(extConfig.getName());
assertThat("extClass", extClass, notNullValue());
coreSession = newWebSocketCoreSession(Collections.singletonList(extConfig));
ExtensionStack extensionStack = coreSession.getExtensionStack();
assertThat(extensionStack.getExtensions().size(), equalTo(1));
this.capture = new IncomingFramesCapture();
this.parser = new Parser(new MappedByteBufferPool());
@ -70,9 +73,8 @@ public class ExtensionTool
public void assertNegotiated(String expectedNegotiation)
{
this.ext = components.getExtensionRegistry().newInstance(extConfig, components);
this.ext = coreSession.getExtensionStack().getExtensions().get(0);
this.ext.setNextIncomingFrames(capture);
this.ext.setCoreSession(newWebSocketCoreSession());
}
public void parseIncomingHex(String... rawhex)
@ -80,6 +82,9 @@ public class ExtensionTool
int parts = rawhex.length;
byte[] net;
// Simulate initial demand from onOpen().
coreSession.autoDemand();
for (int i = 0; i < parts; i++)
{
String hex = rawhex[i].replaceAll("\\s*(0x)?", "");
@ -92,7 +97,16 @@ public class ExtensionTool
if (frame == null)
break;
FutureCallback callback = new FutureCallback();
FutureCallback callback = new FutureCallback()
{
@Override
public void succeeded()
{
super.succeeded();
if (!coreSession.isDemanding())
coreSession.autoDemand();
}
};
ext.onFrame(frame, callback);
// Throw if callback fails.
@ -157,11 +171,11 @@ public class ExtensionTool
return new Tester(parameterizedExtension);
}
private WebSocketCoreSession newWebSocketCoreSession()
private WebSocketCoreSession newWebSocketCoreSession(List<ExtensionConfig> configs)
{
ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER);
exStack.negotiate(new LinkedList<>(), new LinkedList<>());
WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components);
return coreSession;
exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test.
exStack.negotiate(configs, configs);
return new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components);
}
}

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@ -24,12 +25,19 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.ByteBufferAssert;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.DemandingIncomingFramesCapture;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingFramesCapture;
import org.eclipse.jetty.websocket.core.TestMessageHandler;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
import org.eclipse.jetty.websocket.core.internal.FragmentExtension;
import org.eclipse.jetty.websocket.core.internal.Negotiated;
import org.eclipse.jetty.websocket.core.internal.WebSocketCoreSession;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
@ -38,51 +46,74 @@ import static org.hamcrest.Matchers.is;
public class FragmentExtensionTest extends AbstractExtensionTest
{
/**
* Verify that incoming frames are passed thru without modification
* Verify that incoming frames are fragmented correctly.
*/
@Test
public void testIncomingFrames()
public void testIncomingFrames() throws Exception
{
IncomingFramesCapture capture = new IncomingFramesCapture();
FragmentExtension ext = new FragmentExtension();
ExtensionConfig config = ExtensionConfig.parse("fragment;maxLength=4");
ext.init(config, components);
ExtensionConfig config = ExtensionConfig.parse("fragment;maxLength=20");
WebSocketCoreSession coreSession = newSession(config);
FragmentExtension ext = (FragmentExtension)coreSession.getExtensionStack().getExtensions().get(0);
IncomingFramesCapture capture = new DemandingIncomingFramesCapture(coreSession);
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.autoDemand();
// Quote
List<String> quote = new ArrayList<>();
quote.add("No amount of experimentation can ever prove me right;");
quote.add("a single experiment can prove me wrong.");
quote.add("-- Albert Einstein");
// Manually create frame and pass into extension
for (String q : quote)
// Write quote as separate frames
for (String section : quote)
{
Frame frame = new Frame(OpCode.TEXT).setPayload(q);
Frame frame = new Frame(OpCode.TEXT).setPayload(section);
ext.onFrame(frame, Callback.NOOP);
}
int len = quote.size();
// Expected Frames
List<Frame> expectedFrames = new ArrayList<>();
expectedFrames.add(new Frame(OpCode.TEXT).setPayload("No amount of experim").setFin(false));
expectedFrames.add(new Frame(OpCode.CONTINUATION).setPayload("entation can ever pr").setFin(false));
expectedFrames.add(new Frame(OpCode.CONTINUATION).setPayload("ove me right;").setFin(true));
expectedFrames.add(new Frame(OpCode.TEXT).setPayload("a single experiment ").setFin(false));
expectedFrames.add(new Frame(OpCode.CONTINUATION).setPayload("can prove me wrong.").setFin(true));
expectedFrames.add(new Frame(OpCode.TEXT).setPayload("-- Albert Einstein").setFin(true));
// capture.dump();
int len = expectedFrames.size();
capture.assertFrameCount(len);
String prefix;
int i = 0;
for (Frame actual : capture.frames)
BlockingQueue<Frame> frames = capture.frames;
for (int i = 0; i < len; i++)
{
prefix = "Frame[" + i + "]";
Frame actualFrame = frames.poll(1, TimeUnit.SECONDS);
Frame expectedFrame = expectedFrames.get(i);
assertThat(prefix + ".opcode", actual.getOpCode(), is(OpCode.TEXT));
assertThat(prefix + ".fin", actual.isFin(), is(true));
assertThat(prefix + ".rsv1", actual.isRsv1(), is(false));
assertThat(prefix + ".rsv2", actual.isRsv2(), is(false));
assertThat(prefix + ".rsv3", actual.isRsv3(), is(false));
// System.out.printf("actual: %s%n",actualFrame);
// System.out.printf("expect: %s%n",expectedFrame);
ByteBuffer expected = BufferUtil.toBuffer(quote.get(i), StandardCharsets.UTF_8);
assertThat(prefix + ".payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload", expected, actual.getPayload().slice());
i++;
// Validate Frame
assertThat(prefix + ".opcode", actualFrame.getOpCode(), is(expectedFrame.getOpCode()));
assertThat(prefix + ".fin", actualFrame.isFin(), is(expectedFrame.isFin()));
assertThat(prefix + ".rsv1", actualFrame.isRsv1(), is(expectedFrame.isRsv1()));
assertThat(prefix + ".rsv2", actualFrame.isRsv2(), is(expectedFrame.isRsv2()));
assertThat(prefix + ".rsv3", actualFrame.isRsv3(), is(expectedFrame.isRsv3()));
// Validate Payload
ByteBuffer expectedData = expectedFrame.getPayload().slice();
ByteBuffer actualData = actualFrame.getPayload().slice();
assertThat(prefix + ".payloadLength", actualData.remaining(), is(expectedData.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload", expectedData, actualData);
}
}
@ -92,14 +123,16 @@ public class FragmentExtensionTest extends AbstractExtensionTest
@Test
public void testIncomingPing()
{
IncomingFramesCapture capture = new IncomingFramesCapture();
FragmentExtension ext = new FragmentExtension();
ExtensionConfig config = ExtensionConfig.parse("fragment;maxLength=4");
ext.init(config, components);
WebSocketCoreSession coreSession = newSession(config);
FragmentExtension ext = (FragmentExtension)coreSession.getExtensionStack().getExtensions().get(0);
IncomingFramesCapture capture = new DemandingIncomingFramesCapture(coreSession);
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.autoDemand();
String payload = "Are you there?";
Frame ping = new Frame(OpCode.PING).setPayload(payload);
ext.onFrame(ping, Callback.NOOP);
@ -290,4 +323,19 @@ public class FragmentExtensionTest extends AbstractExtensionTest
assertThat("Frame.payloadLength", actual.getPayloadLength(), is(expected.remaining()));
ByteBufferAssert.assertEquals("Frame.payload", expected, actual.getPayload().slice());
}
private WebSocketCoreSession newSession(ExtensionConfig config)
{
return newSessionFromConfig(new Configuration.ConfigurationCustomizer(), config == null ? Collections.emptyList() : Collections.singletonList(config));
}
private WebSocketCoreSession newSessionFromConfig(Configuration.ConfigurationCustomizer configuration, List<ExtensionConfig> configs)
{
ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER);
exStack.negotiate(configs, configs);
exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test.
WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components);
configuration.customize(configuration);
return coreSession;
}
}

View File

@ -17,7 +17,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -27,6 +27,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.Configuration.ConfigurationCustomizer;
import org.eclipse.jetty.websocket.core.DemandingIncomingFramesCapture;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFramesCapture;
@ -283,9 +284,9 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
@Test
public void testIncomingPing()
{
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
ext.init(config, components);
WebSocketCoreSession coreSession = newSession(config);
PerMessageDeflateExtension ext = (PerMessageDeflateExtension)coreSession.getExtensionStack().getExtensions().get(0);
// Setup capture of incoming frames
IncomingFramesCapture capture = new IncomingFramesCapture();
@ -293,6 +294,9 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
// Wire up stack
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.autoDemand();
String payload = "Are you there?";
Frame ping = new Frame(OpCode.PING).setPayload(payload);
ext.onFrame(ping, Callback.NOOP);
@ -318,16 +322,19 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
@Test
public void testIncomingUncompressedFrames()
{
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
ext.init(config, components);
WebSocketCoreSession coreSession = newSession(config);
PerMessageDeflateExtension ext = (PerMessageDeflateExtension)coreSession.getExtensionStack().getExtensions().get(0);
// Setup capture of incoming frames
IncomingFramesCapture capture = new IncomingFramesCapture();
IncomingFramesCapture capture = new DemandingIncomingFramesCapture(coreSession);
// Wire up stack
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.autoDemand();
// Quote
List<String> quote = new ArrayList<>();
quote.add("No amount of experimentation can ever prove me right;");
@ -368,10 +375,9 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
@Test
public void testIncomingFrameNoPayload()
{
PerMessageDeflateExtension ext = new PerMessageDeflateExtension();
ExtensionConfig config = ExtensionConfig.parse("permessage-deflate");
ext.init(config, components);
ext.setCoreSession(newSession());
WebSocketCoreSession coreSession = newSession(config);
PerMessageDeflateExtension ext = (PerMessageDeflateExtension)coreSession.getExtensionStack().getExtensions().get(0);
// Setup capture of incoming frames
IncomingFramesCapture capture = new IncomingFramesCapture();
@ -379,6 +385,9 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
// Wire up stack
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.autoDemand();
Frame ping = new Frame(OpCode.TEXT);
ping.setRsv1(true);
ext.onFrame(ping, Callback.NOOP);
@ -592,14 +601,19 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
private WebSocketCoreSession newSession()
{
return newSessionFromConfig(new ConfigurationCustomizer());
return newSession(null);
}
private WebSocketCoreSession newSessionFromConfig(ConfigurationCustomizer configuration)
private WebSocketCoreSession newSession(ExtensionConfig config)
{
return newSessionFromConfig(new ConfigurationCustomizer(), config == null ? Collections.emptyList() : Collections.singletonList(config));
}
private WebSocketCoreSession newSessionFromConfig(ConfigurationCustomizer configuration, List<ExtensionConfig> configs)
{
ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER);
exStack.negotiate(new LinkedList<>(), new LinkedList<>());
exStack.negotiate(configs, configs);
exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test.
WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components);
configuration.customize(configuration);
return coreSession;

View File

@ -0,0 +1,197 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core.extensions;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.TestFrameHandler;
import org.eclipse.jetty.websocket.core.client.CoreClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PermessageDeflateDemandTest
{
private Server _server;
private WebSocketCoreClient _client;
private ServerConnector _connector;
private WebSocketUpgradeHandler _upgradeHandler;
@BeforeEach
public void before() throws Exception
{
_server = new Server();
_connector = new ServerConnector(_server);
_server.addConnector(_connector);
_upgradeHandler = new WebSocketUpgradeHandler();
_server.setHandler(_upgradeHandler);
_server.start();
_client = new WebSocketCoreClient();
_client.start();
}
@AfterEach
public void after() throws Exception
{
_client.stop();
_server.stop();
}
@Test
public void test() throws Exception
{
ServerHandler serverHandler = new ServerHandler();
_upgradeHandler.addMapping("/", WebSocketNegotiator.from(n -> serverHandler));
TestFrameHandler clientHandler = new TestFrameHandler();
URI uri = URI.create("ws://localhost:" + _connector.getLocalPort());
CoreClientUpgradeRequest upgradeRequest = CoreClientUpgradeRequest.from(_client, uri, clientHandler);
upgradeRequest.addExtensions("permessage-deflate");
CoreSession coreSession = _client.connect(upgradeRequest).get(5, TimeUnit.SECONDS);
assertNotNull(coreSession);
// Set max frame size to autoFragment the message into multiple frames.
ByteBuffer message = randomBytes(1024);
coreSession.setMaxFrameSize(64);
coreSession.sendFrame(new Frame(OpCode.BINARY, message).setFin(true), Callback.NOOP, false);
coreSession.close(CloseStatus.NORMAL, null, Callback.NOOP);
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(clientHandler.closeStatus.getCode(), equalTo(CloseStatus.NORMAL));
assertThat(serverHandler.binaryMessages.size(), equalTo(1));
ByteBuffer recvMessage = serverHandler.binaryMessages.poll();
assertThat(recvMessage, equalTo(message));
}
private static ByteBuffer randomBytes(int size)
{
var bytes = new byte[size];
new Random(42).nextBytes(bytes);
return BufferUtil.toBuffer(bytes);
}
public static class ServerHandler implements FrameHandler
{
private CoreSession _coreSession;
private byte _messageType;
public BlockingQueue<String> textMessages = new BlockingArrayQueue<>();
public BlockingQueue<ByteBuffer> binaryMessages = new BlockingArrayQueue<>();
private StringBuilder _stringBuilder = new StringBuilder();
private ByteBufferCallbackAccumulator _byteBuilder = new ByteBufferCallbackAccumulator();
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
_coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
}
@Override
public void onFrame(Frame frame, Callback callback)
{
if (frame.isDataFrame())
{
switch (frame.getOpCode())
{
case OpCode.TEXT:
_messageType = OpCode.TEXT;
break;
case OpCode.BINARY:
_messageType = OpCode.BINARY;
break;
case OpCode.CONTINUATION:
break;
default:
throw new IllegalStateException(OpCode.name(frame.getOpCode()));
}
switch (_messageType)
{
case OpCode.TEXT:
_stringBuilder.append(frame.getPayloadAsUTF8());
callback.succeeded();
if (frame.isFin())
{
textMessages.add(_stringBuilder.toString());
_stringBuilder = new StringBuilder();
}
break;
case OpCode.BINARY:
_byteBuilder.addEntry(frame.getPayload(), callback);
if (frame.isFin())
{
binaryMessages.add(BufferUtil.toBuffer(_byteBuilder.takeByteArray()));
_byteBuilder = new ByteBufferCallbackAccumulator();
}
break;
default:
throw new IllegalStateException(OpCode.name(_messageType));
}
}
else
{
callback.succeeded();
}
_coreSession.demand(1);
}
@Override
public void onError(Throwable cause, Callback callback)
{
cause.printStackTrace();
callback.succeeded();
}
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
callback.succeeded();
}
@Override
public boolean isDemanding()
{
return true;
}
}
}

View File

@ -0,0 +1,101 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class LargeDeflateTest
{
private Server _server;
private ServerConnector _connector;
private WebSocketClient _client;
private final EventSocket _serverSocket = new EventSocket();
@BeforeEach
void before() throws Exception
{
_server = new Server();
_connector = new ServerConnector(_server);
_server.addConnector(_connector);
ServletContextHandler handler = new ServletContextHandler();
_server.insertHandler(handler);
JettyWebSocketServletContainerInitializer.configure(handler, (servletContext, container) ->
{
container.setIdleTimeout(Duration.ofDays(1));
container.setMaxFrameSize(Integer.MAX_VALUE);
container.setMaxBinaryMessageSize(Integer.MAX_VALUE);
container.addMapping("/", (req, resp) -> _serverSocket);
});
_server.start();
_client = new WebSocketClient();
_client.start();
}
@AfterEach
void after() throws Exception
{
_client.stop();
_server.stop();
}
@Test
void testDeflate() throws Exception
{
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.addExtensions("permessage-deflate");
EventSocket clientSocket = new EventSocket();
Session session = _client.connect(clientSocket, URI.create("ws://localhost:" + _connector.getLocalPort() + "/ws"), upgradeRequest).get();
ByteBuffer sentMessage = largePayloads();
session.getRemote().sendBytes(sentMessage);
session.close(StatusCode.NORMAL, "close from test");
assertTrue(_serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(_serverSocket.closeCode, is(StatusCode.NORMAL));
assertThat(_serverSocket.closeReason, is("close from test"));
ByteBuffer message = _serverSocket.binaryMessages.poll(1, TimeUnit.SECONDS);
assertThat(message, is(sentMessage));
}
private static ByteBuffer largePayloads()
{
var bytes = new byte[4 * 1024 * 1024];
new Random(42).nextBytes(bytes);
return BufferUtil.toBuffer(bytes);
}
}

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@ -22,7 +23,9 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
@ -32,6 +35,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -68,6 +72,13 @@ public class PermessageDeflateBufferTest
return sb.toString();
}
private static ByteBuffer randomBytes(int size)
{
var bytes = new byte[size];
new Random(42).nextBytes(bytes);
return BufferUtil.toBuffer(bytes);
}
@BeforeEach
public void before() throws Exception
{
@ -124,5 +135,26 @@ public class PermessageDeflateBufferTest
session.close();
assertTrue(socket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(socket.closeCode, equalTo(StatusCode.NORMAL));
}
@Test
public void testPermessageDeflateFragmentedBinaryMessage() throws Exception
{
EventSocket socket = new EventSocket();
ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
clientUpgradeRequest.addExtensions("permessage-deflate");
URI uri = URI.create("ws://localhost:" + connector.getLocalPort());
Session session = client.connect(socket, uri, clientUpgradeRequest).get(5, TimeUnit.SECONDS);
ByteBuffer message = randomBytes(1024);
session.setMaxFrameSize(64);
session.getRemote().sendBytes(message);
assertThat(socket.binaryMessages.poll(5, TimeUnit.SECONDS), equalTo(message));
session.close();
assertTrue(socket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(socket.closeCode, equalTo(StatusCode.NORMAL));
}
}