Issue #9944 - remove integer from demand in websocket core

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2023-06-22 09:08:29 +10:00
parent 87eb651c1f
commit 9e99a58ac4
43 changed files with 129 additions and 135 deletions

View File

@ -152,14 +152,13 @@ public interface CoreSession extends OutgoingFrames, IncomingFrames, Configurati
void abort();
/**
* <p>Manages flow control by indicating demand for WebSocket frames.</p>
* <p>Manages flow control by indicating demand for a WebSocket frame.</p>
* <p>A call to {@link FrameHandler#onFrame(Frame, Callback)} will only
* be made if there is demand.</p>
*
* @param n the number of frames that can be handled in sequential calls to
* {@link FrameHandler#onFrame(Frame, Callback)}, must be positive.
* {@link FrameHandler#onFrame(Frame, Callback)}.
*/
void demand(long n);
void demand();
/**
* @return true if an extension has been negotiated which uses the RSV1 bit.
@ -287,7 +286,7 @@ public interface CoreSession extends OutgoingFrames, IncomingFrames, Configurati
}
@Override
public void demand(long n)
public void demand()
{
}

View File

@ -18,7 +18,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import org.eclipse.jetty.http.BadMessageException;
@ -45,8 +44,8 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
private IncomingFrames incoming;
private OutgoingFrames outgoing;
private final Extension[] rsvClaims = new Extension[3];
private LongConsumer lastDemand;
private DemandChain demandChain = n -> lastDemand.accept(n);
private DemandChain lastDemand;
private DemandChain demandChain = () -> lastDemand.demand();
public ExtensionStack(WebSocketComponents components, Behavior behavior)
{
@ -224,10 +223,9 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
ext.setNextOutgoingFrames(outgoing);
outgoing = ext;
if (ext instanceof DemandChain)
if (ext instanceof DemandChain demandingExtension)
{
DemandChain demandingExtension = (DemandChain)ext;
demandingExtension.setNextDemand(demandChain::demand);
demandingExtension.setNextDemand(demandChain);
demandChain = demandingExtension;
}
}
@ -273,12 +271,12 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
}
}
public void demand(long n)
public void demand()
{
demandChain.demand(n);
demandChain.demand();
}
public void setLastDemand(LongConsumer lastDemand)
public void setLastDemand(DemandChain lastDemand)
{
this.lastDemand = lastDemand;
}

View File

@ -54,7 +54,7 @@ import org.eclipse.jetty.util.Callback;
* or not. The error code will indicate the nature of the close.</li>
* </ul>
* <p>FrameHandler is responsible to manage the demand for more
* WebSocket frames, either directly by calling {@link CoreSession#demand(long)}
* WebSocket frames, either directly by calling {@link CoreSession#demand()}
* or by delegating the demand management to other components.</p>
*/
public interface FrameHandler extends IncomingFrames
@ -64,7 +64,7 @@ public interface FrameHandler extends IncomingFrames
* <p>It is allowed to send WebSocket frames via
* {@link CoreSession#sendFrame(Frame, Callback, boolean)}.
* <p>WebSocket frames cannot be received until a call to
* {@link CoreSession#demand(long)} is made.</p>
* {@link CoreSession#demand()} is made.</p>
* <p>If the callback argument is failed, the implementation
* sends a CLOSE frame with {@link CloseStatus#SERVER_ERROR},
* and the connection will be closed.</p>
@ -80,7 +80,7 @@ public interface FrameHandler extends IncomingFrames
* <p>This method will never be called concurrently for the
* same session; will be called sequentially to satisfy the
* outstanding demand signaled by calls to
* {@link CoreSession#demand(long)}.</p>
* {@link CoreSession#demand()}.</p>
* <p>Both control and data frames are passed to this method.</p>
* <p>CLOSE frames may be responded from this method, but if
* they are not responded, then the implementation will respond
@ -89,7 +89,7 @@ public interface FrameHandler extends IncomingFrames
* that the buffers associated with the frame can be recycled.</p>
* <p>Additional WebSocket frames (of any type, including CLOSE
* frames) cannot be received until a call to
* {@link CoreSession#demand(long)} is made.</p>
* {@link CoreSession#demand()} is made.</p>
*
* @param frame the WebSocket frame.
* @param callback the callback to indicate success or failure of

View File

@ -343,21 +343,18 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
fillAndParse();
}
public void demand(long n)
public void demand()
{
if (n <= 0)
throw new IllegalArgumentException("Demand must be positive");
boolean fillAndParse = false;
try (AutoLock l = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("demand {} d={} fp={} {} {}", n, demand, fillingAndParsing, networkBuffer, this);
LOG.debug("demand {} d={} fp={} {}", demand, fillingAndParsing, networkBuffer, this);
if (demand < 0)
return;
demand = MathUtils.cappedAdd(demand, n);
demand = MathUtils.cappedAdd(demand, 1);
if (!fillingAndParsing)
{

View File

@ -402,9 +402,9 @@ public class WebSocketCoreSession implements CoreSession, Dumpable
}
@Override
public void demand(long n)
public void demand()
{
getExtensionStack().demand(n);
getExtensionStack().demand();
}
@Override

View File

@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.internal;
import java.nio.ByteBuffer;
import java.util.function.LongConsumer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.AbstractExtension;
@ -55,13 +54,13 @@ public class FragmentExtension extends AbstractExtension implements DemandChain
}
@Override
public void demand(long n)
public void demand()
{
incomingFlusher.demand(n);
incomingFlusher.demand();
}
@Override
public void setNextDemand(LongConsumer nextDemand)
public void setNextDemand(DemandChain nextDemand)
{
incomingFlusher.setNextDemand(nextDemand);
}

View File

@ -117,7 +117,7 @@ public class MessageHandler implements FrameHandler
this.coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override
@ -210,7 +210,7 @@ public class MessageHandler implements FrameHandler
callback.succeeded();
}
coreSession.demand(1);
coreSession.demand();
}
catch (Throwable t)
{
@ -244,7 +244,7 @@ public class MessageHandler implements FrameHandler
callback.succeeded();
}
coreSession.demand(1);
coreSession.demand();
}
catch (Throwable t)
{
@ -264,13 +264,13 @@ public class MessageHandler implements FrameHandler
protected void onPingFrame(Frame frame, Callback callback)
{
coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(1), callback), false);
coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(), callback), false);
}
protected void onPongFrame(Frame frame, Callback callback)
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
protected void onCloseFrame(Frame frame, Callback callback)

View File

@ -17,7 +17,6 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
@ -252,15 +251,15 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem
}
@Override
public void setNextDemand(LongConsumer nextDemand)
public void setNextDemand(DemandChain nextDemand)
{
incomingFlusher.setNextDemand(nextDemand);
}
@Override
public void demand(long n)
public void demand()
{
incomingFlusher.demand(n);
incomingFlusher.demand();
}
private class OutgoingFlusher extends TransformingFlusher

View File

@ -87,12 +87,12 @@ public abstract class AbstractMessageSink implements MessageSink
/**
* <p>If {@link #isAutoDemand()} then demands for one more WebSocket frame
* via {@link CoreSession#demand(long)}; otherwise it is a no-operation,
* via {@link CoreSession#demand()}; otherwise it is a no-operation,
* because the demand is explicitly managed by the application function.</p>
*/
protected void autoDemand()
{
if (isAutoDemand())
getCoreSession().demand(1);
getCoreSession().demand();
}
}

View File

@ -78,7 +78,7 @@ public class ByteArrayMessageSink extends AbstractMessageSink
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
return;
}
@ -96,7 +96,7 @@ public class ByteArrayMessageSink extends AbstractMessageSink
}
else
{
getCoreSession().demand(1);
getCoreSession().demand();
}
}

View File

@ -82,7 +82,7 @@ public class ByteBufferMessageSink extends AbstractMessageSink
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
return;
}
@ -103,7 +103,7 @@ public class ByteBufferMessageSink extends AbstractMessageSink
else
{
// Did not call the application so must explicitly demand here.
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)

View File

@ -65,7 +65,7 @@ public class MessageInputStream extends InputStream implements MessageSink
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
session.demand(1);
session.demand();
return;
}
@ -228,7 +228,7 @@ public class MessageInputStream extends InputStream implements MessageSink
{
current.callback.succeeded();
if (!current.frame.isFin())
session.demand(1);
session.demand();
}
}

View File

@ -33,7 +33,7 @@ public interface MessageSink
* payload is consumed.</p>
* <p>The demand for more frames must be explicitly invoked,
* or arranged to be invoked asynchronously, by the implementation
* of this method, by calling {@link CoreSession#demand(long)}.</p>
* of this method, by calling {@link CoreSession#demand()}.</p>
*
* @param frame the frame to consume
* @param callback the callback to complete when the frame is consumed

View File

@ -54,7 +54,7 @@ public class PartialByteArrayMessageSink extends AbstractMessageSink
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)

View File

@ -52,7 +52,7 @@ public class PartialByteBufferMessageSink extends AbstractMessageSink
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)

View File

@ -72,7 +72,7 @@ public class StringMessageSink extends AbstractMessageSink
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)

View File

@ -13,8 +13,6 @@
package org.eclipse.jetty.websocket.core.util;
import java.util.function.LongConsumer;
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.ExtensionStack;
@ -25,9 +23,9 @@ import org.eclipse.jetty.websocket.core.ExtensionStack;
*/
public interface DemandChain
{
void demand(long n);
void demand();
default void setNextDemand(LongConsumer nextDemand)
default void setNextDemand(DemandChain nextDemand)
{
}
}

View File

@ -15,7 +15,6 @@ package org.eclipse.jetty.websocket.core.util;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
@ -44,7 +43,7 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
private final IncomingFrames _emitFrame;
private final AtomicLong _demand = new AtomicLong();
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
private LongConsumer _nextDemand;
private DemandChain _nextDemand;
private Frame _frame;
private Callback _callback;
@ -76,21 +75,21 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
protected abstract boolean handle(Frame frame, Callback callback, boolean first);
@Override
public void demand(long n)
public void demand()
{
_demand.getAndUpdate(d -> Math.addExact(d, n));
_demand.incrementAndGet();
iterate();
}
@Override
public void setNextDemand(LongConsumer nextDemand)
public void setNextDemand(DemandChain nextDemand)
{
_nextDemand = nextDemand;
}
/**
* Used to supply the flusher with a new frame. This frame should only arrive if demanded
* through the {@link LongConsumer} provided by {@link #setNextDemand(LongConsumer)}.
* through the {@link DemandChain} provided by {@link #setNextDemand(DemandChain)}.
* @param frame the WebSocket frame.
* @param callback to release frame payload.
*/
@ -160,7 +159,7 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
if (_needContent)
{
_needContent = false;
_nextDemand.accept(1);
_nextDemand.demand();
return Action.SCHEDULED;
}

View File

@ -69,14 +69,14 @@ public class DemandTest
{
_coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override
public void onFrame(Frame frame, Callback callback)
{
callback.succeeded();
_coreSession.demand(1);
_coreSession.demand();
}
@Override
@ -111,7 +111,7 @@ public class DemandTest
_coreSession.abort();
// Demand should not throw even if closed.
_coreSession.demand(1);
_coreSession.demand();
errorFuture.complete(null);
}
catch (Throwable t)

View File

@ -33,7 +33,7 @@ public class DemandingIncomingFramesCapture extends IncomingFramesCapture
}
finally
{
_coreSession.demand(1);
_coreSession.demand();
}
}
}

View File

@ -56,6 +56,6 @@ public class EchoFrameHandler extends TestAsyncFrameHandler
callback.succeeded();
}
coreSession.demand(1);
coreSession.demand();
}
}

View File

@ -51,7 +51,7 @@ public class TestAsyncFrameHandler implements FrameHandler
LOG.debug("[{}] onOpen {}", name, coreSession);
this.coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
openLatch.countDown();
}
@ -62,7 +62,7 @@ public class TestAsyncFrameHandler implements FrameHandler
LOG.debug("[{}] onFrame {}", name, frame);
receivedFrames.offer(Frame.copy(frame));
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override

View File

@ -71,7 +71,7 @@ public class TestFrameHandler implements SynchronousFrameHandler
protected void demand()
{
coreSession.demand(1);
coreSession.demand();
}
@Override

View File

@ -111,7 +111,7 @@ public class WebSocketCloseTest extends WebSocketTester
case ISHUT:
{
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL));
assertThat(serverHandler.coreSession.toString(), containsString("ISHUT"));
@ -193,7 +193,7 @@ public class WebSocketCloseTest extends WebSocketTester
public void testClientCloseOSHUT(String scheme) throws Exception
{
setup(State.OSHUT, scheme);
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS));
serverHandler.receivedCallback.poll().succeeded();
@ -209,7 +209,7 @@ public class WebSocketCloseTest extends WebSocketTester
public void testClientDifferentCloseOSHUT(String scheme) throws Exception
{
setup(State.OSHUT, scheme);
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.BAD_PAYLOAD), true));
assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS));
serverHandler.receivedCallback.poll().succeeded();
@ -227,7 +227,7 @@ public class WebSocketCloseTest extends WebSocketTester
try (StacklessLogging ignored = new StacklessLogging(WebSocketCoreSession.class))
{
setup(State.OSHUT, scheme);
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS));
serverHandler.receivedCallback.poll().failed(new Exception("Test"));
@ -246,7 +246,7 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OPEN, scheme);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
assertThat(serverHandler.closeStatus.getReason(), containsString("Client MUST mask all frames"));
@ -259,7 +259,7 @@ public class WebSocketCloseTest extends WebSocketTester
setup(State.OSHUT, scheme);
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false));
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.PROTOCOL));
assertThat(serverHandler.closeStatus.getReason(), containsString("Client MUST mask all frames"));
@ -335,7 +335,7 @@ public class WebSocketCloseTest extends WebSocketTester
client.close();
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
}
@ -348,7 +348,7 @@ public class WebSocketCloseTest extends WebSocketTester
client.close();
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE));
}
@ -376,7 +376,7 @@ public class WebSocketCloseTest extends WebSocketTester
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
{
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
}
@ -394,7 +394,7 @@ public class WebSocketCloseTest extends WebSocketTester
try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class))
{
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
}
@ -446,7 +446,7 @@ public class WebSocketCloseTest extends WebSocketTester
client.getOutputStream().write(RawFrameBuilder.buildClose(
new CloseStatus(CloseStatus.NORMAL, "normal response 1"), true));
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS));
Callback closeFrameCallback = Objects.requireNonNull(serverHandler.receivedCallback.poll());
closeFrameCallback.succeeded();
@ -517,7 +517,7 @@ public class WebSocketCloseTest extends WebSocketTester
CloseStatus closeStatus = new CloseStatus(CloseStatus.NORMAL, "throw from onFrame");
client.getOutputStream().write(RawFrameBuilder.buildClose(closeStatus, true));
serverHandler.coreSession.demand(1);
serverHandler.coreSession.demand();
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(serverHandler.closeStatus.getReason(), containsString("deliberately throwing from onFrame"));

View File

@ -65,7 +65,7 @@ public class WebSocketOpenTest extends WebSocketTester
assertThat(s.toString(), containsString("CONNECTED"));
s.sendFrame(new Frame(OpCode.TEXT, "Hello"), NOOP, false);
c.succeeded();
s.demand(1);
s.demand();
});
Frame.Parsed frame = receiveFrame(client.getInputStream());
assertThat(frame.getPayloadAsUTF8(), is("Hello"));
@ -155,7 +155,7 @@ public class WebSocketOpenTest extends WebSocketTester
// Demanding in onOpen will allow you to receive frames.
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.TEXT, "message in onOpen", true));
assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS));
coreSession.demand(1);
coreSession.demand();
Frame rcvFrame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(rcvFrame);
assertThat(rcvFrame.getPayloadAsUTF8(), is("message in onOpen"));
@ -163,7 +163,7 @@ public class WebSocketOpenTest extends WebSocketTester
// Demand to receive the close frame.
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(serverHandler.closeLatch.await(1, TimeUnit.SECONDS));
coreSession.demand(1);
coreSession.demand();
assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS));
// Closed handled normally

View File

@ -104,7 +104,7 @@ public class WebSocketClientServerTest
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
};

View File

@ -82,7 +82,7 @@ public class ExtensionTool
byte[] net;
// Simulate initial demand from onOpen().
coreSession.demand(1);
coreSession.demand();
for (int i = 0; i < parts; i++)
{
@ -102,7 +102,7 @@ public class ExtensionTool
public void succeeded()
{
super.succeeded();
coreSession.demand(1);
coreSession.demand();
}
};
ext.onFrame(frame, callback);
@ -172,7 +172,7 @@ public class ExtensionTool
private WebSocketCoreSession newWebSocketCoreSession(List<ExtensionConfig> configs)
{
ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER);
exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test.
exStack.setLastDemand(() -> {}); // Never delegate to WebSocketConnection as it is null for this test.
exStack.negotiate(configs, configs);
return new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components);
}

View File

@ -59,7 +59,7 @@ public class FragmentExtensionTest extends AbstractExtensionTest
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.demand(1);
coreSession.demand();
// Quote
List<String> quote = new ArrayList<>();
@ -131,7 +131,7 @@ public class FragmentExtensionTest extends AbstractExtensionTest
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.demand(1);
coreSession.demand();
String payload = "Are you there?";
Frame ping = new Frame(OpCode.PING).setPayload(payload);
@ -333,7 +333,7 @@ public class FragmentExtensionTest extends AbstractExtensionTest
{
ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER);
exStack.negotiate(configs, configs);
exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test.
exStack.setLastDemand(() -> {}); // Never delegate to WebSocketConnection as it is null for this test.
WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components);
configuration.customize(configuration);
return coreSession;

View File

@ -295,7 +295,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.demand(1);
coreSession.demand();
String payload = "Are you there?";
Frame ping = new Frame(OpCode.PING).setPayload(payload);
@ -333,7 +333,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.demand(1);
coreSession.demand();
// Quote
List<String> quote = new ArrayList<>();
@ -386,7 +386,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
ext.setNextIncomingFrames(capture);
// Simulate initial demand from onOpen().
coreSession.demand(1);
coreSession.demand();
Frame ping = new Frame(OpCode.TEXT);
ping.setRsv1(true);
@ -613,7 +613,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
{
ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER);
exStack.negotiate(configs, configs);
exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test.
exStack.setLastDemand(() -> {}); // Never delegate to WebSocketConnection as it is null for this test.
WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components);
configuration.customize(configuration);
return coreSession;

View File

@ -120,7 +120,7 @@ public class PermessageDeflateDemandTest
{
_coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override
@ -170,7 +170,7 @@ public class PermessageDeflateDemandTest
callback.succeeded();
}
_coreSession.demand(1);
_coreSession.demand();
}
@Override

View File

@ -137,7 +137,7 @@ class WebSocketProxy
else
{
callback.succeeded();
client2ProxySession.demand(1);
client2ProxySession.demand();
}
}
@ -226,7 +226,7 @@ class WebSocketProxy
proxy2Server.send(frame, Callback.from(() ->
{
c.succeeded();
client2ProxySession.demand(1);
client2ProxySession.demand();
}, c::failed));
}
else
@ -471,7 +471,7 @@ class WebSocketProxy
else
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
}
@ -591,7 +591,7 @@ class WebSocketProxy
client2Proxy.send(frame, Callback.from(() ->
{
c.succeeded();
proxy2ServerSession.demand(1);
proxy2ServerSession.demand();
}, c::failed));
}
else

View File

@ -110,7 +110,8 @@ public class WebSocketServerTest extends WebSocketTester
Frame frame = serverHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS);
assertNull(frame);
serverHandler.getCoreSession().demand(2);
serverHandler.getCoreSession().demand();
serverHandler.getCoreSession().demand();
frame = serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS);
assertNotNull(frame);
@ -124,7 +125,7 @@ public class WebSocketServerTest extends WebSocketTester
client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, true));
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.getCoreSession().demand(1);
serverHandler.getCoreSession().demand();
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
frame = serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS);
assertNotNull(frame);
@ -148,7 +149,7 @@ public class WebSocketServerTest extends WebSocketTester
public void onOpen(CoreSession coreSession, Callback callback)
{
super.onOpen(coreSession, callback);
coreSession.demand(1);
coreSession.demand();
}
@Override
@ -157,7 +158,7 @@ public class WebSocketServerTest extends WebSocketTester
LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
receivedFrames.offer(frame);
receivedCallbacks.offer(callback);
getCoreSession().demand(1);
getCoreSession().demand();
}
};
@ -248,7 +249,9 @@ public class WebSocketServerTest extends WebSocketTester
public void onOpen(CoreSession coreSession, Callback callback)
{
super.onOpen(coreSession, callback);
coreSession.demand(3);
coreSession.demand();
coreSession.demand();
coreSession.demand();
}
@Override
@ -289,7 +292,7 @@ public class WebSocketServerTest extends WebSocketTester
client.close();
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.getCoreSession().demand(1);
serverHandler.getCoreSession().demand();
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
}
}
@ -307,7 +310,8 @@ public class WebSocketServerTest extends WebSocketTester
{
super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(2);
coreSession.demand();
coreSession.demand();
}
@Override
@ -375,7 +379,8 @@ public class WebSocketServerTest extends WebSocketTester
{
super.onOpen(coreSession);
callback.succeeded();
coreSession.demand(2);
coreSession.demand();
coreSession.demand();
}
@Override

View File

@ -402,7 +402,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
private void autoDemand()
{
if (isAutoDemand())
session.getCoreSession().demand(1);
session.getCoreSession().demand();
}
public String toString()

View File

@ -55,7 +55,7 @@ public class WebSocketSession implements Session, Dumpable
{
if (frameHandler.isAutoDemand())
throw new IllegalStateException("auto-demanding endpoint cannot explicitly demand");
coreSession.demand(1);
coreSession.demand();
}
@Override

View File

@ -178,7 +178,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
container.notifySessionListeners((listener) -> listener.onJakartaWebSocketSessionOpened(session));
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
catch (Throwable cause)
{
@ -578,7 +578,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
if (activeMessageSink == null)
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
return;
}
@ -593,12 +593,12 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(frame.getPayload()), Callback.from(() ->
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}, x ->
{
// Ignore failures, as we might be OSHUT but receive a PING.
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}), false);
}
@ -616,7 +616,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
JakartaWebSocketPongMessage pongMessage = new JakartaWebSocketPongMessage(payload);
pongHandle.invoke(pongMessage);
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
catch (Throwable cause)
{
@ -626,7 +626,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
else
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
}

View File

@ -81,7 +81,7 @@ public abstract class AbstractSessionTest
}
@Override
public void demand(long n)
public void demand()
{
demand.release();
}

View File

@ -229,7 +229,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
this.coreSession = coreSession;
this.openLatch.countDown();
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override
@ -237,7 +237,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{
receivedFrames.offer(Frame.copy(frame));
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override

View File

@ -32,7 +32,7 @@ public class FrameEcho implements FrameHandler
{
this.coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override

View File

@ -179,7 +179,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
container.notifySessionListeners((listener) -> listener.onJakartaWebSocketSessionOpened(session));
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
catch (Throwable cause)
{
@ -585,7 +585,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
if (activeMessageSink == null)
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
return;
}
@ -600,12 +600,12 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(frame.getPayload()), Callback.from(() ->
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}, x ->
{
// Ignore failures, as we might be OSHUT but receive a PING.
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}), false);
}
@ -623,7 +623,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
JakartaWebSocketPongMessage pongMessage = new JakartaWebSocketPongMessage(payload);
pongHandle.invoke(pongMessage);
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
catch (Throwable cause)
{
@ -633,7 +633,7 @@ public class JakartaWebSocketFrameHandler implements FrameHandler
else
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
}

View File

@ -75,7 +75,7 @@ public abstract class AbstractSessionTest
}
@Override
public void demand(long n)
public void demand()
{
demand.release();
}

View File

@ -229,7 +229,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
this.coreSession = coreSession;
this.openLatch.countDown();
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override
@ -237,7 +237,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
{
receivedFrames.offer(Frame.copy(frame));
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override

View File

@ -32,7 +32,7 @@ public class FrameEcho implements FrameHandler
{
this.coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}
@Override
@ -41,7 +41,7 @@ public class FrameEcho implements FrameHandler
Runnable succeedAndDemand = () ->
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
};
if (frame.isControlFrame())

View File

@ -473,7 +473,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
if (frame != null)
onFrame(frame, callback);
else
session.getCoreSession().demand(1);
session.getCoreSession().demand();
}
}
@ -498,7 +498,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
}
if (demand)
session.getCoreSession().demand(1);
session.getCoreSession().demand();
}
public static Throwable convertCause(Throwable cause)