Issue #3170 - ProxyFrameHandler race between onOpen and onError

- Introduced an EMPTY_SESSION in the ProxyFrameHandler as a terminal
state to know whether a FailedCoreSession has been handled

- Use while(true) loops to do the compareAndSet in ProxyFrameHandler

- Improved the tests for the proxy so that it tests the frames received
at every state (ie Client Proxy and Server)

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-02-13 11:30:53 +11:00
parent bc3df4fd28
commit bace5cf82c
3 changed files with 107 additions and 56 deletions

View File

@ -66,9 +66,9 @@ class BasicFrameHandler implements FrameHandler
session.sendFrame(textFrame, Callback.NOOP, false); session.sendFrame(textFrame, Callback.NOOP, false);
} }
public void close() throws InterruptedException public void close(String message) throws InterruptedException
{ {
session.close(CloseStatus.NORMAL, "standard close", Callback.NOOP); session.close(CloseStatus.NORMAL, message, Callback.NOOP);
awaitClose(); awaitClose();
} }
@ -78,9 +78,9 @@ class BasicFrameHandler implements FrameHandler
} }
public static class EchoHandler extends BasicFrameHandler public static class ServerEchoHandler extends BasicFrameHandler
{ {
public EchoHandler(String name) public ServerEchoHandler(String name)
{ {
super(name); super(name);
} }
@ -89,6 +89,7 @@ class BasicFrameHandler implements FrameHandler
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
System.err.println(name + " onFrame(): " + frame); System.err.println(name + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
if (frame.isDataFrame()) if (frame.isDataFrame())
{ {
@ -96,9 +97,10 @@ class BasicFrameHandler implements FrameHandler
session.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false); session.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false);
} }
else else
{
callback.succeeded(); callback.succeeded();
}
receivedFrames.offer(Frame.copy(frame));
} }
} }
} }

View File

@ -2,8 +2,10 @@ package org.eclipse.jetty.websocket.core.proxy;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.Frame;
@ -16,23 +18,19 @@ class ProxyFrameHandler implements FrameHandler
{ {
private String name = "[ClientToProxy]"; private String name = "[ClientToProxy]";
private URI serverUri; private URI serverUri;
private WebSocketCoreClient client = new WebSocketCoreClient(); private WebSocketCoreClient client;
private CoreSession clientSession; private CoreSession clientSession;
private AtomicReference<CoreSession> serverSession = new AtomicReference<>(); private AtomicReference<CoreSession> serverSession = new AtomicReference<>();
private AtomicReference<Callback> closeFrameCallback = new AtomicReference<>(); private AtomicReference<Callback> closeFrameCallback = new AtomicReference<>();
public ProxyFrameHandler() private static CoreSession EMPTY_SESSION = new CoreSession.Empty();
protected BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
public ProxyFrameHandler(WebSocketCoreClient client, URI serverUri)
{ {
try this.client = client;
{ this.serverUri = serverUri;
serverUri = new URI("ws://localhost:8080/server");
client.start();
}
catch (Exception e)
{
e.printStackTrace();
throw new RuntimeException(e);
}
} }
@Override @Override
@ -48,27 +46,35 @@ class ProxyFrameHandler implements FrameHandler
{ {
if (t != null) if (t != null)
{ {
// We have failed to create the client so onClosed will never be called
// so it is our responsibility to close the WebSocketCoreClient
try
{
client.stop();
}
catch (Exception e)
{
t.addSuppressed(e);
}
// If an onError callback was waiting to be completed in serverToProxyFH onOpen, then we must fail it. // If an onError callback was waiting to be completed in serverToProxyFH onOpen, then we must fail it.
CoreSession session = this.serverSession.get(); while (true)
if (session instanceof FailedCoreSession)
{ {
FailedCoreSession failedSession = (FailedCoreSession)session; CoreSession session = serverSession.get();
failedSession.failed(t);
t.addSuppressed(failedSession.getThrowable()); if (session == null)
{
if (serverSession.compareAndSet(null, EMPTY_SESSION))
break;
}
else if (session == EMPTY_SESSION)
{
break;
}
else
{
if (serverSession.compareAndSet(session, EMPTY_SESSION))
{
if (session instanceof FailedCoreSession)
{
FailedCoreSession failedSession = (FailedCoreSession)session;
failedSession.failed(t);
t.addSuppressed(failedSession.getThrowable());
}
break;
}
}
} }
else
throw new IllegalStateException("onOpen was called but this callback was failed?");
callback.failed(t); callback.failed(t);
} }
@ -80,7 +86,7 @@ class ProxyFrameHandler implements FrameHandler
} }
catch (IOException e) catch (IOException e)
{ {
clientSession.close(CloseStatus.SERVER_ERROR, e.getMessage(), Callback.from(callback,e)); callback.failed(e);
} }
} }
@ -88,6 +94,7 @@ class ProxyFrameHandler implements FrameHandler
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
System.err.println(name + " onFrame(): " + frame); System.err.println(name + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
onFrame(serverSession.get(), frame, callback); onFrame(serverSession.get(), frame, callback);
} }
@ -128,8 +135,28 @@ class ProxyFrameHandler implements FrameHandler
System.err.println(name + " onError(): " + cause); System.err.println(name + " onError(): " + cause);
cause.printStackTrace(); cause.printStackTrace();
if (!serverSession.compareAndSet(null, new FailedCoreSession(cause, callback))) while (true)
serverSession.get().close(CloseStatus.SHUTDOWN, cause.getMessage(), callback); {
CoreSession session = serverSession.get();
if (session == EMPTY_SESSION)
{
callback.failed(cause);
break;
}
else if (session == null)
{
if (serverSession.compareAndSet(null, new FailedCoreSession(cause, callback)))
break;
}
else
{
if (serverSession.compareAndSet(session, EMPTY_SESSION))
{
serverSession.get().close(CloseStatus.SHUTDOWN, cause.getMessage(), callback);
break;
}
}
}
} }
@Override @Override
@ -162,6 +189,7 @@ class ProxyFrameHandler implements FrameHandler
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
System.err.println(name + " onFrame(): " + frame); System.err.println(name + " onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame));
ProxyFrameHandler.this.onFrame(clientSession, frame, callback); ProxyFrameHandler.this.onFrame(clientSession, frame, callback);
} }
@ -177,16 +205,7 @@ class ProxyFrameHandler implements FrameHandler
public void onClosed(CloseStatus closeStatus, Callback callback) public void onClosed(CloseStatus closeStatus, Callback callback)
{ {
System.err.println(name + " onClosed(): " + closeStatus); System.err.println(name + " onClosed(): " + closeStatus);
callback.succeeded();
try
{
client.stop();
callback.succeeded();
}
catch (Exception e)
{
callback.failed(e);
}
} }
} }

View File

@ -8,6 +8,8 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerList; import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession; import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
@ -17,11 +19,17 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
public class WebSocketProxyTest import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ProxyFrameHandlerTest
{ {
Server _server; Server _server;
WebSocketCoreClient _client; WebSocketCoreClient _client;
ProxyFrameHandler proxyFrameHandler;
BasicFrameHandler.ServerEchoHandler serverFrameHandler;
@BeforeEach @BeforeEach
public void start() throws Exception public void start() throws Exception
@ -34,22 +42,25 @@ public class WebSocketProxyTest
HandlerList handlers = new HandlerList(); HandlerList handlers = new HandlerList();
ContextHandler serverContext = new ContextHandler("/server"); ContextHandler serverContext = new ContextHandler("/server");
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> new BasicFrameHandler.EchoHandler("SERVER")); serverFrameHandler = new BasicFrameHandler.ServerEchoHandler("SERVER");
WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler);
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator); WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
serverContext.setHandler(upgradeHandler); serverContext.setHandler(upgradeHandler);
handlers.addHandler(serverContext); handlers.addHandler(serverContext);
_client = new WebSocketCoreClient();
_client.start();
URI uri = new URI("ws://localhost:8080/server");
ContextHandler proxyContext = new ContextHandler("/proxy"); ContextHandler proxyContext = new ContextHandler("/proxy");
negotiator = WebSocketNegotiator.from((negotiation) -> new ProxyFrameHandler()); proxyFrameHandler = new ProxyFrameHandler(_client, uri);
negotiator = WebSocketNegotiator.from((negotiation) -> proxyFrameHandler);
upgradeHandler = new WebSocketUpgradeHandler(negotiator); upgradeHandler = new WebSocketUpgradeHandler(negotiator);
proxyContext.setHandler(upgradeHandler); proxyContext.setHandler(upgradeHandler);
handlers.addHandler(proxyContext); handlers.addHandler(proxyContext);
_server.setHandler(handlers); _server.setHandler(handlers);
_server.start(); _server.start();
_client = new WebSocketCoreClient();
_client.start();
} }
@AfterEach @AfterEach
@ -59,7 +70,6 @@ public class WebSocketProxyTest
_server.stop(); _server.stop();
} }
@Test @Test
public void testHello() throws Exception public void testHello() throws Exception
{ {
@ -69,6 +79,26 @@ public class WebSocketProxyTest
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest); CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
response.get(5, TimeUnit.SECONDS); response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world"); clientHandler.sendText("hello world");
clientHandler.close(); clientHandler.close("standard close");
Frame frame;
// Verify the the text frame was received
assertThat(proxyFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(proxyFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
assertThat(clientHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world"));
// Verify the right close frame was received
assertThat(CloseStatus.getCloseStatus(proxyFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(proxyFrameHandler.receivedFrames.poll()).getReason(), is("standard close"));
assertThat(CloseStatus.getCloseStatus(clientHandler.receivedFrames.poll()).getReason(), is("standard close"));
// Verify no other frames were received
assertNull(proxyFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(serverFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(proxyFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
assertNull(clientHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS));
} }
} }