Issue #2175 cleanups after review

Improve ws error handling by splitting processError into handling for
errors from the network and errors from the application.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-01-22 15:41:46 +11:00
parent 1189ceed4c
commit 7fec51ad40
3 changed files with 93 additions and 26 deletions

View File

@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.javax.tests.client.misbehaving; package org.eclipse.jetty.websocket.javax.tests.client.misbehaving;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel; import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.javax.tests.CoreServer; import org.eclipse.jetty.websocket.javax.tests.CoreServer;
@ -25,12 +32,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -45,6 +46,7 @@ public class MisbehavingClassTest
@BeforeAll @BeforeAll
public static void startServer() throws Exception public static void startServer() throws Exception
{ {
System.err.println("START");
server = new CoreServer(new CoreServer.EchoNegotiator()); server = new CoreServer(new CoreServer.EchoNegotiator());
// Start Server // Start Server
server.start(); server.start();
@ -53,6 +55,7 @@ public class MisbehavingClassTest
@AfterAll @AfterAll
public static void stopServer() public static void stopServer()
{ {
System.err.println("STOP");
try try
{ {
server.stop(); server.stop();

View File

@ -318,11 +318,13 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
} }
} }
private CloseStatus closeStatusFor(Throwable cause) AbnormalCloseStatus closeStatusFor(Throwable cause)
{ {
int code; int code;
if (cause instanceof ProtocolException) if (cause instanceof ProtocolException)
code = CloseStatus.PROTOCOL; code = CloseStatus.PROTOCOL;
else if (cause instanceof CloseException)
code = ((CloseException)cause).getStatusCode();
else if (cause instanceof Utf8Appendable.NotUtf8Exception) else if (cause instanceof Utf8Appendable.NotUtf8Exception)
code = CloseStatus.BAD_PAYLOAD; code = CloseStatus.BAD_PAYLOAD;
else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException) else if (cause instanceof WebSocketTimeoutException || cause instanceof TimeoutException || cause instanceof SocketTimeoutException)
@ -332,7 +334,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
else else
code = CloseStatus.SERVER_ERROR; code = CloseStatus.SERVER_ERROR;
return new CloseStatus(code, cause.getMessage()); return new AbnormalCloseStatus(code, cause.getMessage());
} }
/** /**
@ -344,12 +346,19 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{ {
CloseStatus closeStatus = closeStatusFor(cause); CloseStatus closeStatus = closeStatusFor(cause);
Callback callback = Callback.from(()->{onClosed(cause, closeStatus);connection.close();});
if (closeStatus.getCode() == CloseStatus.PROTOCOL) if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, callback, false); close(closeStatus, Callback.NOOP, false);
else else
callback.succeeded(); {
try
{
onClosed(cause, closeStatus);
}
finally
{
connection.close();
}
}
} }
/** /**
@ -360,7 +369,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
public void processHandlerError(Throwable cause) public void processHandlerError(Throwable cause)
{ {
CloseStatus closeStatus = closeStatusFor(cause); CloseStatus closeStatus = closeStatusFor(cause);
close(closeStatus, Callback.from(()->onClosed(cause, closeStatus)), false); close(closeStatus, Callback.NOOP, false);
} }
/** /**
@ -391,6 +400,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
catch (Throwable t) catch (Throwable t)
{ {
LOG.warn("Error during OPEN", t); LOG.warn("Error during OPEN", t);
try
{
handler.onError(t);
}
catch (Exception e)
{
t.addSuppressed(e);
}
processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, t)); processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, t));
} }
} }
@ -437,11 +454,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
boolean closeConnection;
try try
{ {
assertValidOutgoing(frame); assertValidOutgoing(frame);
closeConnection = channelState.onOutgoingFrame(frame);
} }
catch (Throwable ex) catch (Throwable ex)
{ {
@ -449,11 +464,42 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
return; return;
} }
boolean closeConnection;
try
{
closeConnection = channelState.onOutgoingFrame(frame);
}
catch (Throwable ex)
{
try
{
callback.failed(ex);
}
finally
{
if (frame.getOpCode() == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof AbnormalCloseStatus)
{
try
{
handler.onClosed(CloseStatus.getCloseStatus(frame));
}
finally
{
connection.close();
}
}
}
return;
}
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch); LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
System.err.println(behavior + " Closing " + closeConnection);
if (closeConnection) if (closeConnection)
{ {
callback = new Callback.Nested(callback) callback = new Callback.Nested(callback)
@ -461,24 +507,18 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override @Override
public void completed() public void completed()
{ {
System.err.println(behavior + " completed " + closeConnection);
try try
{ {
handler.onClosed(channelState.getCloseStatus()); handler.onClosed(channelState.getCloseStatus());
} }
catch (Throwable e) catch (Throwable e)
{ {
try
{
handler.onError(e);
}
catch (Throwable e2)
{
e.addSuppressed(e2);
LOG.warn(e); LOG.warn(e);
} }
}
finally finally
{ {
System.err.println(behavior + " connection.close ");
connection.close(); connection.close();
} }
} }
@ -713,8 +753,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override @Override
public String toString() public String toString()
{ {
return String.format("WSChannel@%x{%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s", return String.format("WSChannel@%x{%s,%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s",
hashCode(), hashCode(),
behavior,
channelState, channelState,
negotiated, negotiated,
autoFragment, autoFragment,
@ -723,4 +764,12 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
maxFrameSize, maxFrameSize,
handler); handler);
} }
static class AbnormalCloseStatus extends CloseStatus
{
public AbnormalCloseStatus(int statusCode, String reasonPhrase)
{
super(statusCode, reasonPhrase);
}
}
} }

View File

@ -68,7 +68,11 @@ public class WebSocketChannelState
@Override @Override
public String toString() public String toString()
{ {
return _channelState.toString(); return String.format("%s@%x{%s,i=%s,o=%s,c=%s}",getClass().getSimpleName(),hashCode(),
_channelState,
OpCode.name(_incomingContinuation),
OpCode.name(_outgoingContinuation),
_closeStatus);
} }
@ -126,20 +130,31 @@ public class WebSocketChannelState
synchronized (this) synchronized (this)
{ {
if (!isOutputOpen()) if (!isOutputOpen())
{
if (opcode == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof WebSocketChannel.AbnormalCloseStatus)
_channelState = State.CLOSED;
throw new IllegalStateException(_channelState.toString()); throw new IllegalStateException(_channelState.toString());
}
if (opcode == OpCode.CLOSE) if (opcode == OpCode.CLOSE)
{ {
_closeStatus = CloseStatus.getCloseStatus(frame); _closeStatus = CloseStatus.getCloseStatus(frame);
if (_closeStatus instanceof WebSocketChannel.AbnormalCloseStatus)
{
_channelState = State.CLOSED;
return true;
}
switch (_channelState) switch (_channelState)
{ {
case OPEN: case OPEN:
_channelState = State.OSHUT; _channelState = State.OSHUT;
return false; return false;
case ISHUT: case ISHUT:
_channelState = State.CLOSED; _channelState = State.CLOSED;
return true; return true;
default: default:
throw new IllegalStateException(_channelState.toString()); throw new IllegalStateException(_channelState.toString());
} }