diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index f9ff4174fa7..72f696ebc7f 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.io.AbstractConnection; @@ -125,12 +124,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp private final Generator generator; private final Parser parser; private final WebSocketPolicy policy; - private final AtomicBoolean suspendToken; + private final Suspender suspender; private final FrameFlusher flusher; private final String id; private WebSocketSession session; private List extensions; - private boolean isFilling; private ByteBuffer prefillBuffer; private ReadMode readMode = ReadMode.PARSE; private IOState ioState; @@ -147,7 +145,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp this.parser = new Parser(policy,bufferPool); this.scheduler = scheduler; this.extensions = new ArrayList<>(); - this.suspendToken = new AtomicBoolean(false); + this.suspender = new Suspender(); this.ioState = new IOState(); this.ioState.addListener(this); this.flusher = new Flusher(bufferPool,generator,endp); @@ -302,7 +300,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public boolean isReading() { - return isFilling; + return !suspender.isSuspended(); } /** @@ -384,8 +382,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp try { - isFilling = true; - if(readMode == ReadMode.PARSE) { readMode = readParse(buffer); @@ -400,14 +396,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp bufferPool.release(buffer); } - if ((readMode != ReadMode.EOF) && (suspendToken.get() == false)) + if (readMode == ReadMode.EOF) + { + suspender.eof(); + } + else if (!suspender.activateRequestedSuspend()) { fillInterested(); } - else - { - isFilling = false; - } } @Override @@ -586,12 +582,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public void resume() { - if (suspendToken.getAndSet(false)) + if (suspender.requestResume()) { - if (!isReading()) - { - fillInterested(); - } + fillInterested(); } } @@ -627,7 +620,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public SuspendToken suspend() { - suspendToken.set(true); + suspender.requestSuspend(); return this; } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/Suspender.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/Suspender.java new file mode 100644 index 00000000000..ffaa57143ed --- /dev/null +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/Suspender.java @@ -0,0 +1,102 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common.io; + +import java.util.concurrent.atomic.AtomicReference; + +class Suspender +{ + private enum State + { + /** Not suspended. */ + NORMAL, + + /** Suspend requested but not yet taken effect. */ + PENDING, + + /** Suspended but resumable. */ + SUSPENDED, + + /** Permanently suspended (terminal state). */ + EOF, + } + + private final AtomicReference ref = new AtomicReference<>(State.NORMAL); + + boolean isSuspended() + { + State state = ref.get(); + return state == State.SUSPENDED || state == State.EOF; + } + + /** + * Requests that activity be suspended the next time {@link #activateRequestedSuspend()} is called. + */ + void requestSuspend() + { + // Transition NORMAL -> PENDING + State state; + do + { + state = ref.get(); + } + while (state == State.NORMAL && !ref.compareAndSet(state, State.PENDING)); + } + + /** + * Returns true if activity is suspended, whether or not it was already suspended. + */ + boolean activateRequestedSuspend() + { + // Transition PENDING -> SUSPENDED + State state; + do + { + state = ref.get(); + } + while (state == State.PENDING && !ref.compareAndSet(state, State.SUSPENDED)); + return state != State.NORMAL; + } + + /** + * Returns true if activity was suspended and should now resume. + */ + boolean requestResume() + { + // Transition PENDING|SUSPENDED -> NORMAL + State state; + do + { + state = ref.get(); + } + while ((state == State.PENDING || state == State.SUSPENDED) && !ref.compareAndSet(state, State.NORMAL)); + return state == State.SUSPENDED; + } + + void eof() + { + ref.set(State.EOF); + } + + @Override + public String toString() + { + return ref.get().toString(); + } +} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/SuspenderTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/SuspenderTest.java new file mode 100644 index 00000000000..4d648706b8e --- /dev/null +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/SuspenderTest.java @@ -0,0 +1,91 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common.io; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.junit.jupiter.api.Test; + +public class SuspenderTest { + + @Test + public void testNotSuspended() + { + Suspender suspender = new Suspender(); + assertThat("Initially normal", suspender.isSuspended(), is(false)); + + assertThat("No pending suspend to activate", suspender.activateRequestedSuspend(), is(false)); + assertThat("No pending suspend to activate", suspender.isSuspended(), is(false)); + + assertThat("No pending suspend to resume", suspender.requestResume(), is(false)); + assertThat("No pending suspend to resume", suspender.isSuspended(), is(false)); + } + + @Test + public void testSuspendThenResume() + { + Suspender suspender = new Suspender(); + assertThat("Initially suspended", suspender.isSuspended(), is(false)); + + suspender.requestSuspend(); + assertThat("Requesting suspend doesn't take effect immediately", suspender.isSuspended(), is(false)); + + assertThat("Resume from pending requires no followup", suspender.requestResume(), is(false)); + assertThat("Resume from pending requires no followup", suspender.isSuspended(), is(false)); + + assertThat("Requested suspend was discarded", suspender.activateRequestedSuspend(), is(false)); + assertThat("Requested suspend was discarded", suspender.isSuspended(), is(false)); + } + + @Test + public void testSuspendThenActivateThenResume() + { + Suspender suspender = new Suspender(); + assertThat("Initially suspended", suspender.isSuspended(), is(false)); + + suspender.requestSuspend(); + assertThat("Requesting suspend doesn't take effect immediately", suspender.isSuspended(), is(false)); + + assertThat("Suspend activated", suspender.activateRequestedSuspend(), is(true)); + assertThat("Suspend activated", suspender.isSuspended(), is(true)); + + assertThat("Resumed", suspender.requestResume(), is(true)); + assertThat("Resumed", suspender.isSuspended(), is(false)); + } + + @Test + public void testEof() + { + Suspender suspender = new Suspender(); + suspender.eof(); + assertThat(suspender.isSuspended(), is(true)); + + assertThat(suspender.activateRequestedSuspend(), is(true)); + + suspender.requestSuspend(); + assertThat(suspender.isSuspended(), is(true)); + + assertThat(suspender.activateRequestedSuspend(), is(true)); + assertThat(suspender.isSuspended(), is(true)); + + assertThat(suspender.requestResume(), is(false)); + assertThat(suspender.isSuspended(), is(true)); + } +}