From 001bc8f2964cc784da3caa49ebea1d56dddf816c Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 14 Feb 2019 15:40:25 +1100 Subject: [PATCH] Issue #3170 - WebSocketProxy separate actions from the state changes only do state changes inside synchronized blocks, remember what action to do and only do this outside of the synchronized block Signed-off-by: Lachlan Roberts --- .../websocket/core/proxy/WebSocketProxy.java | 399 ++++++++++-------- 1 file changed, 221 insertions(+), 178 deletions(-) diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java index 32190b4299b..6fe7f2846f1 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java @@ -54,6 +54,7 @@ class WebSocketProxy { System.err.println("[Client2Proxy] onOpen: " + session); + Throwable failure = null; synchronized (lock) { switch (state) @@ -61,62 +62,74 @@ class WebSocketProxy case NOT_OPEN: state = State.CONNECTING; client = session; - - Callback wrappedOnOpenCallback = new Callback() - { - @Override - public void succeeded() - { - synchronized (lock) - { - switch (state) - { - case CONNECTING: - state = State.OPEN; - callback.succeeded(); - break; - - case FAILED: - server2Proxy.fail(error, callback); - break; - - default: - callback.failed(new IllegalStateException()); - } - } - } - - @Override - public void failed(Throwable x) - { - synchronized (lock) - { - switch (state) - { - case CONNECTING: - state = State.FAILED; - error = x; - callback.failed(x); - break; - - case FAILED: - callback.failed(x); - break; - - default: - callback.failed(new IllegalStateException()); - } - } - } - }; - - server2Proxy.connect(wrappedOnOpenCallback); break; default: - throw new IllegalStateException(); + failure = new IllegalStateException(); + break; } } + + if (failure != null) + callback.failed(failure); + else + server2Proxy.connect(Callback.from(() -> onOpenSuccess(callback), (t) -> onOpenFail(callback, t))); + } + + private void onOpenSuccess(Callback callback) + { + boolean failServer2Proxy = false; + Throwable failure = null; + synchronized (lock) + { + switch (state) + { + case CONNECTING: + state = State.OPEN; + break; + + case FAILED: + failure = error; + failServer2Proxy = true; + break; + + default: + failure = new IllegalStateException(); + break; + } + } + + if (failServer2Proxy) + server2Proxy.fail(failure, callback); + else if (failure != null) + callback.failed(failure); + else + callback.succeeded(); + } + + private void onOpenFail(Callback callback, Throwable t) + { + Throwable failure = t; + synchronized (lock) + { + switch (state) + { + case CONNECTING: + state = State.FAILED; + error = t; + break; + + case FAILED: + failure = error; + break; + + default: + failure = new IllegalStateException(); + break; + } + } + + callback.failed(failure); } @Override @@ -125,6 +138,8 @@ class WebSocketProxy System.err.println("[Client2Proxy] onFrame(): " + frame); receivedFrames.offer(Frame.copy(frame)); + Callback sendCallback = callback; + Throwable failure = null; synchronized (lock) { switch (state) @@ -134,28 +149,29 @@ class WebSocketProxy { state = State.ICLOSED; closeCallback = callback; - server2Proxy.send(frame, Callback.from(()->{}, callback::failed)); - } - else - { - server2Proxy.send(frame, callback); + sendCallback = Callback.from(()->{}, callback::failed); } break; case OCLOSED: if (frame.getOpCode() == OpCode.CLOSE) state = State.CLOSED; - - server2Proxy.send(frame, callback); break; case FAILED: - callback.failed(error); + failure = error; + break; default: - callback.failed(new IllegalStateException()); + failure = new IllegalStateException(); + break; } } + + if (failure != null) + callback.failed(failure); + else + server2Proxy.send(frame, sendCallback); } @Override @@ -164,79 +180,75 @@ class WebSocketProxy System.err.println("[Client2Proxy] onError(): " + failure); failure.printStackTrace(); + boolean failServer2Proxy; synchronized (lock) { switch (state) { case FAILED: case CLOSED: - callback.failed(failure); + failServer2Proxy = false; break; default: state = State.FAILED; error = failure; - server2Proxy.fail(failure,callback); + failServer2Proxy = true; break; } } + if (failServer2Proxy) + server2Proxy.fail(failure,callback); + else + callback.failed(failure); } public void fail(Throwable failure, Callback callback) { System.err.println("[Client2Proxy] fail(): " + failure); + Callback sendCallback = null; synchronized (lock) { switch (state) { - case NOT_OPEN: - state = State.FAILED; - callback.failed(failure); - break; - - case CONNECTING: - state = State.FAILED; - callback.failed(failure); - break; - case OPEN: state = State.FAILED; - client.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure)); + sendCallback = Callback.from(callback, failure); break; case ICLOSED: state = State.FAILED; Callback doubleCallback = Callback.from(callback, closeCallback); - client.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(doubleCallback, failure)); - - case FAILED: - case CLOSED: - case OCLOSED: - state = State.FAILED; - callback.failed(failure); + sendCallback = Callback.from(doubleCallback, failure); break; default: - throw new IllegalStateException(); + state = State.FAILED; + break; } } + + if (sendCallback != null) + client.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback); + else + callback.failed(failure); } @Override public void onClosed(CloseStatus closeStatus, Callback callback) { System.err.println("[Client2Proxy] onClosed(): " + closeStatus); - callback.succeeded(); } - public void send(Frame frame, Callback callback) { System.err.println("[Client2Proxy] onClosed(): " + frame); + Callback sendCallback = callback; + Throwable failure = null; synchronized (lock) { switch (state) @@ -244,30 +256,30 @@ class WebSocketProxy case OPEN: if (frame.getOpCode() == OpCode.CLOSE) state = State.OCLOSED; - - client.sendFrame(frame, callback, false); break; case ICLOSED: if (frame.getOpCode() == OpCode.CLOSE) { state = State.CLOSED; - client.sendFrame(frame, Callback.from(callback, closeCallback), false); - } - else - { - client.sendFrame(frame, callback, false); + sendCallback = Callback.from(callback, closeCallback); } break; case FAILED: - callback.failed(error); + failure = error; break; default: - callback.failed(new IllegalStateException()); + failure = new IllegalStateException(); + break; } } + + if (failure != null) + callback.failed(failure); + else + client.sendFrame(frame, sendCallback, false); } } @@ -285,6 +297,7 @@ class WebSocketProxy { System.err.println("[Server2Proxy] connect()"); + Throwable failure = null; synchronized (lock) { switch (state) @@ -293,64 +306,87 @@ class WebSocketProxy try { state = State.CONNECTING; - client.connect(this, serverUri).whenComplete((s,t)->{ + client.connect(this, serverUri).whenComplete((s,t)-> + { if (t != null) - { - synchronized (lock) - { - switch (state) - { - case CONNECTING: - state = State.FAILED; - callback.failed(t); - break; - - case FAILED: - callback.failed(t); - break; - - default: - callback.failed(new IllegalStateException()); - } - } - } + onConnectFailure(t, callback); else - { - synchronized (lock) - { - switch (state) - { - case CONNECTING: - state = State.OPEN; - callback.succeeded(); - break; - - case FAILED: - s.close(CloseStatus.SHUTDOWN, error.getMessage(), Callback.from(callback, error)); - break; - - default: - callback.failed(new IllegalStateException()); - } - } - } + onConnectSuccess(s, callback); }); } catch (IOException e) { state = State.FAILED; - callback.failed(e); + error = e; + failure = e; } break; case FAILED: - callback.failed(error); + failure = error; break; default: - throw new IllegalStateException(); + failure = new IllegalStateException(); + break; } } + + if (failure != null) + callback.failed(failure); + } + + private void onConnectSuccess(CoreSession s, Callback callback) + { + Callback sendCallback = null; + Throwable failure = null; + synchronized (lock) + { + switch (state) + { + case OPEN: + break; + + case FAILED: + failure = error; + sendCallback = Callback.from(callback, failure); + break; + + default: + failure = new IllegalStateException(); + break; + } + } + + if (sendCallback != null) + s.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback); + else if (failure != null) + callback.failed(failure); + else + callback.succeeded(); + } + + private void onConnectFailure(Throwable t, Callback callback) + { + Throwable failure = t; + synchronized (lock) + { + switch (state) + { + case CONNECTING: + state = State.FAILED; + break; + + case FAILED: + failure = error; + break; + + default: + failure = new IllegalStateException(); + break; + } + } + callback.failed(failure); } @Override @@ -358,23 +394,30 @@ class WebSocketProxy { System.err.println("[Server2Proxy] onOpen(): " + session); + Throwable failure = null; synchronized (lock) { switch (state) { case CONNECTING: + state = State.OPEN; server = session; - callback.succeeded(); break; case FAILED: - callback.failed(error); + failure = error; break; default: - callback.failed(new IllegalStateException()); + failure = new IllegalStateException(); + break; } } + + if (failure != null) + callback.failed(failure); + else + callback.succeeded(); } @Override @@ -383,6 +426,8 @@ class WebSocketProxy System.err.println("[Server2Proxy] onFrame(): " + frame); receivedFrames.offer(Frame.copy(frame)); + Callback sendCallback = callback; + Throwable failure = null; synchronized (lock) { switch (state) @@ -392,28 +437,30 @@ class WebSocketProxy { state = State.ICLOSED; closeCallback = callback; - client2Proxy.send(frame, Callback.from(()->{}, callback::failed)); - } - else - { - client2Proxy.send(frame, callback); + sendCallback = Callback.from(()->{}, callback::failed); } break; case OCLOSED: if (frame.getOpCode() == OpCode.CLOSE) state = State.CLOSED; - - client2Proxy.send(frame, callback); break; case FAILED: - callback.failed(error); + failure = error; + break; default: - callback.failed(new IllegalStateException()); + failure = new IllegalStateException(); + break; } } + + if (failure != null) + callback.failed(failure); + else + client2Proxy.send(frame, sendCallback); + } @Override @@ -422,29 +469,33 @@ class WebSocketProxy System.err.println("[Server2Proxy] onError(): " + failure); failure.printStackTrace(); + boolean failClient2Proxy = false; synchronized (lock) { switch (state) { case FAILED: case CLOSED: - callback.failed(failure); break; default: state = State.FAILED; error = failure; - client2Proxy.fail(failure,callback); + failClient2Proxy = true; break; } } + + if (failClient2Proxy) + client2Proxy.fail(failure,callback); + else + callback.failed(failure); } @Override public void onClosed(CloseStatus closeStatus, Callback callback) { System.err.println("[Server2Proxy] onClosed(): " + closeStatus); - callback.succeeded(); } @@ -452,47 +503,39 @@ class WebSocketProxy { System.err.println("[Server2Proxy] fail(): " + failure); + Callback sendCallback = null; synchronized (lock) { switch (state) { - case NOT_OPEN: - state = State.FAILED; - callback.failed(failure); - break; - - case CONNECTING: - state = State.FAILED; - callback.failed(failure); - break; - case OPEN: state = State.FAILED; - server.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(callback, failure)); + sendCallback = Callback.from(callback, failure); break; case ICLOSED: state = State.FAILED; Callback doubleCallback = Callback.from(callback, closeCallback); - server.close(CloseStatus.SHUTDOWN, failure.getMessage(), Callback.from(doubleCallback, failure)); - - case FAILED: - case CLOSED: - case OCLOSED: - state = State.FAILED; - callback.failed(failure); - break; + sendCallback = Callback.from(doubleCallback, failure); default: - throw new IllegalStateException(); + state = State.FAILED; + break; } } + + if (sendCallback != null) + server.close(CloseStatus.SHUTDOWN, failure.getMessage(), sendCallback); + else + callback.failed(failure); } public void send(Frame frame, Callback callback) { System.err.println("[Server2Proxy] send(): " + frame); + Callback sendCallback = callback; + Throwable failure = null; synchronized (lock) { switch (state) @@ -500,30 +543,30 @@ class WebSocketProxy case OPEN: if (frame.getOpCode() == OpCode.CLOSE) state = State.OCLOSED; - - server.sendFrame(frame, callback, false); break; case ICLOSED: if (frame.getOpCode() == OpCode.CLOSE) { state = State.CLOSED; - server.sendFrame(frame, Callback.from(callback, closeCallback), false); - } - else - { - server.sendFrame(frame, callback, false); + sendCallback = Callback.from(callback, closeCallback); } break; case FAILED: - callback.failed(error); + failure = error; break; default: - callback.failed(new IllegalStateException()); + failure = new IllegalStateException(); + break; } } + + if (failure != null) + callback.failed(failure); + else + server.sendFrame(frame, sendCallback, false); } } } \ No newline at end of file