- Replicate problems from WS close deadlock with test cases - use FutureCallback instead of SharedBlockingCallback for WS blocking methods - add timeout to the blocking callbacks for WS for (idleTimeout + 1000ms) - Core throws ClosedChannelException instead of ISE if send after closed
This commit is contained in:
parent
ae6610f4ba
commit
f6fd3c41a5
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -138,6 +139,42 @@ public class FutureCallback implements Future<Void>, Callback
|
|||
throw new ExecutionException(_cause);
|
||||
}
|
||||
|
||||
public void block() throws IOException
|
||||
{
|
||||
block(-1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void block(long timeout, TimeUnit unit) throws IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
if (timeout > 0)
|
||||
get(timeout, unit);
|
||||
else
|
||||
get();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
InterruptedIOException exception = new InterruptedIOException();
|
||||
exception.initCause(e);
|
||||
throw exception;
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof RuntimeException)
|
||||
throw (RuntimeException)cause;
|
||||
else if (cause instanceof IOException)
|
||||
throw (IOException)cause;
|
||||
else
|
||||
throw new IOException(cause);
|
||||
}
|
||||
catch (TimeoutException e)
|
||||
{
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void rethrow(ExecutionException e) throws IOException
|
||||
{
|
||||
Throwable cause = e.getCause();
|
||||
|
|
|
@ -400,16 +400,19 @@ public interface FrameHandler extends IncomingFrames
|
|||
@Override
|
||||
public void flush(Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(int statusCode, String reason, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -420,6 +423,7 @@ public interface FrameHandler extends IncomingFrames
|
|||
@Override
|
||||
public void sendFrame(Frame frame, Callback callback, boolean batch)
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -142,7 +142,7 @@ public class WebSocketSessionState
|
|||
}
|
||||
}
|
||||
|
||||
public boolean onOutgoingFrame(Frame frame) throws ProtocolException
|
||||
public boolean onOutgoingFrame(Frame frame) throws Exception
|
||||
{
|
||||
byte opcode = frame.getOpCode();
|
||||
boolean fin = frame.isFin();
|
||||
|
@ -150,7 +150,7 @@ public class WebSocketSessionState
|
|||
synchronized (this)
|
||||
{
|
||||
if (!isOutputOpen())
|
||||
throw new IllegalStateException(_sessionState.toString());
|
||||
throw new ClosedChannelException();
|
||||
|
||||
if (opcode == OpCode.CLOSE)
|
||||
{
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under
|
||||
// the terms of the Eclipse Public License 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// This Source Code may also be made available under the following
|
||||
// Secondary Licenses when the conditions for such availability set
|
||||
// forth in the Eclipse Public License, v. 2.0 are satisfied:
|
||||
// the Apache License v2.0 which is available at
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.core;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class FlushTest
|
||||
{
|
||||
private WebSocketServer server;
|
||||
private TestFrameHandler serverHandler = new TestFrameHandler();
|
||||
private WebSocketCoreClient client;
|
||||
private WebSocketComponents components = new WebSocketComponents();
|
||||
|
||||
@BeforeEach
|
||||
public void startup() throws Exception
|
||||
{
|
||||
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverHandler);
|
||||
server = new WebSocketServer(negotiator);
|
||||
client = new WebSocketCoreClient(null, components);
|
||||
|
||||
server.start();
|
||||
client.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void shutdown() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStandardFlush() throws Exception
|
||||
{
|
||||
TestFrameHandler clientHandler = new TestFrameHandler();
|
||||
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, server.getUri());
|
||||
connect.get(5, TimeUnit.SECONDS);
|
||||
|
||||
// Send a batched frame.
|
||||
clientHandler.sendFrame(new Frame(OpCode.TEXT, "text payload"), Callback.NOOP, true);
|
||||
|
||||
// We have batched the frame and not sent it.
|
||||
assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS));
|
||||
|
||||
// Once we flush the frame is received.
|
||||
clientHandler.getCoreSession().flush(Callback.NOOP);
|
||||
Frame frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
|
||||
assertThat(frame.getOpCode(), is(OpCode.TEXT));
|
||||
assertThat(frame.getPayloadAsUTF8(), is("text payload"));
|
||||
|
||||
clientHandler.sendClose();
|
||||
frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
|
||||
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NO_CODE));
|
||||
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertNull(clientHandler.getError());
|
||||
assertThat(clientHandler.closeStatus.getCode(), is(CloseStatus.NO_CODE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushOnCloseFrame() throws Exception
|
||||
{
|
||||
TestFrameHandler clientHandler = new TestFrameHandler();
|
||||
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, server.getUri());
|
||||
connect.get(5, TimeUnit.SECONDS);
|
||||
|
||||
// Send a batched frame.
|
||||
clientHandler.sendFrame(new Frame(OpCode.TEXT, "text payload"), Callback.NOOP, true);
|
||||
|
||||
// We have batched the frame and not sent it.
|
||||
assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS));
|
||||
|
||||
// Sending the close initiates the flush and the frame is received.
|
||||
clientHandler.sendClose();
|
||||
Frame frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
|
||||
assertThat(frame.getOpCode(), is(OpCode.TEXT));
|
||||
assertThat(frame.getPayloadAsUTF8(), is("text payload"));
|
||||
|
||||
frame = Objects.requireNonNull(serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS));
|
||||
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NO_CODE));
|
||||
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertNull(clientHandler.getError());
|
||||
assertThat(clientHandler.closeStatus.getCode(), is(CloseStatus.NO_CODE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushAfterClose() throws Exception
|
||||
{
|
||||
TestFrameHandler clientHandler = new TestFrameHandler();
|
||||
CompletableFuture<FrameHandler.CoreSession> connect = client.connect(clientHandler, server.getUri());
|
||||
connect.get(5, TimeUnit.SECONDS);
|
||||
|
||||
clientHandler.sendClose();
|
||||
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertNull(clientHandler.getError());
|
||||
|
||||
Callback.Completable flushCallback = new Callback.Completable();
|
||||
clientHandler.getCoreSession().flush(flushCallback);
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> flushCallback.get(5, TimeUnit.SECONDS));
|
||||
assertThat(e.getCause(), instanceOf(ClosedChannelException.class));
|
||||
}
|
||||
}
|
|
@ -19,9 +19,12 @@
|
|||
package org.eclipse.jetty.websocket.core;
|
||||
|
||||
import java.net.Socket;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.server.NetworkConnector;
|
||||
|
@ -48,10 +51,13 @@ import org.junit.jupiter.params.provider.ValueSource;
|
|||
import static org.eclipse.jetty.util.Callback.NOOP;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
@ -425,6 +431,97 @@ public class WebSocketCloseTest extends WebSocketTester
|
|||
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
|
||||
public void doubleNormalClose(String scheme) throws Exception
|
||||
{
|
||||
setup(State.OPEN, scheme);
|
||||
|
||||
Callback.Completable callback1 = new Callback.Completable();
|
||||
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal 1", callback1);
|
||||
Callback.Completable callback2 = new Callback.Completable();
|
||||
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal 2", callback2);
|
||||
|
||||
// First Callback Succeeded
|
||||
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
|
||||
|
||||
// Second Callback Failed with ClosedChannelException
|
||||
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
|
||||
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
|
||||
|
||||
// Normal close frame received on client.
|
||||
Frame closeFrame = receiveFrame(client.getInputStream());
|
||||
assertThat(closeFrame.getOpCode(), is(OpCode.CLOSE));
|
||||
CloseStatus closeStatus = CloseStatus.getCloseStatus(closeFrame);
|
||||
assertThat(closeStatus.getCode(), is(CloseStatus.NORMAL));
|
||||
assertThat(closeStatus.getReason(), is("normal 1"));
|
||||
|
||||
// Send close response from client.
|
||||
client.getOutputStream().write(RawFrameBuilder.buildClose(
|
||||
new CloseStatus(CloseStatus.NORMAL, "normal response 1"), true));
|
||||
|
||||
server.handler.getCoreSession().demand(1);
|
||||
assertNotNull(server.handler.receivedFrames.poll(10, TimeUnit.SECONDS));
|
||||
Callback closeFrameCallback = Objects.requireNonNull(server.handler.receivedCallback.poll());
|
||||
closeFrameCallback.succeeded();
|
||||
|
||||
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.NORMAL));
|
||||
assertThat(server.handler.closeStatus.getReason(), is("normal response 1"));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
|
||||
public void doubleAbnormalClose(String scheme) throws Exception
|
||||
{
|
||||
setup(State.OPEN, scheme);
|
||||
|
||||
Callback.Completable callback1 = new Callback.Completable();
|
||||
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "server error should succeed", callback1);
|
||||
Callback.Completable callback2 = new Callback.Completable();
|
||||
server.handler.getCoreSession().close(CloseStatus.PROTOCOL, "protocol error should fail", callback2);
|
||||
|
||||
// First Callback Succeeded
|
||||
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
|
||||
|
||||
// Second Callback Failed with ClosedChannelException
|
||||
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
|
||||
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
|
||||
|
||||
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(server.handler.closeStatus.getReason(), containsString("server error should succeed"));
|
||||
|
||||
Frame frame = receiveFrame(client.getInputStream());
|
||||
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {WS_SCHEME, WSS_SCHEME})
|
||||
public void doubleCloseAbnormalOvertakesNormalClose(String scheme) throws Exception
|
||||
{
|
||||
setup(State.OPEN, scheme);
|
||||
|
||||
Callback.Completable callback1 = new Callback.Completable();
|
||||
server.handler.getCoreSession().close(CloseStatus.NORMAL, "normal close (client does not complete close handshake)", callback1);
|
||||
Callback.Completable callback2 = new Callback.Completable();
|
||||
server.handler.getCoreSession().close(CloseStatus.SERVER_ERROR, "error close should overtake normal close", callback2);
|
||||
|
||||
// First Callback Succeeded
|
||||
assertDoesNotThrow(() -> callback1.get(5, TimeUnit.SECONDS));
|
||||
|
||||
// Second Callback Failed with ClosedChannelException
|
||||
ExecutionException error = assertThrows(ExecutionException.class, () -> callback2.get(5, TimeUnit.SECONDS));
|
||||
assertThat(error.getCause(), instanceOf(ClosedChannelException.class));
|
||||
|
||||
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
|
||||
assertThat(server.handler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
|
||||
assertThat(server.handler.closeStatus.getReason(), containsString("error close should overtake normal close"));
|
||||
|
||||
Frame frame = receiveFrame(client.getInputStream());
|
||||
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.NORMAL));
|
||||
}
|
||||
|
||||
static class DemandingTestFrameHandler implements SynchronousFrameHandler
|
||||
{
|
||||
private CoreSession coreSession;
|
||||
|
|
|
@ -47,6 +47,7 @@ public class WebSocketTester
|
|||
private static String NON_RANDOM_KEY = Base64.getEncoder().encodeToString("0123456701234567".getBytes());
|
||||
private static SslContextFactory.Client sslContextFactory;
|
||||
protected ByteBufferPool bufferPool;
|
||||
protected ByteBuffer buffer;
|
||||
protected Parser parser;
|
||||
|
||||
@BeforeAll
|
||||
|
@ -159,33 +160,34 @@ public class WebSocketTester
|
|||
|
||||
protected Parser.ParsedFrame receiveFrame(InputStream in) throws IOException
|
||||
{
|
||||
ByteBuffer buffer = bufferPool.acquire(4096, false);
|
||||
if (buffer == null)
|
||||
buffer = bufferPool.acquire(4096, false);
|
||||
|
||||
while (true)
|
||||
{
|
||||
Parser.ParsedFrame frame = parser.parse(buffer);
|
||||
if (!buffer.hasRemaining())
|
||||
BufferUtil.clear(buffer);
|
||||
if (frame != null)
|
||||
return frame;
|
||||
|
||||
int p = BufferUtil.flipToFill(buffer);
|
||||
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
|
||||
if (len < 0)
|
||||
return null;
|
||||
buffer.position(buffer.position() + len);
|
||||
BufferUtil.flipToFlush(buffer, p);
|
||||
|
||||
Parser.ParsedFrame frame = parser.parse(buffer);
|
||||
if (frame != null)
|
||||
return frame;
|
||||
}
|
||||
}
|
||||
|
||||
protected void receiveEof(InputStream in) throws IOException
|
||||
{
|
||||
ByteBuffer buffer = bufferPool.acquire(4096, false);
|
||||
while (true)
|
||||
{
|
||||
BufferUtil.flipToFill(buffer);
|
||||
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
|
||||
if (len < 0)
|
||||
return;
|
||||
BufferUtil.clearToFill(buffer);
|
||||
int len = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
|
||||
if (len < 0)
|
||||
return;
|
||||
|
||||
throw new IllegalStateException("unexpected content");
|
||||
}
|
||||
throw new IllegalStateException("unexpected content");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,11 +22,12 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.io.Writer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.websocket.EncodeException;
|
||||
import javax.websocket.RemoteEndpoint;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.SharedBlockingCallback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
|
@ -65,10 +66,10 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
|
|||
{
|
||||
LOG.debug("sendBinary({})", BufferUtil.toDetailString(data));
|
||||
}
|
||||
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
|
||||
{
|
||||
sendFrame(new Frame(OpCode.BINARY).setPayload(data), b, false);
|
||||
}
|
||||
|
||||
FutureCallback b = new FutureCallback();
|
||||
sendFrame(new Frame(OpCode.BINARY).setPayload(data), b, false);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -79,37 +80,36 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
|
|||
{
|
||||
LOG.debug("sendBinary({},{})", BufferUtil.toDetailString(partialByte), isLast);
|
||||
}
|
||||
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
|
||||
{
|
||||
Frame frame;
|
||||
switch (messageType)
|
||||
{
|
||||
case -1:
|
||||
// New message!
|
||||
frame = new Frame(OpCode.BINARY);
|
||||
break;
|
||||
case OpCode.TEXT:
|
||||
throw new IllegalStateException("Cannot send a partial BINARY message: TEXT message in progress");
|
||||
case OpCode.BINARY:
|
||||
frame = new Frame(OpCode.CONTINUATION);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Cannot send a partial BINARY message: unrecognized active message type " + OpCode.name(messageType));
|
||||
}
|
||||
|
||||
frame.setPayload(partialByte);
|
||||
frame.setFin(isLast);
|
||||
sendFrame(frame, b, false);
|
||||
Frame frame;
|
||||
switch (messageType)
|
||||
{
|
||||
case -1:
|
||||
// New message!
|
||||
frame = new Frame(OpCode.BINARY);
|
||||
break;
|
||||
case OpCode.TEXT:
|
||||
throw new IllegalStateException("Cannot send a partial BINARY message: TEXT message in progress");
|
||||
case OpCode.BINARY:
|
||||
frame = new Frame(OpCode.CONTINUATION);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Cannot send a partial BINARY message: unrecognized active message type " + OpCode.name(messageType));
|
||||
}
|
||||
|
||||
frame.setPayload(partialByte);
|
||||
frame.setFin(isLast);
|
||||
FutureCallback b = new FutureCallback();
|
||||
sendFrame(frame, b, false);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendObject(Object data) throws IOException, EncodeException
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
|
||||
{
|
||||
super.sendObject(data, b);
|
||||
}
|
||||
FutureCallback b = new FutureCallback();
|
||||
super.sendObject(data, b);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -120,10 +120,11 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
|
|||
{
|
||||
LOG.debug("sendText({})", TextUtil.hint(text));
|
||||
}
|
||||
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
|
||||
{
|
||||
sendFrame(new Frame(OpCode.TEXT).setPayload(text), b, false);
|
||||
}
|
||||
|
||||
|
||||
FutureCallback b = new FutureCallback();
|
||||
sendFrame(new Frame(OpCode.TEXT).setPayload(text), b, false);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,27 +135,33 @@ public class JavaxWebSocketBasicRemote extends JavaxWebSocketRemoteEndpoint impl
|
|||
{
|
||||
LOG.debug("sendText({},{})", TextUtil.hint(partialMessage), isLast);
|
||||
}
|
||||
try (SharedBlockingCallback.Blocker b = session.getBlocking().acquire())
|
||||
{
|
||||
Frame frame;
|
||||
switch (messageType)
|
||||
{
|
||||
case -1:
|
||||
// New message!
|
||||
frame = new Frame(OpCode.TEXT);
|
||||
break;
|
||||
case OpCode.TEXT:
|
||||
frame = new Frame(OpCode.CONTINUATION);
|
||||
break;
|
||||
case OpCode.BINARY:
|
||||
throw new IllegalStateException("Cannot send a partial TEXT message: BINARY message in progress");
|
||||
default:
|
||||
throw new IllegalStateException("Cannot send a partial TEXT message: unrecognized active message type " + OpCode.name(messageType));
|
||||
}
|
||||
|
||||
frame.setPayload(BufferUtil.toBuffer(partialMessage, UTF_8));
|
||||
frame.setFin(isLast);
|
||||
sendFrame(frame, b, false);
|
||||
Frame frame;
|
||||
switch (messageType)
|
||||
{
|
||||
case -1:
|
||||
// New message!
|
||||
frame = new Frame(OpCode.TEXT);
|
||||
break;
|
||||
case OpCode.TEXT:
|
||||
frame = new Frame(OpCode.CONTINUATION);
|
||||
break;
|
||||
case OpCode.BINARY:
|
||||
throw new IllegalStateException("Cannot send a partial TEXT message: BINARY message in progress");
|
||||
default:
|
||||
throw new IllegalStateException("Cannot send a partial TEXT message: unrecognized active message type " + OpCode.name(messageType));
|
||||
}
|
||||
|
||||
frame.setPayload(BufferUtil.toBuffer(partialMessage, UTF_8));
|
||||
frame.setFin(isLast);
|
||||
FutureCallback b = new FutureCallback();
|
||||
sendFrame(frame, b, false);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private long getBlockingTimeout()
|
||||
{
|
||||
long idleTimeout = getIdleTimeout();
|
||||
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,13 +21,14 @@ package org.eclipse.jetty.websocket.javax.common;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.websocket.EncodeException;
|
||||
import javax.websocket.Encoder;
|
||||
import javax.websocket.SendHandler;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.SharedBlockingCallback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
|
@ -66,10 +67,9 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
|
|||
@Override
|
||||
public void flushBatch() throws IOException
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker blocker = session.getBlocking().acquire())
|
||||
{
|
||||
coreSession.flush(blocker);
|
||||
}
|
||||
FutureCallback b = new FutureCallback();
|
||||
coreSession.flush(b);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -227,24 +227,22 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
|
|||
public void sendPing(ByteBuffer data) throws IOException, IllegalArgumentException
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendPing({})", BufferUtil.toDetailString(data));
|
||||
}
|
||||
// TODO: is this supposed to be a blocking call?
|
||||
// TODO: what to do on excessively large payloads (error and close connection per RFC6455, or truncate?)
|
||||
sendFrame(new Frame(OpCode.PING).setPayload(data), Callback.NOOP, batch);
|
||||
|
||||
FutureCallback b = new FutureCallback();
|
||||
sendFrame(new Frame(OpCode.PING).setPayload(data), b, batch);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPong(ByteBuffer data) throws IOException, IllegalArgumentException
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendPong({})", BufferUtil.toDetailString(data));
|
||||
}
|
||||
// TODO: is this supposed to be a blocking call?
|
||||
// TODO: what to do on excessively large payloads (error and close connection per RFC6455, or truncate?)
|
||||
sendFrame(new Frame(OpCode.PONG).setPayload(data), Callback.NOOP, batch);
|
||||
|
||||
FutureCallback b = new FutureCallback();
|
||||
sendFrame(new Frame(OpCode.PONG).setPayload(data), b, batch);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected void assertMessageNotNull(Object data)
|
||||
|
@ -262,4 +260,10 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
|
|||
throw new IllegalArgumentException("SendHandler cannot be null");
|
||||
}
|
||||
}
|
||||
|
||||
private long getBlockingTimeout()
|
||||
{
|
||||
long idleTimeout = getIdleTimeout();
|
||||
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.websocket.CloseReason;
|
||||
import javax.websocket.EndpointConfig;
|
||||
|
@ -38,7 +39,7 @@ import javax.websocket.RemoteEndpoint.Basic;
|
|||
import javax.websocket.Session;
|
||||
import javax.websocket.WebSocketContainer;
|
||||
|
||||
import org.eclipse.jetty.util.SharedBlockingCallback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.core.ExtensionConfig;
|
||||
|
@ -54,7 +55,6 @@ public class JavaxWebSocketSession implements javax.websocket.Session
|
|||
{
|
||||
private static final Logger LOG = Log.getLogger(JavaxWebSocketSession.class);
|
||||
|
||||
protected final SharedBlockingCallback blocking = new SharedBlockingCallback();
|
||||
private final JavaxWebSocketContainer container;
|
||||
private final FrameHandler.CoreSession coreSession;
|
||||
private final JavaxWebSocketFrameHandler frameHandler;
|
||||
|
@ -179,10 +179,7 @@ public class JavaxWebSocketSession implements javax.websocket.Session
|
|||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker blocker = blocking.acquire())
|
||||
{
|
||||
coreSession.close(blocker);
|
||||
}
|
||||
close(new CloseReason(CloseReason.CloseCodes.NO_STATUS_CODE, null));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -194,10 +191,15 @@ public class JavaxWebSocketSession implements javax.websocket.Session
|
|||
@Override
|
||||
public void close(CloseReason closeReason) throws IOException
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker blocker = blocking.acquire())
|
||||
{
|
||||
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), blocker);
|
||||
}
|
||||
FutureCallback b = new FutureCallback();
|
||||
coreSession.close(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase(), b);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private long getBlockingTimeout()
|
||||
{
|
||||
long idleTimeout = getMaxIdleTimeout();
|
||||
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -565,9 +567,4 @@ public class JavaxWebSocketSession implements javax.websocket.Session
|
|||
return String.format("%s@%x[%s,%s]", this.getClass().getSimpleName(), this.hashCode(),
|
||||
coreSession.getBehavior(), frameHandler);
|
||||
}
|
||||
|
||||
protected SharedBlockingCallback getBlocking()
|
||||
{
|
||||
return blocking;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.javax.common;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
@ -25,9 +26,13 @@ import javax.websocket.CloseReason;
|
|||
import javax.websocket.Session;
|
||||
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(SessionTracker.class);
|
||||
|
||||
private CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();
|
||||
|
||||
public Set<Session> getSessions()
|
||||
|
@ -52,8 +57,15 @@ public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketS
|
|||
{
|
||||
for (Session session : sessions)
|
||||
{
|
||||
// GOING_AWAY is abnormal close status so it will hard close connection after sent.
|
||||
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
|
||||
try
|
||||
{
|
||||
// GOING_AWAY is abnormal close status so it will hard close connection after sent.
|
||||
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.ignore(e);
|
||||
}
|
||||
}
|
||||
|
||||
super.doStop();
|
||||
|
|
|
@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
|
||||
public class SessionTrackingTest
|
||||
{
|
||||
static BlockingArrayQueue<Session> serverSessions = new BlockingArrayQueue<>();
|
||||
private static BlockingArrayQueue<Session> serverSessions = new BlockingArrayQueue<>();
|
||||
|
||||
@ServerEndpoint("/session-info/{sessionId}")
|
||||
public static class SessionTrackingSocket
|
||||
|
@ -104,59 +104,53 @@ public class SessionTrackingTest
|
|||
EventSocket clientSocket2 = new EventSocket();
|
||||
EventSocket clientSocket3 = new EventSocket();
|
||||
|
||||
try (Session session1 = client.connectToServer(clientSocket1, server.getWsUri().resolve("/session-info/1")))
|
||||
{
|
||||
Session serverSession1 = serverSessions.poll(5, TimeUnit.SECONDS);
|
||||
assertNotNull(serverSession1);
|
||||
sendTextFrameToAll("openSessions|in-1", session1);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-1).size=1"));
|
||||
Session session1 = client.connectToServer(clientSocket1, server.getWsUri().resolve("/session-info/1"));
|
||||
Session serverSession1 = serverSessions.poll(5, TimeUnit.SECONDS);
|
||||
assertNotNull(serverSession1);
|
||||
sendTextFrameToAll("openSessions|in-1", session1);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-1).size=1"));
|
||||
|
||||
try (Session session2 = client.connectToServer(clientSocket2, server.getWsUri().resolve("/session-info/2")))
|
||||
{
|
||||
Session serverSession2 = serverSessions.poll(5, TimeUnit.SECONDS);
|
||||
assertNotNull(serverSession2);
|
||||
sendTextFrameToAll("openSessions|in-2", session1, session2);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
|
||||
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
|
||||
Session session2 = client.connectToServer(clientSocket2, server.getWsUri().resolve("/session-info/2"));
|
||||
Session serverSession2 = serverSessions.poll(5, TimeUnit.SECONDS);
|
||||
assertNotNull(serverSession2);
|
||||
sendTextFrameToAll("openSessions|in-2", session1, session2);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
|
||||
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
|
||||
|
||||
try (Session session3 = client.connectToServer(clientSocket3, server.getWsUri().resolve("/session-info/3")))
|
||||
{
|
||||
Session serverSession3 = serverSessions.poll(5, TimeUnit.SECONDS);
|
||||
assertNotNull(serverSession3);
|
||||
sendTextFrameToAll("openSessions|in-3", session1, session2, session3);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
|
||||
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
|
||||
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
|
||||
Session session3 = client.connectToServer(clientSocket3, server.getWsUri().resolve("/session-info/3"));
|
||||
Session serverSession3 = serverSessions.poll(5, TimeUnit.SECONDS);
|
||||
assertNotNull(serverSession3);
|
||||
sendTextFrameToAll("openSessions|in-3", session1, session2, session3);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
|
||||
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
|
||||
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
|
||||
|
||||
sendTextFrameToAll("openSessions|lvl-3", session1, session2, session3);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
|
||||
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
|
||||
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
|
||||
sendTextFrameToAll("openSessions|lvl-3", session1, session2, session3);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
|
||||
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
|
||||
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
|
||||
|
||||
// assert session is closed, and we have received the notification from the SessionListener
|
||||
session3.close();
|
||||
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession3));
|
||||
assertTrue(clientSocket3.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
// assert session is closed, and we have received the notification from the SessionListener
|
||||
session3.close();
|
||||
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession3));
|
||||
assertTrue(clientSocket3.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
sendTextFrameToAll("openSessions|lvl-2", session1, session2);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
|
||||
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
|
||||
sendTextFrameToAll("openSessions|lvl-2", session1, session2);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
|
||||
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
|
||||
|
||||
// assert session is closed, and we have received the notification from the SessionListener
|
||||
session2.close();
|
||||
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession2));
|
||||
assertTrue(clientSocket2.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
// assert session is closed, and we have received the notification from the SessionListener
|
||||
session2.close();
|
||||
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession2));
|
||||
assertTrue(clientSocket2.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
sendTextFrameToAll("openSessions|lvl-1", session1);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-1).size=1"));
|
||||
sendTextFrameToAll("openSessions|lvl-1", session1);
|
||||
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-1).size=1"));
|
||||
|
||||
// assert session is closed, and we have received the notification from the SessionListener
|
||||
session1.close();
|
||||
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession1));
|
||||
assertTrue(clientSocket1.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
// assert session is closed, and we have received the notification from the SessionListener
|
||||
session1.close();
|
||||
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession1));
|
||||
assertTrue(clientSocket1.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private static void sendTextFrameToAll(String msg, Session... sessions) throws IOException
|
||||
|
|
|
@ -22,11 +22,15 @@ import java.io.IOException;
|
|||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.SharedBlockingCallback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
||||
|
@ -37,9 +41,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
|
|||
|
||||
public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket.api.RemoteEndpoint
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(JettyWebSocketRemoteEndpoint.class);
|
||||
|
||||
private final FrameHandler.CoreSession coreSession;
|
||||
private byte messageType = -1;
|
||||
private final SharedBlockingCallback blocker = new SharedBlockingCallback();
|
||||
private BatchMode batchMode;
|
||||
|
||||
public JettyWebSocketRemoteEndpoint(FrameHandler.CoreSession coreSession, BatchMode batchMode)
|
||||
|
@ -55,14 +60,7 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
|
|||
*/
|
||||
public void close()
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker b = blocker.acquire())
|
||||
{
|
||||
coreSession.close(b);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
coreSession.close(Callback.NOOP);
|
||||
}
|
||||
close(StatusCode.NO_CODE, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -74,13 +72,15 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
|
|||
*/
|
||||
public void close(int statusCode, String reason)
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker b = blocker.acquire())
|
||||
try
|
||||
{
|
||||
FutureCallback b = new FutureCallback();
|
||||
coreSession.close(statusCode, reason, b);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
coreSession.close(Callback.NOOP);
|
||||
LOG.ignore(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,11 +114,9 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
|
|||
@Override
|
||||
public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker b = blocker.acquire())
|
||||
{
|
||||
sendPartialBytes(fragment, isLast, b);
|
||||
b.block();
|
||||
}
|
||||
FutureCallback b = new FutureCallback();
|
||||
sendPartialBytes(fragment, isLast, b);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -158,11 +156,9 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
|
|||
@Override
|
||||
public void sendPartialString(String fragment, boolean isLast) throws IOException
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker b = blocker.acquire())
|
||||
{
|
||||
sendPartialText(fragment, isLast, b);
|
||||
b.block();
|
||||
}
|
||||
FutureCallback b = new FutureCallback();
|
||||
sendPartialText(fragment, isLast, b);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -227,16 +223,9 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
|
|||
|
||||
private void sendBlocking(Frame frame) throws IOException
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker b = blocker.acquire())
|
||||
{
|
||||
coreSession.sendFrame(frame, b, false);
|
||||
b.block();
|
||||
}
|
||||
}
|
||||
|
||||
protected FrameHandler.CoreSession getCoreSession()
|
||||
{
|
||||
return coreSession;
|
||||
FutureCallback b = new FutureCallback();
|
||||
coreSession.sendFrame(frame, b, false);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -265,10 +254,14 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
|
|||
@Override
|
||||
public void flush() throws IOException
|
||||
{
|
||||
try (SharedBlockingCallback.Blocker b = blocker.acquire())
|
||||
{
|
||||
coreSession.flush(b);
|
||||
b.block();
|
||||
}
|
||||
FutureCallback b = new FutureCallback();
|
||||
coreSession.flush(b);
|
||||
b.block(getBlockingTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private long getBlockingTimeout()
|
||||
{
|
||||
long idleTimeout = coreSession.getIdleTimeout().toMillis();
|
||||
return (idleTimeout > 0) ? idleTimeout + 1000 : idleTimeout;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ public class WebSocketSession implements Session, SuspendToken, Dumpable
|
|||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
return remoteEndpoint.getCoreSession().isOutputOpen();
|
||||
return coreSession.isOutputOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.websocket.tests;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
@ -125,8 +126,7 @@ public class WebSocketStopTest
|
|||
assertThat(clientSocket.statusCode, is(StatusCode.NORMAL));
|
||||
assertThat(serverSocket.statusCode, is(StatusCode.NORMAL));
|
||||
|
||||
IllegalStateException failure = assertThrows(IllegalStateException.class,
|
||||
assertThrows(ClosedChannelException.class,
|
||||
() -> session.getRemote().sendString("this should fail before ExtensionStack"));
|
||||
assertThat(failure.getMessage(), is("CLOSED"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,6 +65,7 @@ import static org.hamcrest.Matchers.containsString;
|
|||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
@ -285,6 +286,29 @@ public class ClientCloseTest
|
|||
clientSessionTracker.assertClosedProperly(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDoubleClose() throws Exception
|
||||
{
|
||||
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
|
||||
clientSessionTracker.addTo(client);
|
||||
|
||||
// Client connects
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
|
||||
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
|
||||
|
||||
// Client confirms connection via echo.
|
||||
confirmConnection(clientSocket, clientConnectFuture);
|
||||
|
||||
// Close twice, first close should succeed and second close is a NOOP
|
||||
clientSocket.getSession().close(StatusCode.NORMAL, "close1");
|
||||
clientSocket.getSession().close(StatusCode.NO_CODE, "close2");
|
||||
|
||||
// Second close is ignored, we are notified of the first close.
|
||||
clientSocket.assertReceivedCloseEvent(5000, is(StatusCode.NORMAL), containsString("close1"));
|
||||
assertNull(clientSocket.error.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStopLifecycle() throws Exception
|
||||
{
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
|
@ -34,6 +35,7 @@ import static org.hamcrest.Matchers.nullValue;
|
|||
public abstract class AbstractCloseEndpoint extends WebSocketAdapter
|
||||
{
|
||||
public final Logger log;
|
||||
public CountDownLatch openLatch = new CountDownLatch(1);
|
||||
public CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
public String closeReason = null;
|
||||
public int closeStatusCode = -1;
|
||||
|
@ -44,6 +46,14 @@ public abstract class AbstractCloseEndpoint extends WebSocketAdapter
|
|||
this.log = Log.getLogger(this.getClass().getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session sess)
|
||||
{
|
||||
super.onWebSocketConnect(sess);
|
||||
log.debug("onWebSocketConnect({})", sess);
|
||||
openLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason)
|
||||
{
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under
|
||||
// the terms of the Eclipse Public License 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// This Source Code may also be made available under the following
|
||||
// Secondary Licenses when the conditions for such availability set
|
||||
// forth in the Eclipse Public License, v. 2.0 are satisfied:
|
||||
// the Apache License v2.0 which is available at
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
|
||||
public class CloseInOnCloseEndpoint extends AbstractCloseEndpoint
|
||||
{
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason)
|
||||
{
|
||||
getSession().close(StatusCode.SERVER_ERROR, "this should be a noop");
|
||||
super.onWebSocketClose(statusCode, reason);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under
|
||||
// the terms of the Eclipse Public License 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// This Source Code may also be made available under the following
|
||||
// Secondary Licenses when the conditions for such availability set
|
||||
// forth in the Eclipse Public License, v. 2.0 are satisfied:
|
||||
// the Apache License v2.0 which is available at
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
|
||||
public class CloseInOnCloseEndpointNewThread extends AbstractCloseEndpoint
|
||||
{
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason)
|
||||
{
|
||||
try
|
||||
{
|
||||
CountDownLatch complete = new CountDownLatch(1);
|
||||
new Thread(() ->
|
||||
{
|
||||
getSession().close(StatusCode.SERVER_ERROR, "this should be a noop");
|
||||
complete.countDown();
|
||||
}).start();
|
||||
complete.await();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
super.onWebSocketClose(statusCode, reason);
|
||||
}
|
||||
}
|
|
@ -63,6 +63,16 @@ public class ServerCloseCreator implements JettyWebSocketCreator
|
|||
closeSocket = new ContainerEndpoint(container);
|
||||
resp.setAcceptedSubProtocol("container");
|
||||
}
|
||||
else if (req.hasSubProtocol("closeInOnClose"))
|
||||
{
|
||||
closeSocket = new CloseInOnCloseEndpoint();
|
||||
resp.setAcceptedSubProtocol("closeInOnClose");
|
||||
}
|
||||
else if (req.hasSubProtocol("closeInOnCloseNewThread"))
|
||||
{
|
||||
closeSocket = new CloseInOnCloseEndpointNewThread();
|
||||
resp.setAcceptedSubProtocol("closeInOnCloseNewThread");
|
||||
}
|
||||
|
||||
if (closeSocket != null)
|
||||
{
|
||||
|
|
|
@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.containsString;
|
|||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* Tests various close scenarios
|
||||
|
@ -275,4 +276,52 @@ public class ServerCloseTest
|
|||
close(session);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecondCloseFromOnClosed() throws Exception
|
||||
{
|
||||
// Testing WebSocketSession.close() in onClosed() does not cause deadlock.
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
request.setSubProtocols("closeInOnClose");
|
||||
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
client.connect(clientEndpoint, wsUri, request).get(5, SECONDS);
|
||||
|
||||
// Hard close from the server. Server onClosed() will try to close again which should be a NOOP.
|
||||
AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated();
|
||||
assertTrue(serverEndpoint.openLatch.await(5, SECONDS));
|
||||
serverEndpoint.getSession().close(StatusCode.SHUTDOWN, "SHUTDOWN hard close");
|
||||
|
||||
// Verify that client got close
|
||||
clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.SHUTDOWN), containsString("SHUTDOWN hard close"));
|
||||
|
||||
// Verify that server socket got close event
|
||||
assertTrue(serverEndpoint.closeLatch.await(5, SECONDS));
|
||||
assertThat(serverEndpoint.closeStatusCode, is(StatusCode.SHUTDOWN));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecondCloseFromOnClosedInNewThread() throws Exception
|
||||
{
|
||||
// Testing WebSocketSession.close() in onClosed() does not cause deadlock.
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
request.setSubProtocols("closeInOnCloseNewThread");
|
||||
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
client.connect(clientEndpoint, wsUri, request).get(5, SECONDS);
|
||||
|
||||
// Hard close from the server. Server onClosed() will try to close again which should be a NOOP.
|
||||
AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated();
|
||||
assertTrue(serverEndpoint.openLatch.await(5, SECONDS));
|
||||
serverEndpoint.getSession().close(StatusCode.SHUTDOWN, "SHUTDOWN hard close");
|
||||
|
||||
// Verify that client got close
|
||||
clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.SHUTDOWN), containsString("SHUTDOWN hard close"));
|
||||
|
||||
// Verify that server socket got close event
|
||||
assertTrue(serverEndpoint.closeLatch.await(5, SECONDS));
|
||||
assertThat(serverEndpoint.closeStatusCode, is(StatusCode.SHUTDOWN));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue