Simplified Reactive Stream update for ContentSourcePublisher (#11849)

* Issue #11803 - Follow Reactive Streams specification
* Simplification of #11804 for Reactive Stream specification support

---------

Co-authored-by: Artem Golovko <artemgolovko98@gmail.com>
Co-authored-by: Olivier Lamy <olamy@apache.org>
This commit is contained in:
Greg Wilkins 2024-06-14 08:31:46 +10:00 committed by GitHub
parent cb66c6de3d
commit d34556749f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 526 additions and 89 deletions

2
Jenkinsfile vendored
View File

@ -127,7 +127,7 @@ def mavenBuild(jdk, cmdline, mvnName) {
}
finally
{
junit testResults: '**/target/surefire-reports/*.xml,**/target/invoker-reports/TEST*.xml', allowEmptyResults: true
junit testResults: '**/target/surefire-reports/**/*.xml,**/target/invoker-reports/TEST*.xml', allowEmptyResults: true
}
}
}

View File

@ -41,6 +41,11 @@
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
@ -50,6 +55,22 @@
<argLine>@{argLine} ${jetty.surefire.argLine}
--add-reads org.eclipse.jetty.io=org.eclipse.jetty.logging</argLine>
</configuration>
<dependencies>
<!--
surefire plugin currently does not support junit5 with testng out of the box: https://maven.apache.org/surefire/maven-surefire-plugin/examples/testng.html#running-testng-and-junit-tests
We need to specify providers explicitly: https://maven.apache.org/surefire/maven-surefire-plugin/examples/providers.html
-->
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit-platform</artifactId>
<version>${maven.surefire.plugin.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-testng</artifactId>
<version>${maven.surefire.plugin.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>

View File

@ -13,11 +13,17 @@
package org.eclipse.jetty.io.content;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.MathUtils;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.StaticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Wraps a {@link Content.Source} as a {@link Flow.Publisher}.
@ -25,135 +31,282 @@ import org.eclipse.jetty.util.thread.AutoLock;
* read from the passed {@link Content.Source} and passed to {@link Flow.Subscriber#onNext(Object)}.
* If no content is available, then the {@link Content.Source#demand(Runnable)} method is used to
* ultimately call {@link Flow.Subscriber#onNext(Object)} once content is available.</p>
* <p>{@link Content.Source} can be consumed only once and does not support multicast subscription.
* {@link Content.Source} will be consumed fully, otherwise will be failed in case of any errors
* to prevent resource leaks.</p>
*/
public class ContentSourcePublisher implements Flow.Publisher<Content.Chunk>
{
private final Content.Source content;
private static final Logger LOG = LoggerFactory.getLogger(ContentSourcePublisher.class);
private final AtomicReference<Content.Source> content;
public ContentSourcePublisher(Content.Source content)
{
this.content = content;
Objects.requireNonNull(content, "Content.Source must not be null");
this.content = new AtomicReference<>(content);
}
@Override
public void subscribe(Flow.Subscriber<? super Content.Chunk> subscriber)
{
subscriber.onSubscribe(new SubscriptionImpl(content, subscriber));
// As per rule 1.11, we have decided to support SINGLE subscriber
// in a UNICAST configuration for this implementation. It means
// that Content.Source can be consumed only once.
Content.Source content = this.content.getAndSet(null);
if (content != null)
onSubscribe(subscriber, content);
else
onMultiSubscribe(subscriber);
}
private static class SubscriptionImpl implements Flow.Subscription
private void onSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber, Content.Source content)
{
private final AutoLock lock = new AutoLock();
private final Content.Source content;
private final Flow.Subscriber<? super Content.Chunk> subscriber;
private long demand;
private boolean stalled;
private boolean cancelled;
private boolean terminated;
public SubscriptionImpl(Content.Source content, Flow.Subscriber<? super Content.Chunk> subscriber)
// As per rule 1.9, we need to throw a `java.lang.NullPointerException`
// if the `Subscriber` is `null`
if (subscriber == null)
{
this.content = content;
this.subscriber = subscriber;
this.stalled = true;
NullPointerException error = new NullPointerException("Flow.Subscriber must not be null");
content.fail(error);
throw error;
}
ActiveSubscription subscription = new ActiveSubscription(content, subscriber);
// As per rule 1.9, this method must return normally (i.e. not throw).
try
{
subscriber.onSubscribe(subscription);
}
catch (Throwable err)
{
// As per rule 2.13, we MUST consider subscription cancelled and
// MUST raise this error condition in a fashion that is adequate for the runtime environment.
subscription.cancel(new SuppressedException(err));
if (LOG.isTraceEnabled())
LOG.trace("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
}
}
private void onMultiSubscribe(Flow.Subscriber<? super Content.Chunk> subscriber)
{
// As per rule 1.9, we need to throw a `java.lang.NullPointerException`
// if the `Subscriber` is `null`
if (subscriber == null)
throw new NullPointerException("Flow.Subscriber must not be null");
ExhaustedSubscription subscription = new ExhaustedSubscription();
// As per 1.9, this method must return normally (i.e. not throw).
try
{
// As per rule 1.9, the only legal way to signal about Subscriber rejection
// is by calling onError (after calling onSubscribe).
subscriber.onSubscribe(subscription);
subscriber.onError(new IllegalStateException("Content.Source was exhausted."));
}
catch (Throwable err)
{
// As per rule 2.13, we MUST consider subscription cancelled and
// MUST raise this error condition in a fashion that is adequate for the runtime environment.
if (LOG.isTraceEnabled())
LOG.trace("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
}
}
private static final class ExhaustedSubscription implements Flow.Subscription
{
@Override
public void request(long n)
{
boolean process = false;
Throwable failure = null;
try (AutoLock ignored = lock.lock())
{
if (cancelled || terminated)
return;
if (n <= 0)
{
terminated = true;
failure = new IllegalArgumentException("invalid demand " + n);
}
demand = MathUtils.cappedAdd(demand, n);
if (stalled)
{
stalled = false;
process = true;
}
}
if (failure != null)
subscriber.onError(failure);
else if (process)
process();
// As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
}
@Override
public void cancel()
{
try (AutoLock ignored = lock.lock())
{
cancelled = true;
}
// As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
}
}
private static final class ActiveSubscription extends IteratingCallback implements Flow.Subscription
{
private static final long NO_MORE_DEMAND = -1;
private static final Throwable COMPLETED = new StaticException("Source.Content read fully");
private final AtomicReference<Throwable> cancelled;
private final AtomicLong demand;
private Content.Source content;
private Flow.Subscriber<? super Content.Chunk> subscriber;
public ActiveSubscription(Content.Source content, Flow.Subscriber<? super Content.Chunk> subscriber)
{
this.cancelled = new AtomicReference<>(null);
this.demand = new AtomicLong(0);
this.content = content;
this.subscriber = subscriber;
}
private void process()
// As per rule 3.3, Subscription MUST place an upper bound on possible synchronous
// recursion between Publisher and Subscriber
//
// As per rule 1.3, onSubscribe, onNext, onError and onComplete signaled to a
// Subscriber MUST be signaled serially.
//
// IteratingCallback guarantee that process() method will be executed by one thread only.
// The process() method can be only initiated from request() or cancel() demands methods.
@Override
protected Action process()
{
while (true)
Throwable cancelled = this.cancelled.get();
if (cancelled != null)
{
try (AutoLock ignored = lock.lock())
// As per rule 3.13, Subscription.cancel() MUST request the Publisher to eventually
// drop any references to the corresponding subscriber.
this.demand.set(NO_MORE_DEMAND);
if (cancelled != COMPLETED)
this.content.fail(cancelled);
this.content = null;
try
{
if (demand > 0)
{
--demand;
}
else
{
stalled = true;
return;
}
if (cancelled == COMPLETED)
this.subscriber.onComplete();
else if (!(cancelled instanceof SuppressedException))
this.subscriber.onError(cancelled);
}
Content.Chunk chunk = content.read();
if (chunk == null)
catch (Throwable err)
{
try (AutoLock ignored = lock.lock())
{
// Restore the demand decremented above.
++demand;
stalled = true;
}
content.demand(this::process);
return;
if (LOG.isTraceEnabled())
LOG.trace("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
}
this.subscriber = null;
return Action.SUCCEEDED;
}
if (Content.Chunk.isFailure(chunk))
{
terminate();
if (!chunk.isLast())
content.fail(chunk.getFailure());
subscriber.onError(chunk.getFailure());
return;
}
Content.Chunk chunk = content.read();
subscriber.onNext(chunk);
if (chunk == null)
{
content.demand(this::succeeded);
return Action.SCHEDULED;
}
if (Content.Chunk.isFailure(chunk))
{
cancel(chunk.getFailure());
chunk.release();
if (chunk.isLast())
{
terminate();
// Reactive Stream specification rule 2.9 allows Publishers to call onComplete()
// even without demand, and Subscribers must be prepared to handle this case.
subscriber.onComplete();
return;
}
return Action.IDLE;
}
try
{
this.subscriber.onNext(chunk);
}
catch (Throwable err)
{
cancel(new SuppressedException(err));
if (LOG.isTraceEnabled())
LOG.trace("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
}
chunk.release();
if (chunk.isLast())
{
cancel(COMPLETED);
return Action.IDLE;
}
if (demand.decrementAndGet() > 0)
this.iterate();
return Action.IDLE;
}
private void terminate()
@Override
public void request(long n)
{
try (AutoLock ignored = lock.lock())
// As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
if (cancelled.get() != null)
return;
// As per rule 3.9, MUST signal onError with a java.lang.IllegalArgumentException if the argument is <= 0.
if (n <= 0L)
{
terminated = true;
String errorMsg = "Flow.Subscriber " + subscriber + " violated rule 3.9: non-positive requests are not allowed.";
cancel(new IllegalArgumentException(errorMsg));
return;
}
// As per rule 3.17, when demand overflows `Long.MAX_VALUE`
// we treat the signalled demand as "effectively unbounded"
if (demand.updateAndGet(it -> it == NO_MORE_DEMAND ? it : MathUtils.cappedAdd(it, n)) != NO_MORE_DEMAND)
this.iterate();
}
@Override
public void cancel()
{
cancel(new CancelledException());
}
public void cancel(Throwable cause)
{
// As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
//
// As per rule 3.5, this handles cancellation requests, and is idempotent, thread-safe and not
// synchronously performing heavy computations
if (cancelled.compareAndSet(null, cause))
this.iterate();
}
// Publisher notes
//
// 1.6 If a Publisher signals either onError or onComplete on a Subscriber,
// that Subscribers Subscription MUST be considered cancelled.
// 2.4 Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST consider the
// Subscription cancelled after having received the signal.
//
// Publisher failed -> cancel(Throwable)
// 1.4 If a Publisher fails it MUST signal an onError.
//
// Publisher succeeded -> cancel(COMPLETED)
// 1.5 If a Publisher terminates successfully (finite stream) it MUST signal an onComplete.
// Subscriber
// 2.13 In the case that this rule is violated, any associated Subscription to the Subscriber
// MUST be considered as cancelled, and the caller MUST raise this error condition in a
// fashion that is adequate for the runtime environment.
//
// Subscriber.onSubscribe/onNext/onError/onComplete failed -> cancel(new Suppressed(cause))
// Subscription notes
//
// Subscription.cancel -> cancel(new Cancelled())
// It's not clearly specified in the specification, but according to:
// - the issue: https://github.com/reactive-streams/reactive-streams-jvm/issues/458
// - TCK test 'untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals'
// - 1.8 If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.
//
// Subscription.request with negative argument -> cancel(err)
// 3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a
// java.lang.IllegalArgumentException if the argument is <= 0.
}
private static class SuppressedException extends Exception
{
SuppressedException(String message)
{
super(message);
}
SuppressedException(Throwable cause)
{
super(cause.getMessage(), cause);
}
}
private static class CancelledException extends SuppressedException
{
CancelledException()
{
super("Subscription was cancelled");
}
}
}

View File

@ -0,0 +1,251 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io.content;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.Content;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;
import org.testng.annotations.Test;
@Test
public final class ContentSourcePublisherTest extends FlowPublisherVerification<Content.Chunk>
{
public ContentSourcePublisherTest()
{
super(new TestEnvironment());
}
@Override
public Flow.Publisher<Content.Chunk> createFlowPublisher(long elements)
{
Content.Source source = new SyntheticContentSource(elements);
return new ContentSourcePublisher(source);
}
@Override
public Flow.Publisher<Content.Chunk> createFailedFlowPublisher()
{
Content.Source source = new SyntheticContentSource(0);
Flow.Publisher<Content.Chunk> publisher = new ContentSourcePublisher(source);
// Simulate exhausted Content.Source
publisher.subscribe(new Flow.Subscriber<>()
{
@Override
public void onSubscribe(Flow.Subscription subscription)
{
subscription.cancel();
}
@Override
public void onNext(Content.Chunk item)
{
}
@Override
public void onError(Throwable throwable)
{
}
@Override
public void onComplete()
{
}
});
return publisher;
}
private static final class SyntheticContentSource implements Content.Source
{
private final AtomicReference<State> state;
private final long contentSize;
public SyntheticContentSource(long chunksToRead)
{
this.state = new AtomicReference<>(new State.Reading(chunksToRead));
this.contentSize = State.Reading.chunkSize * Math.max(chunksToRead, 0);
}
@Override
public long getLength()
{
return contentSize;
}
@Override
public Content.Chunk read()
{
return state.getAndUpdate(before -> before.read()).chunk();
}
@Override
public void demand(Runnable demandCallback)
{
// recursive stack overflow not necessary for this test
demandCallback.run();
}
@Override
public void fail(Throwable failure)
{
fail(failure, true);
}
@Override
public void fail(Throwable failure, boolean last)
{
state.getAndUpdate(before -> before.fail(failure, last));
}
@Override
public boolean rewind()
{
return false;
}
private sealed interface State permits State.Reading, State.ReadFailed, State.ReadCompleted
{
Content.Chunk chunk();
State read();
State fail(Throwable failure, boolean last);
final class Reading implements State
{
public static final int chunkSize = 16;
private static final Random random = new Random();
private final long chunksToRead;
private final Content.Chunk chunk;
public Reading(long chunksToRead)
{
this.chunksToRead = chunksToRead;
this.chunk = generateValidChunk(chunksToRead);
}
public Reading(long chunksToRead, Throwable transientFailure)
{
this.chunksToRead = chunksToRead;
this.chunk = generateFailureChunk(transientFailure);
}
@Override
public Content.Chunk chunk()
{
return chunk;
}
@Override
public State read()
{
long leftToRead = leftToRead();
if (leftToRead <= 0)
return new ReadCompleted();
return new Reading(leftToRead);
}
@Override
public State fail(Throwable failure, boolean last)
{
if (last)
return new ReadFailed(failure);
return new Reading(chunksToRead, failure);
}
private long leftToRead()
{
if (chunksToRead == Long.MAX_VALUE) // endless source
return chunksToRead;
return chunksToRead - 1;
}
private static Content.Chunk generateFailureChunk(Throwable transientFailure)
{
return Content.Chunk.from(transientFailure, false);
}
private static Content.Chunk generateValidChunk(long chunksToRead)
{
if (chunksToRead <= 0)
return Content.Chunk.EOF;
if (chunksToRead == 1)
return Content.Chunk.from(randomPayload(), true);
return Content.Chunk.from(randomPayload(), false);
}
private static ByteBuffer randomPayload()
{
byte[] payload = new byte[chunkSize];
random.nextBytes(payload);
return ByteBuffer.wrap(payload);
}
}
final class ReadFailed implements State
{
private final Content.Chunk chunk;
public ReadFailed(Throwable failure)
{
this.chunk = Content.Chunk.from(failure, true);
}
@Override
public Content.Chunk chunk()
{
return chunk;
}
@Override
public State read()
{
return this;
}
@Override
public State fail(Throwable failure, boolean last)
{
return this;
}
}
final class ReadCompleted implements State
{
@Override
public Content.Chunk chunk()
{
return Content.Chunk.EOF;
}
@Override
public State read()
{
return this;
}
@Override
public State fail(Throwable failure, boolean last)
{
return this;
}
}
}
}
}

12
pom.xml
View File

@ -300,6 +300,7 @@
<plexus-xml.version>4.0.3</plexus-xml.version>
<project.build.outputTimestamp>2023-06-05T23:12:49Z</project.build.outputTimestamp>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<reactive-streams.version>1.0.4</reactive-streams.version>
<settingsPath>src/it/settings.xml</settingsPath>
<slf4j.version>2.0.12</slf4j.version>
<spifly.version>1.3.7</spifly.version>
@ -308,6 +309,7 @@
<surefire.rerunFailingTestsCount>0</surefire.rerunFailingTestsCount>
<swissbox.version>1.8.3</swissbox.version>
<testcontainers.version>1.19.7</testcontainers.version>
<testng.version>7.10.2</testng.version>
<tinybundles.version>3.0.0</tinybundles.version>
<versions.maven.plugin.version>2.16.2</versions.maven.plugin.version>
<wildfly.common.version>1.7.0.Final</wildfly.common.version>
@ -1155,6 +1157,11 @@
<artifactId>osgi.core</artifactId>
<version>${org.osgi.core.version}</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>${reactive-streams.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl104-over-slf4j</artifactId>
@ -1191,6 +1198,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>${testng.version}</version>
</dependency>
<dependency>
<groupId>org.wildfly.common</groupId>
<artifactId>wildfly-common</artifactId>