WebSocket - making flush loop and callback notify more consistent
This commit is contained in:
parent
bfd68d7e61
commit
098b1e174b
|
@ -209,39 +209,37 @@ public class WriteBytesProvider implements Callback
|
||||||
|
|
||||||
public void failAll(Throwable t)
|
public void failAll(Throwable t)
|
||||||
{
|
{
|
||||||
boolean notified = false;
|
// Collect entries for callback
|
||||||
|
List<FrameEntry> callbacks = new ArrayList<>();
|
||||||
|
|
||||||
// fail active (if set)
|
synchronized (this)
|
||||||
if (active != null)
|
|
||||||
{
|
{
|
||||||
FrameEntry entry = active;
|
// fail active (if set)
|
||||||
active = null;
|
if (active != null)
|
||||||
entry.notifyFailure(t);
|
{
|
||||||
notified = true;
|
FrameEntry entry = active;
|
||||||
|
active = null;
|
||||||
|
callbacks.add(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
callbacks.addAll(past);
|
||||||
|
callbacks.addAll(queue);
|
||||||
|
|
||||||
|
past.clear();
|
||||||
|
queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
failure = t;
|
// notify flush callback
|
||||||
|
if (!callbacks.isEmpty())
|
||||||
// fail past
|
|
||||||
while (!past.isEmpty())
|
|
||||||
{
|
{
|
||||||
FrameEntry entry = past.pop();
|
// TODO: always notify instead?
|
||||||
entry.notifyFailure(t);
|
|
||||||
notified = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// fail others
|
|
||||||
while (!queue.isEmpty())
|
|
||||||
{
|
|
||||||
FrameEntry entry = queue.pop();
|
|
||||||
entry.notifyFailure(t);
|
|
||||||
notified = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (notified)
|
|
||||||
{
|
|
||||||
// notify flush callback
|
|
||||||
flushCallback.failed(t);
|
flushCallback.failed(t);
|
||||||
|
|
||||||
|
// notify entry callbacks
|
||||||
|
for (FrameEntry entry : callbacks)
|
||||||
|
{
|
||||||
|
entry.notifyFailure(t);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,22 +361,31 @@ public class WriteBytesProvider implements Callback
|
||||||
@Override
|
@Override
|
||||||
public void succeeded()
|
public void succeeded()
|
||||||
{
|
{
|
||||||
if ((active != null) && (active.frame.remaining() <= 0))
|
// Collect entries for callback
|
||||||
{
|
List<FrameEntry> callbacks = new ArrayList<>();
|
||||||
// All done with active FrameEntry
|
|
||||||
FrameEntry entry = active;
|
|
||||||
active = null;
|
|
||||||
entry.notifySucceeded();
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!past.isEmpty())
|
synchronized (this)
|
||||||
{
|
{
|
||||||
FrameEntry entry = past.pop();
|
if ((active != null) && (active.frame.remaining() <= 0))
|
||||||
entry.notifySucceeded();
|
{
|
||||||
|
// All done with active FrameEntry
|
||||||
|
FrameEntry entry = active;
|
||||||
|
active = null;
|
||||||
|
callbacks.add(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
callbacks.addAll(past);
|
||||||
|
past.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify flush callback
|
// notify flush callback
|
||||||
flushCallback.succeeded();
|
flushCallback.succeeded();
|
||||||
|
|
||||||
|
// notify entry callbacks outside of synchronize
|
||||||
|
for (FrameEntry entry : callbacks)
|
||||||
|
{
|
||||||
|
entry.notifySucceeded();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue