423048 - Receiving a PING while sending a message kills the connection

Optimised to remove msgLock and just have a concurrent state machine on msgType
This commit is contained in:
Greg Wilkins 2013-12-13 11:50:22 +11:00
parent 0a30b4934b
commit 0104374adb
1 changed files with 187 additions and 133 deletions

View File

@ -21,9 +21,9 @@ package org.eclipse.jetty.websocket.common;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
@ -47,9 +47,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
/** JSR-356 blocking send behavior message */
private static final String PRIORMSG_ERROR = "Prior message pending, cannot start new message yet.";
/** Type of Message */
private static final int NONE = 0;
private static final int TEXT = 1;
private static final int BINARY = 2;
private enum MsgType {NONE,TEXT,BINARY,PARTIAL_TEXT,PARTIAL_BINARY};
private static final WriteCallback NOOP_CALLBACK = new WriteCallback()
{
@ -67,11 +65,9 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
public final LogicalConnection connection;
public final OutgoingFrames outgoing;
/** JSR-356 blocking send behavior message */
private final ReentrantLock msgLock = new ReentrantLock();
/** Type sanity to support partial send properly */
private final AtomicInteger msgType = new AtomicInteger(NONE);
private boolean partialStarted = false;
/** JSR-356 blocking send behaviour message and Type sanity to support partial send properly */
private final AtomicReference<MsgType> msgType = new AtomicReference<>(MsgType.NONE);
private final BlockingWriteCallback blocker = new BlockingWriteCallback();
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
{
@ -85,13 +81,101 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private void blockingWrite(WebSocketFrame frame) throws IOException
{
// TODO Blocking callbacks can be recycled, but they do not handle concurrent calls,
// so if some mutual exclusion can be applied, then this callback can be reused.
BlockingWriteCallback callback = new BlockingWriteCallback();
sendFrame(frame,callback);
callback.block();
sendFrame(frame,blocker);
blocker.block();
}
private void lockMsg(MsgType type)
{
while(true)
{
MsgType was=msgType.get();
switch (was)
{
case NONE:
if (msgType.compareAndSet(MsgType.NONE,type))
return;
break;
case BINARY:
case TEXT:
case PARTIAL_BINARY:
case PARTIAL_TEXT:
throw new IllegalStateException(PRIORMSG_ERROR);
}
}
}
private boolean lockPartialMsg(MsgType type)
{
while(true)
{
MsgType was=msgType.get();
switch (was)
{
case NONE:
if (msgType.compareAndSet(MsgType.NONE,type))
return true;
break;
case BINARY:
case TEXT:
throw new IllegalStateException(PRIORMSG_ERROR);
case PARTIAL_BINARY:
if (type==MsgType.BINARY && msgType.compareAndSet(MsgType.PARTIAL_BINARY,MsgType.BINARY))
return false;
throw new IllegalStateException("Prior BINARY message pending, cannot start new "+type+" message yet.");
case PARTIAL_TEXT:
if (type==MsgType.TEXT && msgType.compareAndSet(MsgType.PARTIAL_TEXT,MsgType.TEXT))
return false;
throw new IllegalStateException("Prior TEXT message pending, cannot start new "+type+" message yet.");
}
}
}
private void unlockMsg()
{
MsgType was=msgType.get();
switch (was)
{
case NONE:
throw new IllegalStateException("not locked");
case PARTIAL_BINARY:
case PARTIAL_TEXT:
throw new IllegalStateException("in partial");
default:
if (!msgType.compareAndSet(was,MsgType.NONE))
throw new IllegalStateException("concurrent unlock");
}
}
private void unlockPartialMsg()
{
MsgType was=msgType.get();
switch (was)
{
case NONE:
throw new IllegalStateException("not locked");
case PARTIAL_BINARY:
case PARTIAL_TEXT:
throw new IllegalStateException("in partial");
case BINARY:
if (!msgType.compareAndSet(was,MsgType.PARTIAL_BINARY))
throw new IllegalStateException("concurrent unlock");
return;
case TEXT:
if (!msgType.compareAndSet(was,MsgType.PARTIAL_TEXT))
throw new IllegalStateException("concurrent unlock");
return;
}
}
public InetSocketAddress getInetSocketAddress()
{
return connection.getRemoteAddress();
@ -117,50 +201,56 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendBytes(ByteBuffer data) throws IOException
{
if (msgLock.tryLock())
lockMsg(MsgType.BINARY);
try
{
try
connection.getIOState().assertOutputOpen();
if (LOG.isDebugEnabled())
{
msgType.set(BINARY);
connection.getIOState().assertOutputOpen();
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data));
}
blockingWrite(new BinaryFrame().setPayload(data));
}
finally
{
msgType.set(NONE);
msgLock.unlock();
LOG.debug("sendBytes with {}",BufferUtil.toDetailString(data));
}
blockingWrite(new BinaryFrame().setPayload(data));
}
else
finally
{
throw new IllegalStateException(PRIORMSG_ERROR);
unlockMsg();
}
}
@Override
public Future<Void> sendBytesByFuture(ByteBuffer data)
{
msgType.set(BINARY);
if (LOG.isDebugEnabled())
lockMsg(MsgType.BINARY);
try
{
LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data));
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytesByFuture with {}",BufferUtil.toDetailString(data));
}
return sendAsyncFrame(new BinaryFrame().setPayload(data));
}
finally
{
unlockMsg();
}
return sendAsyncFrame(new BinaryFrame().setPayload(data));
}
@Override
public void sendBytes(ByteBuffer data, WriteCallback callback)
{
msgType.set(BINARY);
if (LOG.isDebugEnabled())
lockMsg(MsgType.BINARY);
try
{
LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback);
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback);
}
sendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback);
}
finally
{
unlockMsg();
}
sendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback);
}
public void sendFrame(WebSocketFrame frame, WriteCallback callback)
@ -179,90 +269,48 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
{
if (msgLock.tryLock())
boolean first=lockPartialMsg(MsgType.TEXT);
try
{
try
if (LOG.isDebugEnabled())
{
if (msgType.get() == TEXT)
{
throw new IllegalStateException("Prior TEXT message pending, cannot start new BINARY message yet.");
}
msgType.set(BINARY);
if (LOG.isDebugEnabled())
{
LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
}
DataFrame frame = null;
if (partialStarted)
{
frame = new ContinuationFrame().setPayload(fragment);
}
else
{
frame = new BinaryFrame().setPayload(fragment);
}
frame.setFin(isLast);
blockingWrite(frame);
partialStarted = !isLast;
}
finally
{
if (isLast)
{
msgType.set(NONE);
}
msgLock.unlock();
LOG.debug("sendPartialBytes({}, {})",BufferUtil.toDetailString(fragment),isLast);
}
DataFrame frame = first?new BinaryFrame():new ContinuationFrame();
frame.setPayload(fragment);
frame.setFin(isLast);
blockingWrite(frame);
}
else
finally
{
throw new IllegalStateException(PRIORMSG_ERROR);
if (isLast)
unlockMsg();
else
unlockPartialMsg();
}
}
@Override
public void sendPartialString(String fragment, boolean isLast) throws IOException
{
if (msgLock.tryLock())
boolean first=lockPartialMsg(MsgType.BINARY);
try
{
try
if (LOG.isDebugEnabled())
{
if (msgType.get() == BINARY)
{
throw new IllegalStateException("Prior BINARY message pending, cannot start new TEXT message yet.");
}
msgType.set(TEXT);
if (LOG.isDebugEnabled())
{
LOG.debug("sendPartialString({}, {})",fragment,isLast);
}
DataFrame frame = null;
if (partialStarted)
{
frame = new ContinuationFrame().setPayload(fragment);
}
else
{
frame = new TextFrame().setPayload(fragment);
}
frame.setFin(isLast);
blockingWrite(frame);
partialStarted = !isLast;
}
finally
{
if (isLast)
{
msgType.set(NONE);
}
msgLock.unlock();
LOG.debug("sendPartialString({}, {})",fragment,isLast);
}
DataFrame frame = first?new TextFrame():new ContinuationFrame();
frame.setPayload(BufferUtil.toBuffer(fragment,StandardCharsets.UTF_8));
frame.setFin(isLast);
blockingWrite(frame);
}
else
finally
{
throw new IllegalStateException(PRIORMSG_ERROR);
if (isLast)
unlockMsg();
else
unlockPartialMsg();
}
}
@ -289,51 +337,57 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendString(String text) throws IOException
{
if (msgLock.tryLock())
lockMsg(MsgType.TEXT);
try
{
try
WebSocketFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
{
msgType.set(TEXT);
WebSocketFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload()));
}
blockingWrite(frame);
}
finally
{
msgType.set(NONE);
msgLock.unlock();
LOG.debug("sendString with {}",BufferUtil.toDetailString(frame.getPayload()));
}
blockingWrite(frame);
}
else
finally
{
throw new IllegalStateException(PRIORMSG_ERROR);
unlockMsg();
}
}
@Override
public Future<Void> sendStringByFuture(String text)
{
msgType.set(TEXT);
TextFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
lockMsg(MsgType.TEXT);
try
{
LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload()));
TextFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendStringByFuture with {}",BufferUtil.toDetailString(frame.getPayload()));
}
return sendAsyncFrame(frame);
}
finally
{
unlockMsg();
}
return sendAsyncFrame(frame);
}
@Override
public void sendString(String text, WriteCallback callback)
{
msgType.set(TEXT);
TextFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
lockMsg(MsgType.TEXT);
try
{
LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback);
TextFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback);
}
sendFrame(frame,callback==null?NOOP_CALLBACK:callback);
}
finally
{
unlockMsg();
}
sendFrame(frame,callback==null?NOOP_CALLBACK:callback);
}
}