Issue #3170 - WebSocketProxy separate actions from the state changes

only do state changes inside synchronized blocks, remember what action
to do and only do this outside of the synchronized block

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-02-14 15:40:25 +11:00
parent ace5e7bbe2
commit 001bc8f296
1 changed files with 221 additions and 178 deletions

View File

@ -54,6 +54,7 @@ class WebSocketProxy
{ {
System.err.println("[Client2Proxy] onOpen: " + session); System.err.println("[Client2Proxy] onOpen: " + session);
Throwable failure = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
@ -61,62 +62,74 @@ class WebSocketProxy
case NOT_OPEN: case NOT_OPEN:
state = State.CONNECTING; state = State.CONNECTING;
client = session; client = session;
Callback wrappedOnOpenCallback = new Callback()
{
@Override
public void succeeded()
{
synchronized (lock)
{
switch (state)
{
case CONNECTING:
state = State.OPEN;
callback.succeeded();
break;
case FAILED:
server2Proxy.fail(error, callback);
break;
default:
callback.failed(new IllegalStateException());
}
}
}
@Override
public void failed(Throwable x)
{
synchronized (lock)
{
switch (state)
{
case CONNECTING:
state = State.FAILED;
error = x;
callback.failed(x);
break;
case FAILED:
callback.failed(x);
break;
default:
callback.failed(new IllegalStateException());
}
}
}
};
server2Proxy.connect(wrappedOnOpenCallback);
break; break;
default: default:
throw new IllegalStateException(); failure = new IllegalStateException();
break;
} }
} }
if (failure != null)
callback.failed(failure);
else
server2Proxy.connect(Callback.from(() -> onOpenSuccess(callback), (t) -> onOpenFail(callback, t)));
}
private void onOpenSuccess(Callback callback)
{
boolean failServer2Proxy = false;
Throwable failure = null;
synchronized (lock)
{
switch (state)
{
case CONNECTING:
state = State.OPEN;
break;
case FAILED:
failure = error;
failServer2Proxy = true;
break;
default:
failure = new IllegalStateException();
break;
}
}
if (failServer2Proxy)
server2Proxy.fail(failure, callback);
else if (failure != null)
callback.failed(failure);
else
callback.succeeded();
}
private void onOpenFail(Callback callback, Throwable t)
{
Throwable failure = t;
synchronized (lock)
{
switch (state)
{
case CONNECTING:
state = State.FAILED;
error = t;
break;
case FAILED:
failure = error;
break;
default:
failure = new IllegalStateException();
break;
}
}
callback.failed(failure);
} }
@Override @Override
@ -125,6 +138,8 @@ class WebSocketProxy
System.err.println("[Client2Proxy] onFrame(): " + frame); System.err.println("[Client2Proxy] onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame)); receivedFrames.offer(Frame.copy(frame));
Callback sendCallback = callback;
Throwable failure = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
@ -134,28 +149,29 @@ class WebSocketProxy
{ {
state = State.ICLOSED; state = State.ICLOSED;
closeCallback = callback; closeCallback = callback;
server2Proxy.send(frame, Callback.from(()->{}, callback::failed)); sendCallback = Callback.from(()->{}, callback::failed);
}
else
{
server2Proxy.send(frame, callback);
} }
break; break;
case OCLOSED: case OCLOSED:
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
state = State.CLOSED; state = State.CLOSED;
server2Proxy.send(frame, callback);
break; break;
case FAILED: case FAILED:
callback.failed(error); failure = error;
break;
default: default:
callback.failed(new IllegalStateException()); failure = new IllegalStateException();
break;
} }
} }
if (failure != null)
callback.failed(failure);
else
server2Proxy.send(frame, sendCallback);
} }
@Override @Override
@ -164,79 +180,75 @@ class WebSocketProxy
System.err.println("[Client2Proxy] onError(): " + failure); System.err.println("[Client2Proxy] onError(): " + failure);
failure.printStackTrace(); failure.printStackTrace();
boolean failServer2Proxy;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
{ {
case FAILED: case FAILED:
case CLOSED: case CLOSED:
callback.failed(failure); failServer2Proxy = false;
break; break;
default: default:
state = State.FAILED; state = State.FAILED;
error = failure; error = failure;
server2Proxy.fail(failure,callback); failServer2Proxy = true;
break; break;
} }
} }
if (failServer2Proxy)
server2Proxy.fail(failure,callback);
else
callback.failed(failure);
} }
public void fail(Throwable failure, Callback callback) public void fail(Throwable failure, Callback callback)
{ {
System.err.println("[Client2Proxy] fail(): " + failure); System.err.println("[Client2Proxy] fail(): " + failure);
Callback sendCallback = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
{ {
case NOT_OPEN:
state = State.FAILED;
callback.failed(failure);
break;
case CONNECTING:
state = State.FAILED;
callback.failed(failure);
break;
case OPEN: case OPEN:
state = State.FAILED; state = State.FAILED;
client.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure)); sendCallback = Callback.from(callback, failure);
break; break;
case ICLOSED: case ICLOSED:
state = State.FAILED; state = State.FAILED;
Callback doubleCallback = Callback.from(callback, closeCallback); Callback doubleCallback = Callback.from(callback, closeCallback);
client.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(doubleCallback, failure)); sendCallback = Callback.from(doubleCallback, failure);
case FAILED:
case CLOSED:
case OCLOSED:
state = State.FAILED;
callback.failed(failure);
break; break;
default: default:
throw new IllegalStateException(); state = State.FAILED;
break;
} }
} }
if (sendCallback != null)
client.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback);
else
callback.failed(failure);
} }
@Override @Override
public void onClosed(CloseStatus closeStatus, Callback callback) public void onClosed(CloseStatus closeStatus, Callback callback)
{ {
System.err.println("[Client2Proxy] onClosed(): " + closeStatus); System.err.println("[Client2Proxy] onClosed(): " + closeStatus);
callback.succeeded(); callback.succeeded();
} }
public void send(Frame frame, Callback callback) public void send(Frame frame, Callback callback)
{ {
System.err.println("[Client2Proxy] onClosed(): " + frame); System.err.println("[Client2Proxy] onClosed(): " + frame);
Callback sendCallback = callback;
Throwable failure = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
@ -244,30 +256,30 @@ class WebSocketProxy
case OPEN: case OPEN:
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
state = State.OCLOSED; state = State.OCLOSED;
client.sendFrame(frame, callback, false);
break; break;
case ICLOSED: case ICLOSED:
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
{ {
state = State.CLOSED; state = State.CLOSED;
client.sendFrame(frame, Callback.from(callback, closeCallback), false); sendCallback = Callback.from(callback, closeCallback);
}
else
{
client.sendFrame(frame, callback, false);
} }
break; break;
case FAILED: case FAILED:
callback.failed(error); failure = error;
break; break;
default: default:
callback.failed(new IllegalStateException()); failure = new IllegalStateException();
break;
} }
} }
if (failure != null)
callback.failed(failure);
else
client.sendFrame(frame, sendCallback, false);
} }
} }
@ -285,6 +297,7 @@ class WebSocketProxy
{ {
System.err.println("[Server2Proxy] connect()"); System.err.println("[Server2Proxy] connect()");
Throwable failure = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
@ -293,64 +306,87 @@ class WebSocketProxy
try try
{ {
state = State.CONNECTING; state = State.CONNECTING;
client.connect(this, serverUri).whenComplete((s,t)->{ client.connect(this, serverUri).whenComplete((s,t)->
{
if (t != null) if (t != null)
{ onConnectFailure(t, callback);
synchronized (lock)
{
switch (state)
{
case CONNECTING:
state = State.FAILED;
callback.failed(t);
break;
case FAILED:
callback.failed(t);
break;
default:
callback.failed(new IllegalStateException());
}
}
}
else else
{ onConnectSuccess(s, callback);
synchronized (lock)
{
switch (state)
{
case CONNECTING:
state = State.OPEN;
callback.succeeded();
break;
case FAILED:
s.close(CloseStatus.SHUTDOWN, error.getMessage(), Callback.from(callback, error));
break;
default:
callback.failed(new IllegalStateException());
}
}
}
}); });
} }
catch (IOException e) catch (IOException e)
{ {
state = State.FAILED; state = State.FAILED;
callback.failed(e); error = e;
failure = e;
} }
break; break;
case FAILED: case FAILED:
callback.failed(error); failure = error;
break; break;
default: default:
throw new IllegalStateException(); failure = new IllegalStateException();
break;
} }
} }
if (failure != null)
callback.failed(failure);
}
private void onConnectSuccess(CoreSession s, Callback callback)
{
Callback sendCallback = null;
Throwable failure = null;
synchronized (lock)
{
switch (state)
{
case OPEN:
break;
case FAILED:
failure = error;
sendCallback = Callback.from(callback, failure);
break;
default:
failure = new IllegalStateException();
break;
}
}
if (sendCallback != null)
s.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback);
else if (failure != null)
callback.failed(failure);
else
callback.succeeded();
}
private void onConnectFailure(Throwable t, Callback callback)
{
Throwable failure = t;
synchronized (lock)
{
switch (state)
{
case CONNECTING:
state = State.FAILED;
break;
case FAILED:
failure = error;
break;
default:
failure = new IllegalStateException();
break;
}
}
callback.failed(failure);
} }
@Override @Override
@ -358,23 +394,30 @@ class WebSocketProxy
{ {
System.err.println("[Server2Proxy] onOpen(): " + session); System.err.println("[Server2Proxy] onOpen(): " + session);
Throwable failure = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
{ {
case CONNECTING: case CONNECTING:
state = State.OPEN;
server = session; server = session;
callback.succeeded();
break; break;
case FAILED: case FAILED:
callback.failed(error); failure = error;
break; break;
default: default:
callback.failed(new IllegalStateException()); failure = new IllegalStateException();
break;
} }
} }
if (failure != null)
callback.failed(failure);
else
callback.succeeded();
} }
@Override @Override
@ -383,6 +426,8 @@ class WebSocketProxy
System.err.println("[Server2Proxy] onFrame(): " + frame); System.err.println("[Server2Proxy] onFrame(): " + frame);
receivedFrames.offer(Frame.copy(frame)); receivedFrames.offer(Frame.copy(frame));
Callback sendCallback = callback;
Throwable failure = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
@ -392,28 +437,30 @@ class WebSocketProxy
{ {
state = State.ICLOSED; state = State.ICLOSED;
closeCallback = callback; closeCallback = callback;
client2Proxy.send(frame, Callback.from(()->{}, callback::failed)); sendCallback = Callback.from(()->{}, callback::failed);
}
else
{
client2Proxy.send(frame, callback);
} }
break; break;
case OCLOSED: case OCLOSED:
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
state = State.CLOSED; state = State.CLOSED;
client2Proxy.send(frame, callback);
break; break;
case FAILED: case FAILED:
callback.failed(error); failure = error;
break;
default: default:
callback.failed(new IllegalStateException()); failure = new IllegalStateException();
break;
} }
} }
if (failure != null)
callback.failed(failure);
else
client2Proxy.send(frame, sendCallback);
} }
@Override @Override
@ -422,29 +469,33 @@ class WebSocketProxy
System.err.println("[Server2Proxy] onError(): " + failure); System.err.println("[Server2Proxy] onError(): " + failure);
failure.printStackTrace(); failure.printStackTrace();
boolean failClient2Proxy = false;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
{ {
case FAILED: case FAILED:
case CLOSED: case CLOSED:
callback.failed(failure);
break; break;
default: default:
state = State.FAILED; state = State.FAILED;
error = failure; error = failure;
client2Proxy.fail(failure,callback); failClient2Proxy = true;
break; break;
} }
} }
if (failClient2Proxy)
client2Proxy.fail(failure,callback);
else
callback.failed(failure);
} }
@Override @Override
public void onClosed(CloseStatus closeStatus, Callback callback) public void onClosed(CloseStatus closeStatus, Callback callback)
{ {
System.err.println("[Server2Proxy] onClosed(): " + closeStatus); System.err.println("[Server2Proxy] onClosed(): " + closeStatus);
callback.succeeded(); callback.succeeded();
} }
@ -452,47 +503,39 @@ class WebSocketProxy
{ {
System.err.println("[Server2Proxy] fail(): " + failure); System.err.println("[Server2Proxy] fail(): " + failure);
Callback sendCallback = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
{ {
case NOT_OPEN:
state = State.FAILED;
callback.failed(failure);
break;
case CONNECTING:
state = State.FAILED;
callback.failed(failure);
break;
case OPEN: case OPEN:
state = State.FAILED; state = State.FAILED;
server.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure)); sendCallback = Callback.from(callback, failure);
break; break;
case ICLOSED: case ICLOSED:
state = State.FAILED; state = State.FAILED;
Callback doubleCallback = Callback.from(callback, closeCallback); Callback doubleCallback = Callback.from(callback, closeCallback);
server.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(doubleCallback, failure)); sendCallback = Callback.from(doubleCallback, failure);
case FAILED:
case CLOSED:
case OCLOSED:
state = State.FAILED;
callback.failed(failure);
break;
default: default:
throw new IllegalStateException(); state = State.FAILED;
break;
} }
} }
if (sendCallback != null)
server.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback);
else
callback.failed(failure);
} }
public void send(Frame frame, Callback callback) public void send(Frame frame, Callback callback)
{ {
System.err.println("[Server2Proxy] send(): " + frame); System.err.println("[Server2Proxy] send(): " + frame);
Callback sendCallback = callback;
Throwable failure = null;
synchronized (lock) synchronized (lock)
{ {
switch (state) switch (state)
@ -500,30 +543,30 @@ class WebSocketProxy
case OPEN: case OPEN:
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
state = State.OCLOSED; state = State.OCLOSED;
server.sendFrame(frame, callback, false);
break; break;
case ICLOSED: case ICLOSED:
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
{ {
state = State.CLOSED; state = State.CLOSED;
server.sendFrame(frame, Callback.from(callback, closeCallback), false); sendCallback = Callback.from(callback, closeCallback);
}
else
{
server.sendFrame(frame, callback, false);
} }
break; break;
case FAILED: case FAILED:
callback.failed(error); failure = error;
break; break;
default: default:
callback.failed(new IllegalStateException()); failure = new IllegalStateException();
break;
} }
} }
if (failure != null)
callback.failed(failure);
else
server.sendFrame(frame, sendCallback, false);
} }
} }
} }