diff --git a/Jenkinsfile b/Jenkinsfile
index c794594c44f..b15de87c1d5 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -127,7 +127,7 @@ def mavenBuild(jdk, cmdline, mvnName) {
}
finally
{
- junit testResults: '**/target/surefire-reports/*.xml,**/target/invoker-reports/TEST*.xml', allowEmptyResults: true
+ junit testResults: '**/target/surefire-reports/**/*.xml,**/target/invoker-reports/TEST*.xml', allowEmptyResults: true
}
}
}
diff --git a/jetty-core/jetty-io/pom.xml b/jetty-core/jetty-io/pom.xml
index b1169af256e..706ad10427a 100644
--- a/jetty-core/jetty-io/pom.xml
+++ b/jetty-core/jetty-io/pom.xml
@@ -41,6 +41,11 @@
jetty-test-helper
test
+
+ org.reactivestreams
+ reactive-streams-tck-flow
+ test
+
@@ -50,6 +55,22 @@
@{argLine} ${jetty.surefire.argLine}
--add-reads org.eclipse.jetty.io=org.eclipse.jetty.logging
+
+
+
+ org.apache.maven.surefire
+ surefire-junit-platform
+ ${maven.surefire.plugin.version}
+
+
+ org.apache.maven.surefire
+ surefire-testng
+ ${maven.surefire.plugin.version}
+
+
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java
index e48419104f5..0e3642daadf 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java
@@ -13,11 +13,17 @@
package org.eclipse.jetty.io.content;
+import java.util.Objects;
import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.Content;
+import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.MathUtils;
-import org.eclipse.jetty.util.thread.AutoLock;
+import org.eclipse.jetty.util.StaticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
Wraps a {@link Content.Source} as a {@link Flow.Publisher}.
@@ -25,135 +31,282 @@ import org.eclipse.jetty.util.thread.AutoLock;
* read from the passed {@link Content.Source} and passed to {@link Flow.Subscriber#onNext(Object)}.
* If no content is available, then the {@link Content.Source#demand(Runnable)} method is used to
* ultimately call {@link Flow.Subscriber#onNext(Object)} once content is available.
+ * {@link Content.Source} can be consumed only once and does not support multicast subscription.
+ * {@link Content.Source} will be consumed fully, otherwise will be failed in case of any errors
+ * to prevent resource leaks.
*/
public class ContentSourcePublisher implements Flow.Publisher
{
- private final Content.Source content;
+ private static final Logger LOG = LoggerFactory.getLogger(ContentSourcePublisher.class);
+
+ private final AtomicReference content;
public ContentSourcePublisher(Content.Source content)
{
- this.content = content;
+ Objects.requireNonNull(content, "Content.Source must not be null");
+ this.content = new AtomicReference<>(content);
}
@Override
public void subscribe(Flow.Subscriber super Content.Chunk> subscriber)
{
- subscriber.onSubscribe(new SubscriptionImpl(content, subscriber));
+ // As per rule 1.11, we have decided to support SINGLE subscriber
+ // in a UNICAST configuration for this implementation. It means
+ // that Content.Source can be consumed only once.
+ Content.Source content = this.content.getAndSet(null);
+ if (content != null)
+ onSubscribe(subscriber, content);
+ else
+ onMultiSubscribe(subscriber);
}
- private static class SubscriptionImpl implements Flow.Subscription
+ private void onSubscribe(Flow.Subscriber super Content.Chunk> subscriber, Content.Source content)
{
- private final AutoLock lock = new AutoLock();
- private final Content.Source content;
- private final Flow.Subscriber super Content.Chunk> subscriber;
- private long demand;
- private boolean stalled;
- private boolean cancelled;
- private boolean terminated;
-
- public SubscriptionImpl(Content.Source content, Flow.Subscriber super Content.Chunk> subscriber)
+ // As per rule 1.9, we need to throw a `java.lang.NullPointerException`
+ // if the `Subscriber` is `null`
+ if (subscriber == null)
{
- this.content = content;
- this.subscriber = subscriber;
- this.stalled = true;
+ NullPointerException error = new NullPointerException("Flow.Subscriber must not be null");
+ content.fail(error);
+ throw error;
}
+ ActiveSubscription subscription = new ActiveSubscription(content, subscriber);
+ // As per rule 1.9, this method must return normally (i.e. not throw).
+ try
+ {
+ subscriber.onSubscribe(subscription);
+ }
+ catch (Throwable err)
+ {
+ // As per rule 2.13, we MUST consider subscription cancelled and
+ // MUST raise this error condition in a fashion that is adequate for the runtime environment.
+ subscription.cancel(new SuppressedException(err));
+ if (LOG.isTraceEnabled())
+ LOG.trace("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
+ }
+ }
+
+ private void onMultiSubscribe(Flow.Subscriber super Content.Chunk> subscriber)
+ {
+ // As per rule 1.9, we need to throw a `java.lang.NullPointerException`
+ // if the `Subscriber` is `null`
+ if (subscriber == null)
+ throw new NullPointerException("Flow.Subscriber must not be null");
+
+ ExhaustedSubscription subscription = new ExhaustedSubscription();
+ // As per 1.9, this method must return normally (i.e. not throw).
+ try
+ {
+ // As per rule 1.9, the only legal way to signal about Subscriber rejection
+ // is by calling onError (after calling onSubscribe).
+ subscriber.onSubscribe(subscription);
+ subscriber.onError(new IllegalStateException("Content.Source was exhausted."));
+ }
+ catch (Throwable err)
+ {
+ // As per rule 2.13, we MUST consider subscription cancelled and
+ // MUST raise this error condition in a fashion that is adequate for the runtime environment.
+ if (LOG.isTraceEnabled())
+ LOG.trace("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
+ }
+ }
+
+ private static final class ExhaustedSubscription implements Flow.Subscription
+ {
@Override
public void request(long n)
{
- boolean process = false;
- Throwable failure = null;
- try (AutoLock ignored = lock.lock())
- {
- if (cancelled || terminated)
- return;
- if (n <= 0)
- {
- terminated = true;
- failure = new IllegalArgumentException("invalid demand " + n);
- }
- demand = MathUtils.cappedAdd(demand, n);
- if (stalled)
- {
- stalled = false;
- process = true;
- }
- }
- if (failure != null)
- subscriber.onError(failure);
- else if (process)
- process();
+ // As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
}
@Override
public void cancel()
{
- try (AutoLock ignored = lock.lock())
- {
- cancelled = true;
- }
+ // As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
+ }
+ }
+
+ private static final class ActiveSubscription extends IteratingCallback implements Flow.Subscription
+ {
+ private static final long NO_MORE_DEMAND = -1;
+ private static final Throwable COMPLETED = new StaticException("Source.Content read fully");
+ private final AtomicReference cancelled;
+ private final AtomicLong demand;
+ private Content.Source content;
+ private Flow.Subscriber super Content.Chunk> subscriber;
+
+ public ActiveSubscription(Content.Source content, Flow.Subscriber super Content.Chunk> subscriber)
+ {
+ this.cancelled = new AtomicReference<>(null);
+ this.demand = new AtomicLong(0);
+ this.content = content;
+ this.subscriber = subscriber;
}
- private void process()
+ // As per rule 3.3, Subscription MUST place an upper bound on possible synchronous
+ // recursion between Publisher and Subscriber
+ //
+ // As per rule 1.3, onSubscribe, onNext, onError and onComplete signaled to a
+ // Subscriber MUST be signaled serially.
+ //
+ // IteratingCallback guarantee that process() method will be executed by one thread only.
+ // The process() method can be only initiated from request() or cancel() demands methods.
+ @Override
+ protected Action process()
{
- while (true)
+ Throwable cancelled = this.cancelled.get();
+ if (cancelled != null)
{
- try (AutoLock ignored = lock.lock())
+ // As per rule 3.13, Subscription.cancel() MUST request the Publisher to eventually
+ // drop any references to the corresponding subscriber.
+ this.demand.set(NO_MORE_DEMAND);
+ if (cancelled != COMPLETED)
+ this.content.fail(cancelled);
+ this.content = null;
+ try
{
- if (demand > 0)
- {
- --demand;
- }
- else
- {
- stalled = true;
- return;
- }
+ if (cancelled == COMPLETED)
+ this.subscriber.onComplete();
+ else if (!(cancelled instanceof SuppressedException))
+ this.subscriber.onError(cancelled);
}
-
- Content.Chunk chunk = content.read();
-
- if (chunk == null)
+ catch (Throwable err)
{
- try (AutoLock ignored = lock.lock())
- {
- // Restore the demand decremented above.
- ++demand;
- stalled = true;
- }
- content.demand(this::process);
- return;
+ if (LOG.isTraceEnabled())
+ LOG.trace("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
}
+ this.subscriber = null;
+ return Action.SUCCEEDED;
+ }
- if (Content.Chunk.isFailure(chunk))
- {
- terminate();
- if (!chunk.isLast())
- content.fail(chunk.getFailure());
- subscriber.onError(chunk.getFailure());
- return;
- }
+ Content.Chunk chunk = content.read();
- subscriber.onNext(chunk);
+ if (chunk == null)
+ {
+ content.demand(this::succeeded);
+ return Action.SCHEDULED;
+ }
+
+ if (Content.Chunk.isFailure(chunk))
+ {
+ cancel(chunk.getFailure());
chunk.release();
-
- if (chunk.isLast())
- {
- terminate();
- // Reactive Stream specification rule 2.9 allows Publishers to call onComplete()
- // even without demand, and Subscribers must be prepared to handle this case.
- subscriber.onComplete();
- return;
- }
+ return Action.IDLE;
}
+
+ try
+ {
+ this.subscriber.onNext(chunk);
+ }
+ catch (Throwable err)
+ {
+ cancel(new SuppressedException(err));
+ if (LOG.isTraceEnabled())
+ LOG.trace("Flow.Subscriber " + subscriber + " violated rule 2.13", err);
+ }
+ chunk.release();
+
+ if (chunk.isLast())
+ {
+ cancel(COMPLETED);
+ return Action.IDLE;
+ }
+
+ if (demand.decrementAndGet() > 0)
+ this.iterate();
+
+ return Action.IDLE;
}
- private void terminate()
+ @Override
+ public void request(long n)
{
- try (AutoLock ignored = lock.lock())
+ // As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
+ if (cancelled.get() != null)
+ return;
+
+ // As per rule 3.9, MUST signal onError with a java.lang.IllegalArgumentException if the argument is <= 0.
+ if (n <= 0L)
{
- terminated = true;
+ String errorMsg = "Flow.Subscriber " + subscriber + " violated rule 3.9: non-positive requests are not allowed.";
+ cancel(new IllegalArgumentException(errorMsg));
+ return;
}
+
+ // As per rule 3.17, when demand overflows `Long.MAX_VALUE`
+ // we treat the signalled demand as "effectively unbounded"
+ if (demand.updateAndGet(it -> it == NO_MORE_DEMAND ? it : MathUtils.cappedAdd(it, n)) != NO_MORE_DEMAND)
+ this.iterate();
+ }
+
+ @Override
+ public void cancel()
+ {
+ cancel(new CancelledException());
+ }
+
+ public void cancel(Throwable cause)
+ {
+ // As per rules 3.6 and 3.7, after the Subscription is cancelled all operations MUST be NOPs.
+ //
+ // As per rule 3.5, this handles cancellation requests, and is idempotent, thread-safe and not
+ // synchronously performing heavy computations
+ if (cancelled.compareAndSet(null, cause))
+ this.iterate();
+ }
+
+ // Publisher notes
+ //
+ // 1.6 If a Publisher signals either onError or onComplete on a Subscriber,
+ // that Subscriber’s Subscription MUST be considered cancelled.
+ // 2.4 Subscriber.onComplete() and Subscriber.onError(Throwable t) MUST consider the
+ // Subscription cancelled after having received the signal.
+ //
+ // Publisher failed -> cancel(Throwable)
+ // 1.4 If a Publisher fails it MUST signal an onError.
+ //
+ // Publisher succeeded -> cancel(COMPLETED)
+ // 1.5 If a Publisher terminates successfully (finite stream) it MUST signal an onComplete.
+
+ // Subscriber
+ // 2.13 In the case that this rule is violated, any associated Subscription to the Subscriber
+ // MUST be considered as cancelled, and the caller MUST raise this error condition in a
+ // fashion that is adequate for the runtime environment.
+ //
+ // Subscriber.onSubscribe/onNext/onError/onComplete failed -> cancel(new Suppressed(cause))
+
+ // Subscription notes
+ //
+ // Subscription.cancel -> cancel(new Cancelled())
+ // It's not clearly specified in the specification, but according to:
+ // - the issue: https://github.com/reactive-streams/reactive-streams-jvm/issues/458
+ // - TCK test 'untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals'
+ // - 1.8 If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.
+ //
+ // Subscription.request with negative argument -> cancel(err)
+ // 3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST signal onError with a
+ // java.lang.IllegalArgumentException if the argument is <= 0.
+ }
+
+ private static class SuppressedException extends Exception
+ {
+ SuppressedException(String message)
+ {
+ super(message);
+ }
+
+ SuppressedException(Throwable cause)
+ {
+ super(cause.getMessage(), cause);
+ }
+ }
+
+ private static class CancelledException extends SuppressedException
+ {
+ CancelledException()
+ {
+ super("Subscription was cancelled");
}
}
}
diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/content/ContentSourcePublisherTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/content/ContentSourcePublisherTest.java
new file mode 100644
index 00000000000..415e6ed2098
--- /dev/null
+++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/content/ContentSourcePublisherTest.java
@@ -0,0 +1,251 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io.content;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.eclipse.jetty.io.Content;
+import org.reactivestreams.tck.TestEnvironment;
+import org.reactivestreams.tck.flow.FlowPublisherVerification;
+import org.testng.annotations.Test;
+
+@Test
+public final class ContentSourcePublisherTest extends FlowPublisherVerification
+{
+ public ContentSourcePublisherTest()
+ {
+ super(new TestEnvironment());
+ }
+
+ @Override
+ public Flow.Publisher createFlowPublisher(long elements)
+ {
+ Content.Source source = new SyntheticContentSource(elements);
+ return new ContentSourcePublisher(source);
+ }
+
+ @Override
+ public Flow.Publisher createFailedFlowPublisher()
+ {
+ Content.Source source = new SyntheticContentSource(0);
+ Flow.Publisher publisher = new ContentSourcePublisher(source);
+ // Simulate exhausted Content.Source
+ publisher.subscribe(new Flow.Subscriber<>()
+ {
+ @Override
+ public void onSubscribe(Flow.Subscription subscription)
+ {
+ subscription.cancel();
+ }
+
+ @Override
+ public void onNext(Content.Chunk item)
+ {
+ }
+
+ @Override
+ public void onError(Throwable throwable)
+ {
+ }
+
+ @Override
+ public void onComplete()
+ {
+ }
+ });
+ return publisher;
+ }
+
+ private static final class SyntheticContentSource implements Content.Source
+ {
+ private final AtomicReference state;
+ private final long contentSize;
+
+ public SyntheticContentSource(long chunksToRead)
+ {
+ this.state = new AtomicReference<>(new State.Reading(chunksToRead));
+ this.contentSize = State.Reading.chunkSize * Math.max(chunksToRead, 0);
+ }
+
+ @Override
+ public long getLength()
+ {
+ return contentSize;
+ }
+
+ @Override
+ public Content.Chunk read()
+ {
+ return state.getAndUpdate(before -> before.read()).chunk();
+ }
+
+ @Override
+ public void demand(Runnable demandCallback)
+ {
+ // recursive stack overflow not necessary for this test
+ demandCallback.run();
+ }
+
+ @Override
+ public void fail(Throwable failure)
+ {
+ fail(failure, true);
+ }
+
+ @Override
+ public void fail(Throwable failure, boolean last)
+ {
+ state.getAndUpdate(before -> before.fail(failure, last));
+ }
+
+ @Override
+ public boolean rewind()
+ {
+ return false;
+ }
+
+ private sealed interface State permits State.Reading, State.ReadFailed, State.ReadCompleted
+ {
+ Content.Chunk chunk();
+
+ State read();
+
+ State fail(Throwable failure, boolean last);
+
+ final class Reading implements State
+ {
+ public static final int chunkSize = 16;
+ private static final Random random = new Random();
+
+ private final long chunksToRead;
+ private final Content.Chunk chunk;
+
+ public Reading(long chunksToRead)
+ {
+ this.chunksToRead = chunksToRead;
+ this.chunk = generateValidChunk(chunksToRead);
+ }
+
+ public Reading(long chunksToRead, Throwable transientFailure)
+ {
+ this.chunksToRead = chunksToRead;
+ this.chunk = generateFailureChunk(transientFailure);
+ }
+
+ @Override
+ public Content.Chunk chunk()
+ {
+ return chunk;
+ }
+
+ @Override
+ public State read()
+ {
+ long leftToRead = leftToRead();
+ if (leftToRead <= 0)
+ return new ReadCompleted();
+ return new Reading(leftToRead);
+ }
+
+ @Override
+ public State fail(Throwable failure, boolean last)
+ {
+ if (last)
+ return new ReadFailed(failure);
+ return new Reading(chunksToRead, failure);
+ }
+
+ private long leftToRead()
+ {
+ if (chunksToRead == Long.MAX_VALUE) // endless source
+ return chunksToRead;
+ return chunksToRead - 1;
+ }
+
+ private static Content.Chunk generateFailureChunk(Throwable transientFailure)
+ {
+ return Content.Chunk.from(transientFailure, false);
+ }
+
+ private static Content.Chunk generateValidChunk(long chunksToRead)
+ {
+ if (chunksToRead <= 0)
+ return Content.Chunk.EOF;
+ if (chunksToRead == 1)
+ return Content.Chunk.from(randomPayload(), true);
+ return Content.Chunk.from(randomPayload(), false);
+ }
+
+ private static ByteBuffer randomPayload()
+ {
+ byte[] payload = new byte[chunkSize];
+ random.nextBytes(payload);
+ return ByteBuffer.wrap(payload);
+ }
+ }
+
+ final class ReadFailed implements State
+ {
+ private final Content.Chunk chunk;
+
+ public ReadFailed(Throwable failure)
+ {
+ this.chunk = Content.Chunk.from(failure, true);
+ }
+
+ @Override
+ public Content.Chunk chunk()
+ {
+ return chunk;
+ }
+
+ @Override
+ public State read()
+ {
+ return this;
+ }
+
+ @Override
+ public State fail(Throwable failure, boolean last)
+ {
+ return this;
+ }
+ }
+
+ final class ReadCompleted implements State
+ {
+ @Override
+ public Content.Chunk chunk()
+ {
+ return Content.Chunk.EOF;
+ }
+
+ @Override
+ public State read()
+ {
+ return this;
+ }
+
+ @Override
+ public State fail(Throwable failure, boolean last)
+ {
+ return this;
+ }
+ }
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index b1cd6d3139a..1b4fa90a607 100644
--- a/pom.xml
+++ b/pom.xml
@@ -300,6 +300,7 @@
4.0.3
2023-06-05T23:12:49Z
UTF-8
+ 1.0.4
src/it/settings.xml
2.0.12
1.3.7
@@ -308,6 +309,7 @@
0
1.8.3
1.19.7
+ 7.10.2
3.0.0
2.16.2
1.7.0.Final
@@ -1155,6 +1157,11 @@
osgi.core
${org.osgi.core.version}
+
+ org.reactivestreams
+ reactive-streams-tck-flow
+ ${reactive-streams.version}
+
org.slf4j
jcl104-over-slf4j
@@ -1191,6 +1198,11 @@
+
+ org.testng
+ testng
+ ${testng.version}
+
org.wildfly.common
wildfly-common