Issue #2429 - Review HttpClient backpressure semantic.

Updated after review.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-10-08 00:52:11 +02:00
parent ef05f730e6
commit a4603c4237
1 changed files with 22 additions and 8 deletions

View File

@ -23,6 +23,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -689,8 +690,6 @@ public abstract class HttpReceiver
private void notifyContent(HttpResponse response, ByteBuffer buffer, Callback callback)
{
if (hasManyListeners())
demands.replaceAll((k, v) -> v - 1);
HttpReceiver.this.demand(d -> d - 1);
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyContent(response, this::demand, buffer, callback, listeners);
@ -698,20 +697,18 @@ public abstract class HttpReceiver
private void demand(Object context, long value)
{
if (hasManyListeners())
if (listeners.size() > 1)
accept(context, value);
else
demand.accept(value);
}
private boolean hasManyListeners()
{
return listeners.size() > 1;
}
private void accept(Object context, long value)
{
// Increment the demand for the given listener.
demands.merge(context, value, MathUtils::cappedAdd);
// Check if we have demand from all listeners.
if (demands.size() == listeners.size())
{
long minDemand = Long.MAX_VALUE;
@ -721,7 +718,24 @@ public abstract class HttpReceiver
minDemand = demand;
}
if (minDemand > 0)
{
// We are going to demand for minDemand content
// chunks, so decrement the listener's demand by
// minDemand and remove those that have no demand left.
Iterator<Map.Entry<Object, Long>> iterator = demands.entrySet().iterator();
while (iterator.hasNext())
{
Map.Entry<Object, Long> entry = iterator.next();
long newValue = entry.getValue() - minDemand;
if (newValue == 0)
iterator.remove();
else
entry.setValue(newValue);
}
// Demand more content chunks for all the listeners.
demand.accept(minDemand);
}
}
}
}