Vastly improved queueing of FrameBytes, by appending at the end of the queue (instead of iterating over the queue).

Also, implemented a better fix for the missing flush() in case of missing handlers: now instead of flushing in the write
completion handler (which could lead to stack overflows), we use the same mechanism employed for FrameBytes,
where we avoid stack overflows by dispatching to a new thread after few recursive invocations.
This commit is contained in:
Simone Bordet 2012-04-04 19:21:16 +02:00
parent 911643b783
commit 50f545b29a
1 changed files with 39 additions and 35 deletions

View File

@ -820,15 +820,21 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void append(FrameBytes frameBytes) private void append(FrameBytes frameBytes)
{ {
enqueue(frameBytes, false); synchronized (queue)
{
int index = queue.size();
while (index > 0)
{
FrameBytes element = queue.get(index - 1);
if (element.compareTo(frameBytes) >= 0)
break;
--index;
}
queue.add(index, frameBytes);
}
} }
private void prepend(FrameBytes frameBytes) private void prepend(FrameBytes frameBytes)
{
enqueue(frameBytes, true);
}
private void enqueue(FrameBytes frameBytes, boolean prepend)
{ {
synchronized (queue) synchronized (queue)
{ {
@ -836,8 +842,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
while (index < queue.size()) while (index < queue.size())
{ {
FrameBytes element = queue.get(index); FrameBytes element = queue.get(index);
int comparison = element.compareTo(frameBytes); if (element.compareTo(frameBytes) <= 0)
if (comparison > 0 || prepend && comparison == 0)
break; break;
++index; ++index;
} }
@ -854,7 +859,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
flushing = false; flushing = false;
} }
frameBytes.complete(); frameBytes.complete();
flush();
} }
@Override @Override
@ -874,37 +878,36 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private <C> void complete(final Handler<C> handler, final C context) private <C> void complete(final Handler<C> handler, final C context)
{ {
if (handler != null) // Applications may send and queue up a lot of frames and
// if we call Handler.completed() only synchronously we risk
// starvation (for the last frames sent) and stack overflow.
// Therefore every some invocation, we dispatch to a new thread
Integer invocations = handlerInvocations.get();
if (invocations >= 4)
{ {
// Applications may send and queue up a lot of frames and execute(new Runnable()
// if we call Handler.completed() only synchronously we risk
// starvation (for the last frames sent) and stack overflow.
// Therefore every some invocation, we dispatch to a new thread
Integer invocations = handlerInvocations.get();
if (invocations >= 4)
{ {
execute(new Runnable() @Override
public void run()
{ {
@Override if (handler != null)
public void run()
{
notifyHandlerCompleted(handler, context); notifyHandlerCompleted(handler, context);
flush();
}
});
}
else
{
handlerInvocations.set(invocations + 1);
try
{
notifyHandlerCompleted(handler, context);
flush(); flush();
} }
finally });
{ }
handlerInvocations.set(invocations); else
} {
handlerInvocations.set(invocations + 1);
try
{
if (handler != null)
notifyHandlerCompleted(handler, context);
flush();
}
finally
{
handlerInvocations.set(invocations);
} }
} }
} }
@ -967,7 +970,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override @Override
public int compareTo(FrameBytes that) public int compareTo(FrameBytes that)
{ {
return getStream().getPriority() - that.getStream().getPriority(); // If this.stream.priority > that.stream.priority => -1 (this.stream has less priority than that.stream)
return that.getStream().getPriority() - getStream().getPriority();
} }
@Override @Override