Issue #3170 - changes from review

Signed-off-by: lachan-roberts <lachlan@webtide.com>
This commit is contained in:
lachan-roberts 2019-02-22 16:36:19 +11:00
parent 3e33b35d82
commit bd50e2941c
4 changed files with 77 additions and 90 deletions

View File

@ -69,8 +69,8 @@ public interface FrameHandler extends IncomingFrames
* FrameHandler can write during this call, but can not receive frames until the callback is succeeded.
* </p>
* <p>
* If the FrameHandler succeeds the callback we transition to OPEN state and can now receive frames if not
* auto-demanding or call {@link CoreSession#demand(long)} if demanding.
* If the FrameHandler succeeds the callback we transition to OPEN state and can now receive frames if
* not demanding or can now call {@link CoreSession#demand(long)} to receive frames if demanding.
* If the FrameHandler fails the callback a close frame will be sent with {@link CloseStatus#SERVER_ERROR} and
* the connection will be closed. <br>
* </p>
@ -85,9 +85,8 @@ public interface FrameHandler extends IncomingFrames
* sequentially to satisfy all outstanding demand signaled by calls to
* {@link CoreSession#demand(long)}.
* Control and Data frames are passed to this method.
* Control frames that require a response (eg PING and CLOSE) may be responded to by the
* the handler, but if an appropriate response is not sent once the callback is succeeded,
* then a response will be generated and sent.
* Close frames may be responded to by the handler, but if an appropriate close response is not
* sent once the callback is succeeded, then a response close will be generated and sent.
*
* @param frame the raw frame
* @param callback the callback to indicate success in processing frame (or failure)

View File

@ -107,18 +107,12 @@ class BasicFrameHandler implements FrameHandler
public static class ServerEchoHandler extends BasicFrameHandler
{
private boolean throwOnFrame;
private boolean noResponse;
public void throwOnFrame()
{
throwOnFrame = true;
}
public void noResponseOnFrame()
{
noResponse = true;
}
public ServerEchoHandler(String name)
{
super(name);
@ -133,9 +127,6 @@ class BasicFrameHandler implements FrameHandler
if (throwOnFrame)
throw new RuntimeException("intentionally throwing in server onFrame()");
if (noResponse)
return;
if (frame.isDataFrame())
{
System.err.println(name + " echoDataFrame(): " + frame);
@ -145,7 +136,6 @@ class BasicFrameHandler implements FrameHandler
{
callback.succeeded();
}
}
}
}

View File

@ -38,8 +38,8 @@ class WebSocketProxy
NOT_OPEN,
CONNECTING,
OPEN,
ICLOSED,
OCLOSED,
ISHUT,
OSHUT,
CLOSED,
FAILED
}
@ -107,7 +107,6 @@ class WebSocketProxy
{
System.err.println(toString() + " onOpenSuccess()");
boolean failServer2Proxy = false;
Throwable failure = null;
synchronized (lock)
{
@ -119,7 +118,6 @@ class WebSocketProxy
case FAILED:
failure = error;
failServer2Proxy = true;
break;
default:
@ -128,10 +126,8 @@ class WebSocketProxy
}
}
if (failServer2Proxy)
if (failure != null)
server2Proxy.fail(failure, callback);
else if (failure != null)
callback.failed(failure);
else
callback.succeeded();
}
@ -152,6 +148,7 @@ class WebSocketProxy
case FAILED:
failure = error;
failure.addSuppressed(t);
break;
default:
@ -178,13 +175,15 @@ class WebSocketProxy
case OPEN:
if (frame.getOpCode() == OpCode.CLOSE)
{
state = State.ICLOSED;
state = State.ISHUT;
// the callback is saved until a close response comes in sendFrame from Server2Proxy
// if the callback was completed here then core would send its own close response
closeCallback = callback;
sendCallback = Callback.from(()->{}, callback::failed);
}
break;
case OCLOSED:
case OSHUT:
if (frame.getOpCode() == OpCode.CLOSE)
state = State.CLOSED;
break;
@ -248,7 +247,7 @@ class WebSocketProxy
sendCallback = Callback.from(callback, failure);
break;
case ICLOSED:
case ISHUT:
state = State.FAILED;
Callback doubleCallback = Callback.from(callback, closeCallback);
sendCallback = Callback.from(doubleCallback, failure);
@ -286,10 +285,10 @@ class WebSocketProxy
{
case OPEN:
if (frame.getOpCode() == OpCode.CLOSE)
state = State.OCLOSED;
state = State.OSHUT;
break;
case ICLOSED:
case ISHUT:
if (frame.getOpCode() == OpCode.CLOSE)
{
state = State.CLOSED;
@ -376,7 +375,9 @@ class WebSocketProxy
break;
default:
failure = new IllegalStateException();
state = State.FAILED;
error = new IllegalStateException();
failure = error;
break;
}
}
@ -389,7 +390,6 @@ class WebSocketProxy
{
System.err.println(toString() + " onConnectSuccess(): " + s);
Callback sendCallback = null;
Throwable failure = null;
synchronized (lock)
{
@ -400,19 +400,18 @@ class WebSocketProxy
case FAILED:
failure = error;
sendCallback = Callback.from(callback, failure);
break;
default:
failure = new IllegalStateException();
state = State.FAILED;
error = new IllegalStateException();
failure = error;
break;
}
}
if (sendCallback != null)
s.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback);
else if (failure != null)
callback.failed(failure);
if (failure != null)
s.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure));
else
callback.succeeded();
}
@ -436,10 +435,13 @@ class WebSocketProxy
break;
default:
failure = new IllegalStateException();
state = State.FAILED;
error = new IllegalStateException();
failure = error;
break;
}
}
callback.failed(failure);
}
@ -489,13 +491,13 @@ class WebSocketProxy
case OPEN:
if (frame.getOpCode() == OpCode.CLOSE)
{
state = State.ICLOSED;
state = State.ISHUT;
closeCallback = callback;
sendCallback = Callback.from(()->{}, callback::failed);
}
break;
case OCLOSED:
case OSHUT:
if (frame.getOpCode() == OpCode.CLOSE)
state = State.CLOSED;
break;
@ -567,7 +569,7 @@ class WebSocketProxy
sendCallback = Callback.from(callback, failure);
break;
case ICLOSED:
case ISHUT:
state = State.FAILED;
Callback doubleCallback = Callback.from(callback, closeCallback);
sendCallback = Callback.from(doubleCallback, failure);
@ -596,10 +598,10 @@ class WebSocketProxy
{
case OPEN:
if (frame.getOpCode() == OpCode.CLOSE)
state = State.OCLOSED;
state = State.OSHUT;
break;
case ICLOSED:
case ISHUT:
if (frame.getOpCode() == OpCode.CLOSE)
{
state = State.CLOSED;

View File

@ -153,7 +153,7 @@ public class WebSocketProxyTest
WebSocketProxy.Client2Proxy proxyClientSide = proxy.client2Proxy;
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/a"), clientHandler);
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientHandler);
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world");
@ -264,7 +264,7 @@ public class WebSocketProxyTest
WebSocketProxy.Server2Proxy proxyServerSide = proxy.server2Proxy;
BasicFrameHandler clientHandler = new BasicFrameHandler("CLIENT");
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/test"), clientHandler);
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/"), clientHandler);
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
response.get(5, TimeUnit.SECONDS);
@ -276,40 +276,41 @@ public class WebSocketProxyTest
CloseStatus closeStatus;
Frame frame;
// Client
frame = clientHandler.receivedFrames.poll();
assertThat(CloseStatus.getCloseStatus(frame).getCode(), is(CloseStatus.SERVER_ERROR));
// Client2Proxy
frame = proxyClientSide.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
frame = proxyClientSide.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
frame = proxyClientSide.receivedFrames.poll();
assertNull(frame);
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
// Server2Proxy
frame = proxyServerSide.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
frame = proxyServerSide.receivedFrames.poll();
assertNull(frame);
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
// Server
frame = serverFrameHandler.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
frame = serverFrameHandler.receivedFrames.poll();
assertNull(frame);
// Server2Proxy
frame = proxyServerSide.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
// Client
frame = clientHandler.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
// Client2Proxy receiving close response from Client
frame = proxyClientSide.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
// Check Proxy is in expected final state
assertNull(proxyClientSide.receivedFrames.poll());
assertNull(proxyServerSide.receivedFrames.poll());
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.CLOSED));
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.CLOSED));
}
@Test
@ -328,13 +329,10 @@ public class WebSocketProxyTest
receivedFrames.offer(Frame.copy(frame));
}
};
ClientUpgradeRequest upgradeRequest = ClientUpgradeRequest.from(_client, new URI("ws://localhost:8080/proxy/test"), clientHandler);
CompletableFuture<CoreSession> response = _client.connect(upgradeRequest);
CompletableFuture<CoreSession> response = _client.connect(clientHandler, new URI("ws://localhost:8080/proxy/"));
response.get(5, TimeUnit.SECONDS);
clientHandler.sendText("hello world");
clientHandler.awaitClose();
serverFrameHandler.awaitClose();
awaitProxyClose(proxyClientSide, proxyServerSide);
@ -342,22 +340,16 @@ public class WebSocketProxyTest
CloseStatus closeStatus;
Frame frame;
// Client
frame = clientHandler.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
frame = clientHandler.receivedFrames.poll();
assertNull(frame);
// Client2Proxy
frame = proxyClientSide.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
frame = proxyClientSide.receivedFrames.poll();
assertNull(frame);
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.FAILED));
// Server
frame = serverFrameHandler.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
assertNull(serverFrameHandler.receivedFrames.poll());
// Server2Proxy
frame = proxyServerSide.receivedFrames.poll();
@ -365,15 +357,19 @@ public class WebSocketProxyTest
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
frame = proxyServerSide.receivedFrames.poll();
assertNull(frame);
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
// Client
frame = clientHandler.receivedFrames.poll();
closeStatus = CloseStatus.getCloseStatus(frame);
assertThat(closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
assertThat(closeStatus.getReason(), is("intentionally throwing in server onFrame()"));
assertNull(clientHandler.receivedFrames.poll());
// Server
frame = serverFrameHandler.receivedFrames.poll();
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayloadAsUTF8(), is("hello world"));
frame = serverFrameHandler.receivedFrames.poll();
assertNull(frame);
// Client2Proxy does NOT receive close response from the client and fails
assertNull(proxyClientSide.receivedFrames.poll());
assertThat(proxyClientSide.getState(), is(WebSocketProxy.State.FAILED));
// Server2Proxy is failed by the Client2Proxy
assertNull(proxyServerSide.receivedFrames.poll());
assertThat(proxyServerSide.getState(), is(WebSocketProxy.State.FAILED));
}
}