Fixed more bugs in flow control.

This commit is contained in:
Simone Bordet 2012-02-14 10:23:18 +01:00
parent 3660537151
commit 1c18cba38a
2 changed files with 102 additions and 23 deletions

View File

@ -18,9 +18,9 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -60,7 +60,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
private static final Logger logger = LoggerFactory.getLogger(Session.class); private static final Logger logger = LoggerFactory.getLogger(Session.class);
private final List<Listener> listeners = new CopyOnWriteArrayList<>(); private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final Queue<FrameBytes> queue = new LinkedList<>(); private final Deque<FrameBytes> queue = new LinkedList<>();
private final Controller<FrameBytes> controller; private final Controller<FrameBytes> controller;
private final AtomicInteger streamIds; private final AtomicInteger streamIds;
private final AtomicInteger pingIds; private final AtomicInteger pingIds;
@ -592,7 +592,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
updateLastStreamId(stream); updateLastStreamId(stream);
ByteBuffer buffer = generator.control(frame); ByteBuffer buffer = generator.control(frame);
logger.debug("Posting {} on {}", frame, stream); logger.debug("Posting {} on {}", frame, stream);
enqueue(new ControlFrameBytes(frame, buffer)); enqueueLast(new ControlFrameBytes(frame, buffer));
} }
private void updateLastStreamId(IStream stream) private void updateLastStreamId(IStream stream)
@ -611,7 +611,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
public void data(IStream stream, DataInfo dataInfo) public void data(IStream stream, DataInfo dataInfo)
{ {
logger.debug("Posting {} on {}", dataInfo, stream); logger.debug("Posting {} on {}", dataInfo, stream);
enqueue(new DataFrameBytes(stream, dataInfo)); enqueueLast(new DataFrameBytes(stream, dataInfo));
flush(); flush();
} }
@ -639,7 +639,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
buffer = frameBytes.getByteBuffer(); buffer = frameBytes.getByteBuffer();
if (buffer == null) if (buffer == null)
{ {
enqueue(frameBytes); enqueueFirst(frameBytes);
logger.debug("Flush skipped, {} frame(s) in queue", queue.size()); logger.debug("Flush skipped, {} frame(s) in queue", queue.size());
return; return;
} }
@ -652,24 +652,32 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
write(buffer, this, frameBytes); write(buffer, this, frameBytes);
} }
private void enqueue(FrameBytes frameBytes) private void enqueueLast(FrameBytes frameBytes)
{ {
// TODO: handle priority; e.g. use queues to prioritize the buffers ? // TODO: handle priority; e.g. use queues to prioritize the buffers ?
synchronized (queue) synchronized (queue)
{ {
queue.offer(frameBytes); queue.offerLast(frameBytes);
}
}
private void enqueueFirst(FrameBytes frameBytes)
{
synchronized (queue)
{
queue.offerFirst(frameBytes);
} }
} }
@Override @Override
public void complete(FrameBytes context) public void complete(FrameBytes frameBytes)
{ {
logger.debug("Completed write of {}, {} frame(s) in queue", context, queue.size());
synchronized (queue) synchronized (queue)
{ {
logger.debug("Completed write of {}, {} frame(s) in queue", frameBytes, queue.size());
flushing = false; flushing = false;
} }
context.complete(); frameBytes.complete();
flush(); flush();
} }
@ -678,14 +686,14 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
controller.write(buffer, handler, frameBytes); controller.write(buffer, handler, frameBytes);
} }
protected abstract static class FrameBytes public interface FrameBytes
{ {
protected abstract ByteBuffer getByteBuffer(); public abstract ByteBuffer getByteBuffer();
public abstract void complete(); public abstract void complete();
} }
private class ControlFrameBytes extends FrameBytes private class ControlFrameBytes implements FrameBytes
{ {
private final ControlFrame frame; private final ControlFrame frame;
private final ByteBuffer buffer; private final ByteBuffer buffer;
@ -697,7 +705,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
} }
@Override @Override
protected ByteBuffer getByteBuffer() public ByteBuffer getByteBuffer()
{ {
return buffer; return buffer;
} }
@ -720,7 +728,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
} }
} }
private class DataFrameBytes extends FrameBytes private class DataFrameBytes implements FrameBytes
{ {
private final IStream stream; private final IStream stream;
private final DataInfo data; private final DataInfo data;
@ -733,7 +741,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
} }
@Override @Override
protected ByteBuffer getByteBuffer() public ByteBuffer getByteBuffer()
{ {
int windowSize = stream.getWindowSize(); int windowSize = stream.getWindowSize();
if (windowSize <= 0) if (windowSize <= 0)
@ -752,7 +760,9 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
if (!data.isConsumed()) if (!data.isConsumed())
{ {
enqueue(this); // If we could not write a full data frame, then we need first
// to finish it, and then process the others (to avoid data garbling)
enqueueFirst(this);
} }
else else
{ {
@ -765,7 +775,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
@Override @Override
public String toString() public String toString()
{ {
return "data on " + stream; return String.format("data@%x consumed=%b on %s", hashCode(), data.isConsumed(), stream);
} }
} }
} }

View File

@ -36,13 +36,14 @@ public class FlowControlTest extends AbstractTest
@Test @Test
public void testServerFlowControlOneBigWrite() throws Exception public void testServerFlowControlOneBigWrite() throws Exception
{ {
final int length = 128 * 1024;
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter() Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{ {
@Override @Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo) public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{ {
stream.reply(new ReplyInfo(false)); stream.reply(new ReplyInfo(false));
stream.data(new BytesDataInfo(new byte[128 * 1024], true)); stream.data(new BytesDataInfo(new byte[length], true));
return null; return null;
} }
}), null); }), null);
@ -60,20 +61,22 @@ public class FlowControlTest extends AbstractTest
} }
}); });
Assert.assertTrue(dataLatch.await(500, TimeUnit.SECONDS)); Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(length, bytes.get());
} }
@Test @Test
public void testServerFlowControlTwoBigWrites() throws Exception public void testServerFlowControlTwoBigWrites() throws Exception
{ {
final int length = 128 * 1024;
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter() Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{ {
@Override @Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo) public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{ {
stream.reply(new ReplyInfo(false)); stream.reply(new ReplyInfo(false));
stream.data(new BytesDataInfo(new byte[128 * 1024], false)); stream.data(new BytesDataInfo(new byte[length], false));
stream.data(new BytesDataInfo(new byte[128 * 1024], true)); stream.data(new BytesDataInfo(new byte[length], true));
return null; return null;
} }
}), null); }), null);
@ -91,6 +94,72 @@ public class FlowControlTest extends AbstractTest
} }
}); });
Assert.assertTrue(dataLatch.await(500, TimeUnit.SECONDS)); Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(2 * length, bytes.get());
}
@Test
public void testClientFlowControlOneBigWrite() throws Exception
{
final AtomicInteger bytes = new AtomicInteger();
final CountDownLatch dataLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false));
return new Stream.FrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
bytes.addAndGet(dataInfo.getBytesCount());
if (dataInfo.isClose())
dataLatch.countDown();
}
};
}
}), null);
Stream stream = session.syn(SPDY.V2, new SynInfo(true), null);
int length = 128 * 1024;
stream.data(new BytesDataInfo(new byte[length], true));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(length, bytes.get());
}
@Test
public void testClientFlowControlTwoBigWrites() throws Exception
{
final AtomicInteger bytes = new AtomicInteger();
final CountDownLatch dataLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public Stream.FrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false));
return new Stream.FrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
bytes.addAndGet(dataInfo.getBytesCount());
if (dataInfo.isClose())
dataLatch.countDown();
}
};
}
}), null);
Stream stream = session.syn(SPDY.V2, new SynInfo(true), null);
int length = 128 * 1024;
stream.data(new BytesDataInfo(new byte[length], false));
stream.data(new BytesDataInfo(new byte[length], true));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(2 * length, bytes.get());
} }
} }