From 4ce461559739c4280914c806ada4b32dd0f169ee Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 8 Aug 2013 12:08:41 +0200 Subject: [PATCH] 414652 - WebSocket's sendMessage() may hang on congested connections. Fixed by using Locks instead of the synchronized keyword, and by using tryLock() in flushBuffer() and flush(), that are called by WebSocketConnection.handle(). It is quite a hack, but the real fix is in Jetty 9 where the architecture is different and there are two different paths for reading and writing, so that this problem could not even happen. --- .../websocket/WebSocketGeneratorRFC6455.java | 397 ++++++++++-------- 1 file changed, 224 insertions(+), 173 deletions(-) diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorRFC6455.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorRFC6455.java index dedaa735aa3..6c72bfc17f4 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorRFC6455.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorRFC6455.java @@ -19,14 +19,16 @@ package org.eclipse.jetty.websocket; import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; -/* ------------------------------------------------------------ */ -/** WebSocketGenerator. +/** + * WebSocketGenerator. * This class generates websocket packets. * It is fully synchronized because it is likely that async * threads will call the addMessage methods while other @@ -34,214 +36,263 @@ import org.eclipse.jetty.io.EofException; */ public class WebSocketGeneratorRFC6455 implements WebSocketGenerator { - final private WebSocketBuffers _buffers; - final private EndPoint _endp; + private final Lock _lock = new ReentrantLock(); + private final WebSocketBuffers _buffers; + private final EndPoint _endp; + private final byte[] _mask = new byte[4]; + private final MaskGen _maskGen; private Buffer _buffer; - private final byte[] _mask=new byte[4]; private int _m; private boolean _opsent; - private final MaskGen _maskGen; private boolean _closed; public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp) { - _buffers=buffers; - _endp=endp; - _maskGen=null; + this(buffers, endp, null); } public WebSocketGeneratorRFC6455(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen) { - _buffers=buffers; - _endp=endp; - _maskGen=maskGen; + _buffers = buffers; + _endp = endp; + _maskGen = maskGen; } - public synchronized Buffer getBuffer() + public Buffer getBuffer() { - return _buffer; - } - - public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException - { - // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length); - - if (_closed) - throw new EofException("Closed"); - if (opcode==WebSocketConnectionRFC6455.OP_CLOSE) - _closed=true; - - boolean mask=_maskGen!=null; - - if (_buffer==null) - _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer(); - - boolean last=WebSocketConnectionRFC6455.isLastFrame(flags); - - int space=mask?14:10; - - do + _lock.lock(); + try { - opcode = _opsent?WebSocketConnectionRFC6455.OP_CONTINUATION:opcode; - opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode)); - _opsent=true; + return _buffer; + } + finally + { + _lock.unlock(); + } + } - int payload=length; - if (payload+space>_buffer.capacity()) - { - // We must fragement, so clear FIN bit - opcode=(byte)(opcode&0x7F); // Clear the FIN bit - payload=_buffer.capacity()-space; - } - else if (last) - opcode= (byte)(opcode|0x80); // Set the FIN bit + public void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException + { + _lock.lock(); + try + { + if (_closed) + throw new EofException("Closed"); + if (opcode == WebSocketConnectionRFC6455.OP_CLOSE) + _closed = true; - // ensure there is space for header - if (_buffer.space() <= space) + boolean mask = _maskGen != null; + + if (_buffer == null) + _buffer = mask ? _buffers.getBuffer() : _buffers.getDirectBuffer(); + + boolean last = WebSocketConnectionRFC6455.isLastFrame(flags); + + int space = mask ? 14 : 10; + + do { - flushBuffer(); + opcode = _opsent ? WebSocketConnectionRFC6455.OP_CONTINUATION : opcode; + opcode = (byte)(((0xf & flags) << 4) + (0xf & opcode)); + _opsent = true; + + int payload = length; + if (payload + space > _buffer.capacity()) + { + // We must fragement, so clear FIN bit + opcode = (byte)(opcode & 0x7F); // Clear the FIN bit + payload = _buffer.capacity() - space; + } + else if (last) + opcode = (byte)(opcode | 0x80); // Set the FIN bit + + // ensure there is space for header if (_buffer.space() <= space) - flush(); - } + { + flushBuffer(); + if (_buffer.space() <= space) + flush(); + } - // write the opcode and length - if (payload>0xffff) - { - _buffer.put(new byte[]{ - opcode, - mask?(byte)0xff:(byte)0x7f, - (byte)0, - (byte)0, - (byte)0, - (byte)0, - (byte)((payload>>24)&0xff), - (byte)((payload>>16)&0xff), - (byte)((payload>>8)&0xff), - (byte)(payload&0xff)}); - } - else if (payload >=0x7e) - { - _buffer.put(new byte[]{ - opcode, - mask?(byte)0xfe:(byte)0x7e, - (byte)(payload>>8), - (byte)(payload&0xff)}); - } - else - { - _buffer.put(new byte[]{ - opcode, - (byte)(mask?(0x80|payload):payload)}); - } - - // write mask - if (mask) - { - _maskGen.genMask(_mask); - _m=0; - _buffer.put(_mask); - } - - // write payload - int remaining = payload; - while (remaining > 0) - { - _buffer.compact(); - int chunk = remaining < _buffer.space() ? remaining : _buffer.space(); + // write the opcode and length + if (payload > 0xffff) + { + _buffer.put(new byte[]{ + opcode, + mask ? (byte)0xff : (byte)0x7f, + (byte)0, + (byte)0, + (byte)0, + (byte)0, + (byte)((payload >> 24) & 0xff), + (byte)((payload >> 16) & 0xff), + (byte)((payload >> 8) & 0xff), + (byte)(payload & 0xff)}); + } + else if (payload >= 0x7e) + { + _buffer.put(new byte[]{ + opcode, + mask ? (byte)0xfe : (byte)0x7e, + (byte)(payload >> 8), + (byte)(payload & 0xff)}); + } + else + { + _buffer.put(new byte[]{ + opcode, + (byte)(mask ? (0x80 | payload) : payload)}); + } + // write mask if (mask) { - for (int i=0;i 0) + // write payload + int remaining = payload; + while (remaining > 0) { - // Gently flush the data, issuing a non-blocking write - flushBuffer(); - } - else - { - // Forcibly flush the data, issuing a blocking write - flush(); - if (remaining == 0) + _buffer.compact(); + int chunk = remaining < _buffer.space() ? remaining : _buffer.space(); + + if (mask) + { + for (int i = 0; i < chunk; i++) + _buffer.put((byte)(content[offset + (payload - remaining) + i] ^ _mask[+_m++ % 4])); + } + else + _buffer.put(content, offset + (payload - remaining), chunk); + + remaining -= chunk; + if (_buffer.space() > 0) { // Gently flush the data, issuing a non-blocking write flushBuffer(); } + else + { + // Forcibly flush the data, issuing a blocking write + flush(); + if (remaining == 0) + { + // Gently flush the data, issuing a non-blocking write + flushBuffer(); + } + } } + offset += payload; + length -= payload; } - offset+=payload; - length-=payload; - } - while (length>0); - _opsent=!last; + while (length > 0); + _opsent = !last; - if (_buffer!=null && _buffer.length()==0) - { - _buffers.returnBuffer(_buffer); - _buffer=null; - } - } - - public synchronized int flushBuffer() throws IOException - { - if (!_endp.isOpen()) - throw new EofException(); - - if (_buffer!=null) - { - int flushed=_buffer.hasContent()?_endp.flush(_buffer):0; - if (_closed&&_buffer.length()==0) - _endp.shutdownOutput(); - return flushed; - } - - return 0; - } - - public synchronized int flush() throws IOException - { - if (_buffer==null) - return 0; - int result = flushBuffer(); - - if (!_endp.isBlocking()) - { - long now = System.currentTimeMillis(); - long end=now+_endp.getMaxIdleTime(); - while (_buffer.length()>0) + if (_buffer != null && _buffer.length() == 0) { - boolean ready = _endp.blockWritable(end-now); - if (!ready) - { - now = System.currentTimeMillis(); - if (now 0) + { + boolean ready = _endp.blockWritable(end - now); + if (!ready) + { + now = System.currentTimeMillis(); + if (now < end) + continue; + throw new IOException("Write timeout"); + } + + result += flushBuffer(); + } + } + _buffer.compact(); + return result; + } + finally + { + _lock.unlock(); + } + } + + public boolean isBufferEmpty() + { + _lock.lock(); + try + { + return _buffer == null || _buffer.length() == 0; + } + finally + { + _lock.unlock(); + } + } + + public void returnBuffer() + { + _lock.lock(); + try + { + if (_buffer != null && _buffer.length() == 0) + { + _buffers.returnBuffer(_buffer); + _buffer = null; + } + } + finally + { + _lock.unlock(); } }