Issue #4571 - always close MessageSink before completing the Future

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-02-20 18:47:04 +11:00
parent bc6c950304
commit 34fd1481b8
2 changed files with 18 additions and 32 deletions

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.websocket.javax.tests.client;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@ -37,15 +36,13 @@ import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.MessageHandler;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.javax.tests.CoreServer;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -58,21 +55,14 @@ public class DecoderReaderManySmallTest
@BeforeEach
public void setUp() throws Exception
{
server = new CoreServer(new CoreServer.BaseNegotiator()
server = new CoreServer(WebSocketNegotiator.from((negotiation) ->
{
@Override
public FrameHandler negotiate(Negotiation negotiation) throws IOException
{
List<String> offeredSubProtocols = negotiation.getOfferedSubprotocols();
List<String> offeredSubProtocols = negotiation.getOfferedSubprotocols();
if (!offeredSubProtocols.isEmpty())
negotiation.setSubprotocol(offeredSubProtocols.get(0));
if (!offeredSubProtocols.isEmpty())
{
negotiation.setSubprotocol(offeredSubProtocols.get(0));
}
return new EventIdFrameHandler();
}
});
return new EventIdFrameHandler();
}));
server.start();
client = ContainerProvider.getWebSocketContainer();
@ -86,15 +76,13 @@ public class DecoderReaderManySmallTest
}
@Test
public void testManyIds(TestInfo testInfo) throws Exception
public void testManyIds() throws Exception
{
URI wsUri = server.getWsUri().resolve("/eventids");
EventIdSocket clientSocket = new EventIdSocket(testInfo.getTestMethod().toString());
final int from = 1000;
final int to = 2000;
try (Session clientSession = client.connectToServer(clientSocket, wsUri))
EventIdSocket clientSocket = new EventIdSocket();
try (Session clientSession = client.connectToServer(clientSocket, server.getWsUri()))
{
clientSession.getAsyncRemote().sendText("seq|" + from + "|" + to);
}
@ -154,12 +142,6 @@ public class DecoderReaderManySmallTest
{
public BlockingQueue<EventId> messageQueue = new LinkedBlockingDeque<>();
public EventIdSocket(String id)
{
super(id);
}
@SuppressWarnings("unused")
@OnMessage
public void onMessage(EventId msg)
{

View File

@ -23,6 +23,7 @@ import java.lang.invoke.MethodHandle;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
@ -115,19 +116,22 @@ public abstract class DispatchedMessageSink extends AbstractMessageSink
dispatchComplete = new CompletableFuture<>();
// Dispatch to end user function (will likely start with blocking for data/accept).
// If the MessageSink can be closed do this after invoking and before completing the CompletableFuture.
new Thread(() ->
{
try
{
methodHandle.invoke(typeSink);
dispatchComplete.complete(null);
// If the MessageSink can be closed do this to free up resources.
if (typeSink instanceof Closeable)
((Closeable)typeSink).close();
IO.close((Closeable)typeSink);
dispatchComplete.complete(null);
}
catch (Throwable throwable)
{
if (typeSink instanceof Closeable)
IO.close((Closeable)typeSink);
dispatchComplete.completeExceptionally(throwable);
}
}).start();