#9538 - fixed testParallelContentSourceListenersTotalFailure (#9543)

* #9538 - fixed testParallelContentSourceListenersTotalFailure

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2023-03-29 23:00:45 +02:00 committed by GitHub
parent fadf58d615
commit 46a250ebe8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 45 deletions

View File

@ -26,7 +26,7 @@ import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -410,7 +410,7 @@ public class ResponseListeners
{
private static final Logger LOG = LoggerFactory.getLogger(ContentSourceDemultiplexer.class);
private final AtomicBiInteger counters = new AtomicBiInteger(); // HI = failures; LO = demands
private final AutoLock lock = new AutoLock();
private final List<Response.ContentSourceListener> listeners = new ArrayList<>(2);
private final List<ContentSource> contentSources = new ArrayList<>(2);
private Content.Source originalContentSource;
@ -433,6 +433,32 @@ public class ResponseListeners
}
}
private Counters countStates()
{
assert lock.isHeldByCurrentThread();
int demands = 0;
int failures = 0;
for (ContentSource contentSource : contentSources)
{
switch (contentSource.state)
{
case DEMANDED -> demands++;
case FAILED -> failures++;
}
}
return new Counters(demands, failures);
}
private void resetDemands()
{
assert lock.isHeldByCurrentThread();
for (ContentSource contentSource : contentSources)
{
if (contentSource.state == State.DEMANDED)
contentSource.state = State.IDLE;
}
}
private void onDemandCallback()
{
if (LOG.isDebugEnabled())
@ -450,46 +476,55 @@ public class ResponseListeners
chunk.release();
}
private void registerFailure(Throwable failure)
private void registerFailure(ContentSource contentSource, Throwable failure)
{
while (true)
boolean processFail = false;
boolean processDemand = false;
Counters counters;
try (AutoLock ignored = lock.lock())
{
long encoded = counters.get();
int failures = AtomicBiInteger.getHi(encoded) + 1;
int demands = AtomicBiInteger.getLo(encoded);
if (demands == listeners.size() - failures)
demands = 0;
if (counters.compareAndSet(encoded, failures, demands))
contentSource.state = State.FAILED;
counters = countStates();
if (counters.failures() == listeners.size())
{
if (LOG.isDebugEnabled())
LOG.debug("Registered failure; failures={} demands={}", failures, demands);
if (failures == listeners.size())
originalContentSource.fail(failure);
else if (demands == 0)
originalContentSource.demand(this::onDemandCallback);
break;
processFail = true;
}
else if (counters.total() == listeners.size())
{
resetDemands();
processDemand = true;
}
}
if (processFail)
originalContentSource.fail(failure);
else if (processDemand)
originalContentSource.demand(this::onDemandCallback);
if (LOG.isDebugEnabled())
LOG.debug("Registered failure on {}; {}", contentSource, counters);
}
private void registerDemand()
private void registerDemand(ContentSource contentSource)
{
while (true)
boolean processDemand = false;
Counters counters;
try (AutoLock ignored = lock.lock())
{
long encoded = counters.get();
int failures = AtomicBiInteger.getHi(encoded);
int demands = AtomicBiInteger.getLo(encoded) + 1;
if (demands == listeners.size() - failures)
demands = 0;
if (counters.compareAndSet(encoded, failures, demands))
if (contentSource.state != State.IDLE)
return;
contentSource.state = State.DEMANDED;
counters = countStates();
if (counters.total() == listeners.size())
{
if (LOG.isDebugEnabled())
LOG.debug("Registered demand; failures={} demands={}", failures, demands);
if (demands == 0)
originalContentSource.demand(this::onDemandCallback);
break;
resetDemands();
processDemand = true;
}
}
if (processDemand)
originalContentSource.demand(this::onDemandCallback);
if (LOG.isDebugEnabled())
LOG.debug("Registered demand on {}; {}", contentSource, counters);
}
private class ContentSource implements Content.Source
@ -535,6 +570,7 @@ public class ResponseListeners
private final int index;
private final AtomicReference<Runnable> demandCallbackRef = new AtomicReference<>();
private volatile Content.Chunk chunk;
private volatile State state = State.IDLE;
private ContentSource(int index)
{
@ -606,7 +642,7 @@ public class ResponseListeners
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} demand while current chunk is {}", index, currentChunk);
if (currentChunk == null || currentChunk == ALREADY_READ_CHUNK)
registerDemand();
registerDemand(this);
else
onDemandCallback();
}
@ -623,7 +659,26 @@ public class ResponseListeners
currentChunk.release();
this.chunk = Content.Chunk.from(failure);
onDemandCallback();
registerFailure(failure);
registerFailure(this, failure);
}
@Override
public String toString()
{
return "%s@%x[i=%d,d=%s,c=%s,s=%s]".formatted(getClass().getSimpleName(), hashCode(), index, demandCallbackRef, chunk, state);
}
}
enum State
{
IDLE, DEMANDED, FAILED
}
private record Counters(int demands, int failures)
{
public int total()
{
return demands + failures;
}
}
}

View File

@ -750,6 +750,7 @@ public class MultiPart
@Override
public void demand(Runnable demandCallback)
{
Part part = null;
boolean invoke = false;
try (AutoLock ignored = lock.lock())
{
@ -758,23 +759,25 @@ public class MultiPart
this.demand = Objects.requireNonNull(demandCallback);
if (state == State.CONTENT)
{
part.getContentSource().demand(() ->
{
try (AutoLock ignoredAgain = lock.lock())
{
this.demand = null;
}
demandCallback.run();
});
}
part = this.part;
else
{
invoke = !parts.isEmpty() || closed || errorChunk != null;
}
}
if (invoke)
if (part != null)
{
part.getContentSource().demand(() ->
{
try (AutoLock ignoredAgain = lock.lock())
{
this.demand = null;
}
demandCallback.run();
});
}
else if (invoke)
{
invoker.run(this::invokeDemandCallback);
}
}
@Override

View File

@ -138,5 +138,7 @@ public class ByteBufferContentSource implements Content.Source
return;
terminated = Content.Chunk.from(failure);
}
// Demands are always serviced immediately so there is no
// need to ask the invoker to run invokeDemandCallback here.
}
}

View File

@ -222,6 +222,8 @@ public class PathContentSource implements Content.Source
}
return errorChunk;
}
// Demands are always serviced immediately so there is no
// need to ask the invoker to run invokeDemandCallback here.
}
@Override