WebSocketAsyncConnection.write() with byte[] array fails occasionally
This commit is contained in:
parent
84fa1163e9
commit
70ff05e955
|
@ -243,15 +243,15 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
|
|||
@Override
|
||||
public <C> void write(C context, Callback<C> callback, byte buf[], int offset, int len) throws IOException
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("write(context,{},byte[],{},{})",callback,offset,len);
|
||||
}
|
||||
if (len == 0)
|
||||
{
|
||||
// nothing to write
|
||||
return;
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("write(context,{},byte[],{},{})",callback,offset,len);
|
||||
}
|
||||
ByteBuffer raw = bufferPool.acquire(len + FrameGenerator.OVERHEAD,false);
|
||||
BufferUtil.clearToFill(raw);
|
||||
BinaryFrame frame = new BinaryFrame(buf,offset,len);
|
||||
|
|
|
@ -154,9 +154,35 @@ public class WebSocketServletRFCTest
|
|||
client.write(bin); // write buf3 (fin=true)
|
||||
|
||||
// Read frame echo'd back (hopefully a single binary frame)
|
||||
Queue<BaseFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500);
|
||||
Queue<BaseFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
|
||||
BinaryFrame binmsg = (BinaryFrame)frames.remove();
|
||||
Assert.assertThat("BinaryFrame.payloadLength",binmsg.getPayloadLength(),is(128 * 3));
|
||||
int expectedSize = buf1.length + buf2.length + buf3.length;
|
||||
Assert.assertThat("BinaryFrame.payloadLength",binmsg.getPayloadLength(),is(expectedSize));
|
||||
|
||||
int aaCount = 0;
|
||||
int bbCount = 0;
|
||||
int ccCount = 0;
|
||||
byte echod[] = binmsg.getPayloadData();
|
||||
for (byte b : echod)
|
||||
{
|
||||
switch (b)
|
||||
{
|
||||
case (byte)0xAA:
|
||||
aaCount++;
|
||||
break;
|
||||
case (byte)0xBB:
|
||||
bbCount++;
|
||||
break;
|
||||
case (byte)0xCC:
|
||||
ccCount++;
|
||||
break;
|
||||
default:
|
||||
Assert.fail(String.format("Encountered invalid byte 0x%02X",(byte)(0xFF & b)));
|
||||
}
|
||||
}
|
||||
Assert.assertThat("Echoed data count for 0xAA",aaCount,is(buf1.length));
|
||||
Assert.assertThat("Echoed data count for 0xBB",bbCount,is(buf2.length));
|
||||
Assert.assertThat("Echoed data count for 0xCC",ccCount,is(buf3.length));
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
|
|
@ -206,12 +206,14 @@ public class BlockheadClient implements Parser.Listener
|
|||
{
|
||||
int startCount = incomingFrameQueue.size();
|
||||
|
||||
long expireOn = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeoutDuration,timeoutUnit);
|
||||
|
||||
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
|
||||
BufferUtil.clearToFill(buf);
|
||||
try
|
||||
{
|
||||
long now = System.currentTimeMillis();
|
||||
long expireOn = now + TimeUnit.MILLISECONDS.convert(timeoutDuration,timeoutUnit);
|
||||
LOG.debug("Now: {} - expireOn: {}",now,expireOn);
|
||||
|
||||
int len = 0;
|
||||
while (incomingFrameQueue.size() < (startCount + expectedCount))
|
||||
{
|
||||
|
@ -222,6 +224,14 @@ public class BlockheadClient implements Parser.Listener
|
|||
BufferUtil.flipToFlush(buf,0);
|
||||
parser.parse(buf);
|
||||
}
|
||||
try
|
||||
{
|
||||
TimeUnit.MILLISECONDS.sleep(20);
|
||||
}
|
||||
catch (InterruptedException gnore)
|
||||
{
|
||||
/* ignore */
|
||||
}
|
||||
if (!DEBUG && (System.currentTimeMillis() > expireOn))
|
||||
{
|
||||
throw new TimeoutException("Timeout reading all of the desired frames");
|
||||
|
|
Loading…
Reference in New Issue