414652 - WebSocket's sendMessage() may hang on congested connections.

Fixed by using Locks instead of the synchronized keyword, and by using tryLock()
in flushBuffer() and flush(), that are called by WebSocketConnection.handle().

It is quite a hack, but the real fix is in Jetty 9 where the architecture is different
and there are two different paths for reading and writing, so that this problem
could not even happen.
This commit is contained in:
Simone Bordet 2013-08-08 12:08:41 +02:00
parent f983629434
commit 4ce4615597
1 changed files with 224 additions and 173 deletions

View File

@ -19,14 +19,16 @@
package org.eclipse.jetty.websocket; package org.eclipse.jetty.websocket;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
/* ------------------------------------------------------------ */ /**
/** WebSocketGenerator. * WebSocketGenerator.
* This class generates websocket packets. * This class generates websocket packets.
* It is fully synchronized because it is likely that async * It is fully synchronized because it is likely that async
* threads will call the addMessage methods while other * threads will call the addMessage methods while other
@ -34,214 +36,263 @@ import org.eclipse.jetty.io.EofException;
*/ */
public class WebSocketGeneratorRFC6455 implements WebSocketGenerator public class WebSocketGeneratorRFC6455 implements WebSocketGenerator
{ {
final private WebSocketBuffers _buffers; private final Lock _lock = new ReentrantLock();
final private EndPoint _endp; private final WebSocketBuffers _buffers;
private final EndPoint _endp;
private final byte[] _mask = new byte[4];
private final MaskGen _maskGen;
private Buffer _buffer; private Buffer _buffer;
private final byte[] _mask=new byte[4];
private int _m; private int _m;
private boolean _opsent; private boolean _opsent;
private final MaskGen _maskGen;
private boolean _closed; private boolean _closed;
public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp) public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp)
{ {
_buffers=buffers; this(buffers, endp, null);
_endp=endp;
_maskGen=null;
} }
public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen) public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
{ {
_buffers=buffers; _buffers = buffers;
_endp=endp; _endp = endp;
_maskGen=maskGen; _maskGen = maskGen;
} }
public synchronized Buffer getBuffer() public Buffer getBuffer()
{ {
return _buffer; _lock.lock();
} try
public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
{
// System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
if (_closed)
throw new EofException("Closed");
if (opcode==WebSocketConnectionRFC6455.OP_CLOSE)
_closed=true;
boolean mask=_maskGen!=null;
if (_buffer==null)
_buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
boolean last=WebSocketConnectionRFC6455.isLastFrame(flags);
int space=mask?14:10;
do
{ {
opcode = _opsent?WebSocketConnectionRFC6455.OP_CONTINUATION:opcode; return _buffer;
opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode)); }
_opsent=true; finally
{
_lock.unlock();
}
}
int payload=length; public void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException
if (payload+space>_buffer.capacity()) {
{ _lock.lock();
// We must fragement, so clear FIN bit try
opcode=(byte)(opcode&0x7F); // Clear the FIN bit {
payload=_buffer.capacity()-space; if (_closed)
} throw new EofException("Closed");
else if (last) if (opcode == WebSocketConnectionRFC6455.OP_CLOSE)
opcode= (byte)(opcode|0x80); // Set the FIN bit _closed = true;
// ensure there is space for header boolean mask = _maskGen != null;
if (_buffer.space() <= space)
if (_buffer == null)
_buffer = mask ? _buffers.getBuffer() : _buffers.getDirectBuffer();
boolean last = WebSocketConnectionRFC6455.isLastFrame(flags);
int space = mask ? 14 : 10;
do
{ {
flushBuffer(); opcode = _opsent ? WebSocketConnectionRFC6455.OP_CONTINUATION : opcode;
opcode = (byte)(((0xf & flags) << 4) + (0xf & opcode));
_opsent = true;
int payload = length;
if (payload + space > _buffer.capacity())
{
// We must fragement, so clear FIN bit
opcode = (byte)(opcode & 0x7F); // Clear the FIN bit
payload = _buffer.capacity() - space;
}
else if (last)
opcode = (byte)(opcode | 0x80); // Set the FIN bit
// ensure there is space for header
if (_buffer.space() <= space) if (_buffer.space() <= space)
flush(); {
} flushBuffer();
if (_buffer.space() <= space)
flush();
}
// write the opcode and length // write the opcode and length
if (payload>0xffff) if (payload > 0xffff)
{ {
_buffer.put(new byte[]{ _buffer.put(new byte[]{
opcode, opcode,
mask?(byte)0xff:(byte)0x7f, mask ? (byte)0xff : (byte)0x7f,
(byte)0, (byte)0,
(byte)0, (byte)0,
(byte)0, (byte)0,
(byte)0, (byte)0,
(byte)((payload>>24)&0xff), (byte)((payload >> 24) & 0xff),
(byte)((payload>>16)&0xff), (byte)((payload >> 16) & 0xff),
(byte)((payload>>8)&0xff), (byte)((payload >> 8) & 0xff),
(byte)(payload&0xff)}); (byte)(payload & 0xff)});
} }
else if (payload >=0x7e) else if (payload >= 0x7e)
{ {
_buffer.put(new byte[]{ _buffer.put(new byte[]{
opcode, opcode,
mask?(byte)0xfe:(byte)0x7e, mask ? (byte)0xfe : (byte)0x7e,
(byte)(payload>>8), (byte)(payload >> 8),
(byte)(payload&0xff)}); (byte)(payload & 0xff)});
} }
else else
{ {
_buffer.put(new byte[]{ _buffer.put(new byte[]{
opcode, opcode,
(byte)(mask?(0x80|payload):payload)}); (byte)(mask ? (0x80 | payload) : payload)});
} }
// write mask
if (mask)
{
_maskGen.genMask(_mask);
_m=0;
_buffer.put(_mask);
}
// write payload
int remaining = payload;
while (remaining > 0)
{
_buffer.compact();
int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
// write mask
if (mask) if (mask)
{ {
for (int i=0;i<chunk;i++) _maskGen.genMask(_mask);
_buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4])); _m = 0;
_buffer.put(_mask);
} }
else
_buffer.put(content, offset + (payload - remaining), chunk);
remaining -= chunk; // write payload
if (_buffer.space() > 0) int remaining = payload;
while (remaining > 0)
{ {
// Gently flush the data, issuing a non-blocking write _buffer.compact();
flushBuffer(); int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
}
else if (mask)
{ {
// Forcibly flush the data, issuing a blocking write for (int i = 0; i < chunk; i++)
flush(); _buffer.put((byte)(content[offset + (payload - remaining) + i] ^ _mask[+_m++ % 4]));
if (remaining == 0) }
else
_buffer.put(content, offset + (payload - remaining), chunk);
remaining -= chunk;
if (_buffer.space() > 0)
{ {
// Gently flush the data, issuing a non-blocking write // Gently flush the data, issuing a non-blocking write
flushBuffer(); flushBuffer();
} }
else
{
// Forcibly flush the data, issuing a blocking write
flush();
if (remaining == 0)
{
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
}
} }
offset += payload;
length -= payload;
} }
offset+=payload; while (length > 0);
length-=payload; _opsent = !last;
}
while (length>0);
_opsent=!last;
if (_buffer!=null && _buffer.length()==0) if (_buffer != null && _buffer.length() == 0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
}
public synchronized int flushBuffer() throws IOException
{
if (!_endp.isOpen())
throw new EofException();
if (_buffer!=null)
{
int flushed=_buffer.hasContent()?_endp.flush(_buffer):0;
if (_closed&&_buffer.length()==0)
_endp.shutdownOutput();
return flushed;
}
return 0;
}
public synchronized int flush() throws IOException
{
if (_buffer==null)
return 0;
int result = flushBuffer();
if (!_endp.isBlocking())
{
long now = System.currentTimeMillis();
long end=now+_endp.getMaxIdleTime();
while (_buffer.length()>0)
{ {
boolean ready = _endp.blockWritable(end-now); _buffers.returnBuffer(_buffer);
if (!ready) _buffer = null;
{
now = System.currentTimeMillis();
if (now<end)
continue;
throw new IOException("Write timeout");
}
result += flushBuffer();
} }
} }
_buffer.compact(); finally
return result;
}
public synchronized boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}
public synchronized void returnBuffer()
{
if (_buffer!=null && _buffer.length()==0)
{ {
_buffers.returnBuffer(_buffer); _lock.unlock();
_buffer=null; }
}
public int flushBuffer() throws IOException
{
if (!_lock.tryLock())
return 0;
try
{
if (!_endp.isOpen())
throw new EofException();
if (_buffer != null)
{
int flushed = _buffer.hasContent() ? _endp.flush(_buffer) : 0;
if (_closed && _buffer.length() == 0)
_endp.shutdownOutput();
return flushed;
}
return 0;
}
finally
{
_lock.unlock();
}
}
public int flush() throws IOException
{
if (!_lock.tryLock())
return 0;
try
{
if (_buffer == null)
return 0;
int result = flushBuffer();
if (!_endp.isBlocking())
{
long now = System.currentTimeMillis();
long end = now + _endp.getMaxIdleTime();
while (_buffer.length() > 0)
{
boolean ready = _endp.blockWritable(end - now);
if (!ready)
{
now = System.currentTimeMillis();
if (now < end)
continue;
throw new IOException("Write timeout");
}
result += flushBuffer();
}
}
_buffer.compact();
return result;
}
finally
{
_lock.unlock();
}
}
public boolean isBufferEmpty()
{
_lock.lock();
try
{
return _buffer == null || _buffer.length() == 0;
}
finally
{
_lock.unlock();
}
}
public void returnBuffer()
{
_lock.lock();
try
{
if (_buffer != null && _buffer.length() == 0)
{
_buffers.returnBuffer(_buffer);
_buffer = null;
}
}
finally
{
_lock.unlock();
} }
} }