diff --git a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java index 9c16c7b3f66..6354ed30208 100644 --- a/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java +++ b/jetty-websocket/websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java @@ -175,7 +175,7 @@ public interface Session extends Closeable void setIdleTimeout(long ms); /** - * Suspend a the incoming read events on the connection. + * Suspend the incoming read events on the connection. * * @return the suspend token suitable for resuming the reading of data on the connection. */ diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 00ccb55e0d8..75ed55eb02b 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -705,7 +705,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { if (suspendToken.getAndSet(false)) { - fillInterested(); + if (!isReading()) + { + fillInterested(); + } } } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/SuspendResumeTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/SuspendResumeTest.java new file mode 100644 index 00000000000..d4e01e8c061 --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/SuspendResumeTest.java @@ -0,0 +1,138 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.server; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.toolchain.test.EventQueue; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.SuspendToken; +import org.eclipse.jetty.websocket.api.WriteCallback; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.common.WebSocketFrame; +import org.eclipse.jetty.websocket.common.frames.TextFrame; +import org.eclipse.jetty.websocket.common.test.BlockheadClient; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class SuspendResumeTest +{ + @WebSocket + public static class EchoSocket + { + private Session session; + + @OnWebSocketConnect + public void onConnect(Session session) + { + this.session = session; + } + + @OnWebSocketMessage + public void onMessage(String message) + { + SuspendToken suspendToken = this.session.suspend(); + this.session.getRemote().sendString(message, + new WriteCallback() + { + + @Override + public void writeSuccess() + { + suspendToken.resume(); + } + + @Override + public void writeFailed(Throwable t) + { + Assert.fail(t.getMessage()); + } + }); + } + } + + public static class EchoCreator implements WebSocketCreator + { + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + return new EchoSocket(); + } + } + + public static class EchoServlet extends WebSocketServlet + { + private static final long serialVersionUID = 1L; + + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setCreator(new EchoCreator()); + } + } + + private static SimpleServletServer server; + + @BeforeClass + public static void startServer() throws Exception + { + server = new SimpleServletServer(new EchoServlet()); + server.start(); + } + + @AfterClass + public static void stopServer() + { + server.stop(); + } + + @Test + public void testSuspendResume() throws Exception + { + try (BlockheadClient client = new BlockheadClient(server.getServerUri())) + { + client.setTimeout(1, TimeUnit.SECONDS); + + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + client.write(new TextFrame().setPayload("echo1")); + client.write(new TextFrame().setPayload("echo2")); + + EventQueue frames = client.readFrames(2, 30, TimeUnit.SECONDS); + WebSocketFrame tf = frames.poll(); + assertThat(EchoSocket.class.getSimpleName() + ".onMessage()", tf.getPayloadAsUTF8(), is("echo1")); + tf = frames.poll(); + assertThat(EchoSocket.class.getSimpleName() + ".onMessage()", tf.getPayloadAsUTF8(), is("echo2")); + } + } +}