Added missing flush after writing frames.

Frames may be written asynchronously but without a Handler, and therefore it was possible that frames
queued up, that one flush() was writing a frame without Handler, and the flush was stopping even if the
queue was non-empty.
Now we call flush() after writing a frame.
This commit is contained in:
Simone Bordet 2012-04-02 13:22:06 +02:00
parent b44fe2094f
commit 30adf7cd18
2 changed files with 33 additions and 22 deletions

View File

@ -854,6 +854,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
flushing = false;
}
frameBytes.complete();
flush();
}
@Override

View File

@ -87,25 +87,8 @@ public class SynDataReplyDataLoadTest extends AbstractTest
}
});
List<Callable<Object>> tasks = new ArrayList<>();
for (int i = 0; i < count; ++i)
{
tasks.add(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
synCompletedData(session, headers, iterations);
return null;
}
});
}
ExecutorService threadPool = Executors.newFixedThreadPool(count);
List<Future<Object>> futures = threadPool.invokeAll(tasks);
for (Future<Object> future : futures)
future.get(iterations, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(count * iterations, TimeUnit.SECONDS));
List<Callable<Object>> tasks = new ArrayList<>();
tasks.clear();
for (int i = 0; i < count; ++i)
@ -120,11 +103,38 @@ public class SynDataReplyDataLoadTest extends AbstractTest
}
});
}
{
long begin = System.nanoTime();
List<Future<Object>> futures = threadPool.invokeAll(tasks);
for (Future<Object> future : futures)
future.get(iterations, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(count * iterations, TimeUnit.SECONDS));
long end = System.nanoTime();
System.err.printf("SYN+GET+DATA+GET completed in %d ms%n", TimeUnit.NANOSECONDS.toMillis(end - begin));
}
futures = threadPool.invokeAll(tasks);
for (Future<Object> future : futures)
future.get(iterations, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(count * iterations, TimeUnit.SECONDS));
tasks.clear();
for (int i = 0; i < count; ++i)
{
tasks.add(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
synCompletedData(session, headers, iterations);
return null;
}
});
}
{
long begin = System.nanoTime();
List<Future<Object>> futures = threadPool.invokeAll(tasks);
for (Future<Object> future : futures)
future.get(iterations, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(count * iterations, TimeUnit.SECONDS));
long end = System.nanoTime();
System.err.printf("SYN+COMPLETED+DATA completed in %d ms%n", TimeUnit.NANOSECONDS.toMillis(end - begin));
}
threadPool.shutdown();
}