Refactor websocket close for #2175

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-01-10 14:50:53 +11:00
parent 6ec615dc32
commit 4165c4507b
3 changed files with 82 additions and 53 deletions

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.util.Utf8StringBuilder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Supplier;
/**
* Representation of a WebSocket Close (status code &amp; reason)
@ -162,6 +163,15 @@ public class CloseStatus
return;
}
public static CloseStatus getCloseStatus(Frame frame)
{
if (frame instanceof CloseStatus.Supplier)
return ((CloseStatus.Supplier)frame).getCloseStatus();
if (frame.getOpCode()==OpCode.CLOSE)
return new CloseStatus(frame);
return null;
}
public int getCode()
{
return code;
@ -265,19 +275,19 @@ public class CloseStatus
public Frame toFrame()
{
return toFrame(code, reason);
if (isTransmittableStatusCode(code))
return new CloseFrame(this, OpCode.CLOSE, true, asPayloadBuffer(code, reason));
return new CloseFrame(this, OpCode.CLOSE);
}
public static Frame toFrame(int closeStatus)
{
return toFrame(closeStatus, null);
return new CloseStatus(closeStatus).toFrame();
}
public static Frame toFrame(int closeStatus, String reason)
{
if (isTransmittableStatusCode(closeStatus))
return new Frame(OpCode.CLOSE, true, asPayloadBuffer(closeStatus, reason));
return new Frame(OpCode.CLOSE);
return new CloseStatus(closeStatus, reason).toFrame();
}
public static String codeString(int closeStatus)
@ -324,4 +334,27 @@ public class CloseStatus
return String.format("{%04d=%s,%s}", code, codeString(code), reason);
}
public interface Supplier
{
CloseStatus getCloseStatus();
}
class CloseFrame extends Frame implements CloseStatus.Supplier
{
public CloseFrame(CloseStatus closeStatus, byte opcode)
{
super(opcode);
}
public CloseFrame(CloseStatus closeStatus, byte opCode, boolean fin, ByteBuffer payload)
{
super(opCode, fin, payload);
}
@Override
public CloseStatus getCloseStatus()
{
return CloseStatus.this;
}
}
}

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.websocket.core.WebSocketException;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
/**
* Parsing of a frames in WebSocket land.
@ -374,7 +375,7 @@ public class Parser
.format("Parser@%x[s=%s,c=%d,o=0x%x,m=%s,l=%d]", hashCode(), state, cursor, firstByte, mask == null?"-":TypeUtil.toHexString(mask), payloadLength);
}
public class ParsedFrame extends Frame implements Closeable
public class ParsedFrame extends Frame implements Closeable, CloseStatus.Supplier
{
final CloseStatus closeStatus;
final boolean releaseable;
@ -404,6 +405,7 @@ public class Parser
bufferPool.release(getPayload());
}
@Override
public CloseStatus getCloseStatus()
{
return closeStatus;

View File

@ -81,7 +81,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
this.behavior = behavior;
this.negotiated = negotiated;
this.demanding = handler.isDemanding();
negotiated.getExtensions().connect(new IncomingState(), new OutgoingState(), this);
negotiated.getExtensions().connect(new ExtendedIncoming(), new ExtendedOutgoing(), this);
}
/**
@ -273,44 +273,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private void close(CloseStatus closeStatus, Callback callback, boolean batch)
{
if (state.onCloseOut(closeStatus))
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
try
{
handler.onClosed(state.getCloseStatus());
}
catch (Throwable e)
{
try
{
handler.onError(e);
}
catch (Throwable e2)
{
e.addSuppressed(e2);
LOG.warn(e);
}
}
finally
{
connection.close();
}
}
};
}
if (LOG.isDebugEnabled())
{
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
}
Frame frame = closeStatus.toFrame();
negotiated.getExtensions().sendFrame(frame, callback, batch);
sendFrame(closeStatus.toFrame(), callback, batch);
}
@Override
@ -509,12 +472,43 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE)
{
close(new CloseStatus(frame.getPayload()), callback, batch);
}
else
{
negotiated.getExtensions().sendFrame(frame, callback, batch);
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
if (state.onCloseOut(closeStatus))
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
try
{
handler.onClosed(state.getCloseStatus());
}
catch (Throwable e)
{
try
{
handler.onError(e);
}
catch (Throwable e2)
{
e.addSuppressed(e2);
LOG.warn(e);
}
}
finally
{
connection.close();
}
}
};
}
}
negotiated.getExtensions().sendFrame(frame, callback, batch);
}
@Override
@ -602,7 +596,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
maxTextMessageSize = maxSize;
}
private class IncomingState extends FrameSequence implements IncomingFrames
private class ExtendedIncoming extends FrameSequence implements IncomingFrames
{
@Override
public void onFrame(Frame frame, Callback callback)
@ -620,7 +614,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE)
{
connection.cancelDemand();
CloseStatus closeStatus = ((ParsedFrame)frame).getCloseStatus();
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (state.onCloseIn(closeStatus))
{
callback = new Callback.Nested(callback)
@ -672,7 +666,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
}
private class OutgoingState implements OutgoingFrames
private class ExtendedOutgoing implements OutgoingFrames
{
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)