Issue #9965 - make multiple websocket demand throw ISE
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
2b4e8960e6
commit
0709946675
|
@ -17,6 +17,7 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.ReadPendingException;
|
||||||
import java.security.SecureRandom;
|
import java.security.SecureRandom;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -31,7 +32,6 @@ import org.eclipse.jetty.io.EndPoint;
|
||||||
import org.eclipse.jetty.io.RetainableByteBuffer;
|
import org.eclipse.jetty.io.RetainableByteBuffer;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
import org.eclipse.jetty.util.MathUtils;
|
|
||||||
import org.eclipse.jetty.util.component.Dumpable;
|
import org.eclipse.jetty.util.component.Dumpable;
|
||||||
import org.eclipse.jetty.util.thread.AutoLock;
|
import org.eclipse.jetty.util.thread.AutoLock;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
@ -61,7 +61,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
||||||
private final WebSocketCoreSession coreSession;
|
private final WebSocketCoreSession coreSession;
|
||||||
private final Flusher flusher;
|
private final Flusher flusher;
|
||||||
private final Random random;
|
private final Random random;
|
||||||
private long demand;
|
private Boolean demand;
|
||||||
private boolean fillingAndParsing;
|
private boolean fillingAndParsing;
|
||||||
private final LongAdder messagesIn = new LongAdder();
|
private final LongAdder messagesIn = new LongAdder();
|
||||||
private final LongAdder bytesIn = new LongAdder();
|
private final LongAdder bytesIn = new LongAdder();
|
||||||
|
@ -351,10 +351,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("demand {} d={} fp={} {}", demand, fillingAndParsing, networkBuffer, this);
|
LOG.debug("demand {} d={} fp={} {}", demand, fillingAndParsing, networkBuffer, this);
|
||||||
|
|
||||||
if (demand < 0)
|
if (demand != null)
|
||||||
return;
|
{
|
||||||
|
if (demand)
|
||||||
demand = MathUtils.cappedAdd(demand, 1);
|
throw new ReadPendingException();
|
||||||
|
demand = true;
|
||||||
|
}
|
||||||
|
|
||||||
if (!fillingAndParsing)
|
if (!fillingAndParsing)
|
||||||
{
|
{
|
||||||
|
@ -379,13 +381,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
||||||
|
|
||||||
if (!fillingAndParsing)
|
if (!fillingAndParsing)
|
||||||
throw new IllegalStateException();
|
throw new IllegalStateException();
|
||||||
if (demand != 0) //if demand was canceled, this creates synthetic demand in order to read until EOF
|
if (demand == null) // If demand was canceled, this creates synthetic demand in order to read until EOF.
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
fillingAndParsing = false;
|
fillingAndParsing = false;
|
||||||
if (!networkBuffer.hasRemaining())
|
if (!networkBuffer.hasRemaining())
|
||||||
releaseNetworkBuffer();
|
releaseNetworkBuffer();
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -397,14 +398,13 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("meetDemand d={} fp={} {} {}", demand, fillingAndParsing, networkBuffer, this);
|
LOG.debug("meetDemand d={} fp={} {} {}", demand, fillingAndParsing, networkBuffer, this);
|
||||||
|
|
||||||
if (demand == 0)
|
if (demand == Boolean.FALSE)
|
||||||
throw new IllegalStateException();
|
throw new IllegalStateException();
|
||||||
if (!fillingAndParsing)
|
if (!fillingAndParsing)
|
||||||
throw new IllegalStateException();
|
throw new IllegalStateException();
|
||||||
|
|
||||||
if (demand > 0)
|
if (demand != null)
|
||||||
demand--;
|
demand = false;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -415,7 +415,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("cancelDemand d={} fp={} {} {}", demand, fillingAndParsing, networkBuffer, this);
|
LOG.debug("cancelDemand d={} fp={} {} {}", demand, fillingAndParsing, networkBuffer, this);
|
||||||
demand = -1;
|
demand = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,8 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.websocket.core.util;
|
package org.eclipse.jetty.websocket.core.util;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.nio.channels.ReadPendingException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
@ -41,7 +42,7 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
|
||||||
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");
|
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");
|
||||||
|
|
||||||
private final IncomingFrames _emitFrame;
|
private final IncomingFrames _emitFrame;
|
||||||
private final AtomicLong _demand = new AtomicLong();
|
private final AtomicBoolean _demand = new AtomicBoolean();
|
||||||
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
|
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
|
||||||
private DemandChain _nextDemand;
|
private DemandChain _nextDemand;
|
||||||
|
|
||||||
|
@ -77,7 +78,8 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
|
||||||
@Override
|
@Override
|
||||||
public void demand()
|
public void demand()
|
||||||
{
|
{
|
||||||
_demand.incrementAndGet();
|
if (!_demand.compareAndSet(false, true))
|
||||||
|
throw new ReadPendingException();
|
||||||
iterate();
|
iterate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,8 +141,8 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
|
||||||
*/
|
*/
|
||||||
public void emitFrame(Frame frame, Callback callback)
|
public void emitFrame(Frame frame, Callback callback)
|
||||||
{
|
{
|
||||||
if (_demand.decrementAndGet() < 0)
|
if (_demand.compareAndSet(true, false))
|
||||||
throw new IllegalStateException("Negative Demand");
|
throw new IllegalStateException("Demand Already Fulfilled");
|
||||||
_emitFrame.onFrame(frame, callback);
|
_emitFrame.onFrame(frame, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,7 +155,7 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema
|
||||||
if (failure != null)
|
if (failure != null)
|
||||||
throw failure;
|
throw failure;
|
||||||
|
|
||||||
if (_demand.get() <= 0)
|
if (!_demand.get())
|
||||||
break;
|
break;
|
||||||
|
|
||||||
if (_needContent)
|
if (_needContent)
|
||||||
|
|
Loading…
Reference in New Issue