Experiment/jetty 12 chunk isError and warnings (#9904)

* Remove usage of instanceof Content.Chunk.Error
* Updated AsyncContent to accept a transient failures
* Updated AsyncContent to accept a transient failure with inputstream
This commit is contained in:
Greg Wilkins 2023-06-23 09:17:15 +02:00 committed by GitHub
parent 9a38e3ba41
commit a3e82326cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 525 additions and 211 deletions

View File

@ -0,0 +1,153 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.docs.programming;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.AsyncContent;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("unused")
public class ContentDocs
{
private static final Logger LOG = LoggerFactory.getLogger(ContentDocs.class);
// tag::echo[]
public void echo(Content.Source source, Content.Sink sink, Callback callback)
{
Callback echo = new Callback()
{
private Content.Chunk chunk;
public void succeeded()
{
// release any previous chunk
if (chunk != null)
{
chunk.release();
// complete if it was the last
if (chunk.isLast())
{
callback.succeeded();
return;
}
}
while (true)
{
// read the next chunk
chunk = source.read();
if (chunk == null)
{
// if no chunk, demand more and call succeeded when demand is met.
source.demand(this::succeeded);
return;
}
if (Content.Chunk.isFailure(chunk, true))
{
// if it is a persistent failure, then fail the callback
callback.failed(chunk.getFailure());
return;
}
if (chunk.hasRemaining() || chunk.isLast())
{
// if chunk has content or is last, write it to the sink and resume this loop in callback
sink.write(chunk.isLast(), chunk.getByteBuffer(), this);
return;
}
chunk.release();
}
}
public void failed(Throwable x)
{
source.fail(x);
callback.failed(x);
}
};
source.demand(echo::succeeded);
}
// tag::echo[]
public static void testEcho() throws Exception
{
AsyncContent source = new AsyncContent();
AsyncContent sink = new AsyncContent();
Callback.Completable echoCallback = new Callback.Completable();
new ContentDocs().echo(source, sink, echoCallback);
Content.Chunk chunk = sink.read();
if (chunk != null)
throw new IllegalStateException("No chunk expected yet");
FutureCallback onContentAvailable = new FutureCallback();
sink.demand(onContentAvailable::succeeded);
if (onContentAvailable.isDone())
throw new IllegalStateException("No demand expected yet");
Callback.Completable writeCallback = new Callback.Completable();
Content.Sink.write(source, false, "One", writeCallback);
if (writeCallback.isDone())
throw new IllegalStateException("Should wait until first chunk is consumed");
onContentAvailable.get();
chunk = sink.read();
if (!"One".equals(BufferUtil.toString(chunk.getByteBuffer())))
throw new IllegalStateException("first chunk is expected");
if (writeCallback.isDone())
throw new IllegalStateException("Should wait until first chunk is consumed");
chunk.release();
writeCallback.get();
writeCallback = new Callback.Completable();
Content.Sink.write(source, true, "Two", writeCallback);
if (writeCallback.isDone())
throw new IllegalStateException("Should wait until second chunk is consumed");
onContentAvailable = new FutureCallback();
sink.demand(onContentAvailable::succeeded);
if (!onContentAvailable.isDone())
throw new IllegalStateException("Demand expected for second chunk");
chunk = sink.read();
if (!"Two".equals(BufferUtil.toString(chunk.getByteBuffer())))
throw new IllegalStateException("second chunk is expected");
chunk.release();
writeCallback.get();
onContentAvailable = new FutureCallback();
sink.demand(onContentAvailable::succeeded);
if (!onContentAvailable.isDone())
throw new IllegalStateException("Demand expected for EOF");
chunk = sink.read();
if (!chunk.isLast())
throw new IllegalStateException("EOF expected");
}
public static void main(String... args) throws Exception
{
testEcho();
}
}

View File

@ -175,9 +175,9 @@ public interface Response
contentSource.demand(demandCallback);
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
response.abort(error.getCause());
response.abort(chunk.getFailure());
return;
}
if (chunk.isLast() && !chunk.hasRemaining())

View File

@ -554,7 +554,7 @@ public abstract class HttpReceiver
_chunk = inputChunk;
if (_chunk == null)
return null;
if (_chunk instanceof Content.Chunk.Error)
if (Content.Chunk.isFailure(_chunk))
return _chunk;
// Retain the input chunk because its ByteBuffer will be referenced by the Inflater.
@ -748,7 +748,7 @@ public abstract class HttpReceiver
LOG.debug("Erroring {}", this);
try (AutoLock ignored = lock.lock())
{
if (currentChunk instanceof Content.Chunk.Error)
if (Content.Chunk.isFailure(currentChunk))
return false;
if (currentChunk != null)
currentChunk.release();

View File

@ -503,8 +503,8 @@ public abstract class HttpSender
}
}
if (chunk instanceof Content.Chunk.Error error)
throw error.getCause();
if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();
ByteBuffer buffer = chunk.getByteBuffer();
contentBuffer = buffer.asReadOnlyBuffer();

View File

@ -676,7 +676,7 @@ public class ResponseListeners
Content.Chunk currentChunk = chunk;
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} fail while current chunk is {}", index, currentChunk);
if (currentChunk instanceof Content.Chunk.Error)
if (Content.Chunk.isFailure(currentChunk))
return;
if (currentChunk != null)
currentChunk.release();

View File

@ -176,8 +176,8 @@ public class ConnectionPoolTest
continue;
}
}
if (chunk instanceof Content.Chunk.Error error)
throw error.getCause();
if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();
if (chunk.hasRemaining())
{

View File

@ -30,7 +30,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
@ -248,11 +247,11 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
.onResponseContentSource((response, contentSource) -> response.abort(new Throwable()).whenComplete((failed, x) ->
{
Content.Chunk chunk = contentSource.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
contentSource.demand(() ->
{
Content.Chunk c = contentSource.read();
assertInstanceOf(Content.Chunk.Error.class, c);
assertTrue(Content.Chunk.isFailure(c, true));
errorContentLatch.countDown();
});
}))

View File

@ -197,7 +197,7 @@ public class HttpStreamOverFCGI implements HttpStream
{
if (_chunk == null)
_chunk = Content.Chunk.EOF;
else if (!_chunk.isLast() && !(_chunk instanceof Content.Chunk.Error))
else if (!_chunk.isLast() && !(Content.Chunk.isFailure(_chunk)))
throw new IllegalStateException();
}

View File

@ -563,7 +563,7 @@ public class MultiPart
private State state = State.FIRST;
private boolean closed;
private Runnable demand;
private Content.Chunk.Error errorChunk;
private Content.Chunk errorChunk;
private Part part;
public AbstractContentSource(String boundary)
@ -759,7 +759,7 @@ public class MultiPart
case CONTENT ->
{
Content.Chunk chunk = part.getContentSource().read();
if (chunk == null || chunk instanceof Content.Chunk.Error)
if (chunk == null || Content.Chunk.isFailure(chunk))
yield chunk;
if (!chunk.isLast())
yield chunk;

View File

@ -101,9 +101,9 @@ public class MultiPartByteRanges extends CompletableFuture<MultiPartByteRanges.P
content.demand(this);
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
listener.onFailure(error.getCause());
listener.onFailure(chunk.getFailure());
return;
}
parse(chunk);

View File

@ -117,9 +117,9 @@ public class MultiPartFormData extends CompletableFuture<MultiPartFormData.Parts
content.demand(this);
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
listener.onFailure(error.getCause());
listener.onFailure(chunk.getFailure());
return;
}
parse(chunk);

View File

@ -607,9 +607,9 @@ public class IdleTimeoutTest extends AbstractTest
_request.demand(this::onContentAvailable);
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
_callback.failed(error.getCause());
_callback.failed(chunk.getFailure());
return;
}
chunk.release();

View File

@ -53,8 +53,9 @@ public class Content
/**
* <p>Copies the given content source to the given content sink, notifying
* the given callback when the copy is complete (either succeeded or failed).</p>
* <p>In case of failures, the content source is {@link Source#fail(Throwable) failed}
* too.</p>
* <p>In case of {@link Chunk#getFailure() failure chunks},
* the content source is {@link Source#fail(Throwable) failed} if the failure
* chunk is {@link Chunk#isLast() last}, else the failing is transient and is ignored.</p>
*
* @param source the source to copy from
* @param sink the sink to copy to
@ -76,6 +77,9 @@ public class Content
* <p>If the predicate returns {@code false}, it means that the chunk is not
* handled, its callback will not be completed, and the implementation will
* handle the chunk and its callback.</p>
* <p>In case of {@link Chunk#getFailure() failure chunks} not handled by any {@code chunkHandler},
* the content source is {@link Source#fail(Throwable) failed} if the failure
* chunk is {@link Chunk#isLast() last}, else the failure is transient and is ignored.</p>
*
* @param source the source to copy from
* @param sink the sink to copy to
@ -103,10 +107,11 @@ public class Content
* return;
* }
*
* // The chunk is an error.
* if (chunk instanceof Chunk.Error error) {
* // Handle the error.
* Throwable cause = error.getCause();
* // The chunk is a failure.
* if (Content.Chunk.isFailure(chunk)) {
* // Handle the failure.
* Throwable cause = chunk.getFailure();
* boolean transient = !chunk.isLast();
* // ...
* return;
* }
@ -190,7 +195,7 @@ public class Content
* @return the String obtained from the content
* @throws IOException if reading the content fails
*/
public static String asString(Source source, Charset charset) throws IOException
static String asString(Source source, Charset charset) throws IOException
{
try
{
@ -227,12 +232,12 @@ public class Content
}
/**
* <p>Reads, non-blocking, the given content source, until either an error or EOF,
* <p>Reads, non-blocking, the given content source, until a {@link Chunk#isFailure(Chunk) failure} or EOF
* and discards the content.</p>
*
* @param source the source to read from
* @param callback the callback to notify when the whole content has been read
* or an error occurred while reading the content
* or a failure occurred while reading the content
*/
static void consumeAll(Source source, Callback callback)
{
@ -240,7 +245,7 @@ public class Content
}
/**
* <p>Reads, blocking if necessary, the given content source, until either an error
* <p>Reads, blocking if necessary, the given content source, until a {@link Chunk#isFailure(Chunk) failure}
* or EOF, and discards the content.</p>
*
* @param source the source to read from
@ -274,13 +279,15 @@ public class Content
* <p>The returned chunk could be:</p>
* <ul>
* <li>{@code null}, to signal that there isn't a chunk of content available</li>
* <li>an {@link Chunk.Error error} instance, to signal that there was an error
* <li>an {@link Chunk} instance with non null {@link Chunk#getFailure()}, to signal that there was a failure
* trying to produce a chunk of content, or that the content production has been
* {@link #fail(Throwable) failed} externally</li>
* <li>a {@link Chunk} instance, containing the chunk of content.</li>
* </ul>
* <p>Once a read returns an {@link Chunk.Error error} instance, further reads
* will continue to return the same error instance.</p>
* <p>Once a read returns an {@link Chunk} instance with non-null {@link Chunk#getFailure()}
* then if the failure is {@link Chunk#isLast() last} further reads
* will continue to return the same failure chunk instance, otherwise further
* {@code read()} operations may return different non-failure chunks.</p>
* <p>Once a read returns a {@link Chunk#isLast() last chunk}, further reads will
* continue to return a last chunk (although the instance may be different).</p>
* <p>The content reader code must ultimately arrange for a call to
@ -296,7 +303,7 @@ public class Content
* race condition (the thread that reads with the thread that invokes the
* demand callback).</p>
*
* @return a chunk of content, possibly an error instance, or {@code null}
* @return a chunk of content, possibly a failure instance, or {@code null}
* @see #demand(Runnable)
* @see Retainable
*/
@ -327,18 +334,38 @@ public class Content
void demand(Runnable demandCallback);
/**
* <p>Fails this content source, possibly failing and discarding accumulated
* content chunks that were not yet read.</p>
* <p>Fails this content source with a {@link Chunk#isLast() last} {@link Chunk#getFailure() failure chunk},
* failing and discarding accumulated content chunks that were not yet read.</p>
* <p>The failure may be notified to the content reader at a later time, when
* the content reader reads a content chunk, via an {@link Chunk.Error} instance.</p>
* the content reader reads a content chunk, via a {@link Chunk} instance
* with a non null {@link Chunk#getFailure()}.</p>
* <p>If {@link #read()} has returned a last chunk, this is a no operation.</p>
* <p>Typical failure: the content being aborted by user code, or idle timeouts.</p>
* <p>If this method has already been called, then it is a no operation.</p>
*
* @param failure the cause of the failure
* @see Chunk#getFailure()
*/
void fail(Throwable failure);
/**
* <p>Fails this content source with a {@link Chunk#getFailure() failure chunk}
* that may or not may be {@link Chunk#isLast() last}.
* If {@code last} is {@code true}, then the failure is persistent and a call to this method acts
* as {@link #fail(Throwable)}. Otherwise the failure is transient and a
* {@link Chunk#getFailure() failure chunk} will be {@link #read() read} in order with content chunks,
* and subsequent calls to {@link #read() read} may produce other content.</p>
* <p>A {@code Content.Source} or its {@link #read() reader} may treat a transient failure as persistent.</p>
*
* @param failure A failure.
* @param last true if the failure is persistent, false if the failure is transient.
* @see Chunk#getFailure()
*/
default void fail(Throwable failure, boolean last)
{
fail(failure);
}
/**
* <p>Rewinds this content, if possible, so that subsequent reads return
* chunks starting from the beginning of this content.</p>
@ -555,14 +582,52 @@ public class Content
}
/**
* <p>Creates an {@link Error error chunk} with the given failure.</p>
* <p>Creates an {@link Chunk#isFailure(Chunk) failure chunk} with the given failure
* and {@link Chunk#isLast()} returning true.</p>
*
* @param failure the cause of the failure
* @return a new Error.Chunk
* @return a new {@link Chunk#isFailure(Chunk) failure chunk}
*/
static Error from(Throwable failure)
static Chunk from(Throwable failure)
{
return new Error(failure);
return from(failure, true);
}
/**
* <p>Creates an {@link Chunk#isFailure(Chunk) failure chunk} with the given failure
* and given {@link Chunk#isLast() last} state.</p>
*
* @param failure the cause of the failure
* @param last true if the failure is terminal, else false for transient failure
* @return a new {@link Chunk#isFailure(Chunk) failure chunk}
*/
static Chunk from(Throwable failure, boolean last)
{
return new Chunk()
{
public Throwable getFailure()
{
return failure;
}
@Override
public ByteBuffer getByteBuffer()
{
return BufferUtil.EMPTY_BUFFER;
}
@Override
public boolean isLast()
{
return last;
}
@Override
public String toString()
{
return String.format("Chunk@%x{c=%s,l=%b}", hashCode(), failure, last);
}
};
}
/**
@ -581,8 +646,12 @@ public class Content
* <td>{@code null}</td>
* </tr>
* <tr>
* <td>{@link Chunk#isFailure(Chunk) Failure} and {@link Chunk#isLast() last}</td>
* <td>{@link Error Error}</td>
* <td>{@link Error Error}</td>
* </tr>
* <tr>
* <td>{@link Chunk#isFailure(Chunk) Failure} and {@link Chunk#isLast() not last}</td>
* <td>{@code null}</td>
* </tr>
* <tr>
* <td>{@link #isLast()}</td>
@ -597,18 +666,57 @@ public class Content
*/
static Chunk next(Chunk chunk)
{
if (chunk == null || chunk instanceof Error)
return chunk;
if (chunk == null)
return null;
if (Content.Chunk.isFailure(chunk))
return chunk.isLast() ? chunk : null;
if (chunk.isLast())
return EOF;
return null;
}
/**
* @param chunk The chunk to test for an {@link Chunk#getFailure() failure}.
* @return True if the chunk is non-null and {@link Chunk#getFailure() chunk.getError()} returns non-null.
*/
static boolean isFailure(Chunk chunk)
{
return chunk != null && chunk.getFailure() != null;
}
/**
* @param chunk The chunk to test for an {@link Chunk#getFailure() failure}
* @param last The {@link Chunk#isLast() last} status to test for.
* @return True if the chunk is non-null and {@link Chunk#getFailure()} returns non-null
* and {@link Chunk#isLast()} matches the passed status.
*/
static boolean isFailure(Chunk chunk, boolean last)
{
return chunk != null && chunk.getFailure() != null && chunk.isLast() == last;
}
/**
* @return the ByteBuffer of this Chunk
*/
ByteBuffer getByteBuffer();
/**
* Get a failure (which may be from a {@link Source#fail(Throwable) failure} or
* a {@link Source#fail(Throwable, boolean) warning}), if any, associated with the chunk.
* <ul>
* <li>A {@code chunk} must not have a failure and a {@link #getByteBuffer()} with content.</li>
* <li>A {@code chunk} with a failure may or may not be {@link #isLast() last}.</li>
* <li>A {@code chunk} with a failure must not be {@link #canRetain() retainable}.</li>
* </ul>
* @return A {@link Throwable} indicating the failure or null if there is no failure or warning.
* @see Source#fail(Throwable)
* @see Source#fail(Throwable, boolean)
*/
default Throwable getFailure()
{
return null;
}
/**
* @return whether this is the last Chunk
*/
@ -674,46 +782,6 @@ public class Content
return asChunk(getByteBuffer().asReadOnlyBuffer(), isLast(), this);
}
/**
* <p>A chunk that wraps a failure.</p>
* <p>Error Chunks are always last and have no bytes to read,
* as such they are <em>terminal</em> Chunks.</p>
*
* @see #from(Throwable)
*/
final class Error implements Chunk
{
private final Throwable cause;
private Error(Throwable cause)
{
this.cause = cause;
}
public Throwable getCause()
{
return cause;
}
@Override
public ByteBuffer getByteBuffer()
{
return BufferUtil.EMPTY_BUFFER;
}
@Override
public boolean isLast()
{
return true;
}
@Override
public String toString()
{
return String.format("%s@%x{c=%s}", getClass().getSimpleName(), hashCode(), cause);
}
}
/**
* <p>Implementations of this interface may process {@link Chunk}s being copied by the
* {@link Content#copy(Source, Sink, Processor, Callback)} method, so that

View File

@ -51,8 +51,8 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
private final SerializedInvoker invoker = new SerializedInvoker();
private final Queue<AsyncChunk> chunks = new ArrayDeque<>();
private Content.Chunk.Error errorChunk;
private final Queue<Content.Chunk> chunks = new ArrayDeque<>();
private Content.Chunk persistentFailure;
private boolean readClosed;
private boolean writeClosed;
private Runnable demandCallback;
@ -62,7 +62,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
* {@inheritDoc}
* <p>The write completes:</p>
* <ul>
* <li>immediately with a failure when this instance is closed or already in error</li>
* <li>immediately with a failure when this instance is closed or already has a failure</li>
* <li>successfully when a non empty {@link Content.Chunk} returned by {@link #read()} is released</li>
* <li>successfully just before the {@link Content.Chunk} is returned by {@link #read()},
* for any empty chunk {@link Content.Chunk}.</li>
@ -79,7 +79,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
* or succeeded if and only if the chunk is terminal, as non-terminal
* chunks have to bind the succeeding of the callback to their release.
*/
private void offer(AsyncChunk chunk)
private void offer(Content.Chunk chunk)
{
Throwable failure = null;
boolean wasEmpty = false;
@ -89,9 +89,9 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
{
failure = new IOException("closed");
}
else if (errorChunk != null)
else if (persistentFailure != null)
{
failure = errorChunk.getCause();
failure = persistentFailure.getFailure();
}
else
{
@ -105,14 +105,14 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
if (length == UNDETERMINED_LENGTH)
{
length = 0;
for (AsyncChunk c : chunks)
for (Content.Chunk c : chunks)
length += c.remaining();
}
}
}
}
if (failure != null)
chunk.failed(failure);
if (failure != null && chunk instanceof AsyncChunk asyncChunk)
asyncChunk.failed(failure);
if (wasEmpty)
invoker.run(this::invokeDemandCallback);
}
@ -125,14 +125,14 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
{
// Always wrap the exception to make sure
// the stack trace comes from flush().
if (errorChunk != null)
throw new IOException(errorChunk.getCause());
if (persistentFailure != null)
throw new IOException(persistentFailure.getFailure());
if (chunks.isEmpty())
return;
// Special case for a last empty chunk that may not be read.
if (writeClosed && chunks.size() == 1)
{
AsyncChunk chunk = chunks.peek();
Content.Chunk chunk = chunks.peek();
if (chunk.isLast() && !chunk.hasRemaining())
return;
}
@ -171,7 +171,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
@Override
public Content.Chunk read()
{
AsyncChunk current;
Content.Chunk current;
try (AutoLock.WithCondition condition = lock.lock())
{
if (length == UNDETERMINED_LENGTH)
@ -181,8 +181,8 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
{
if (readClosed)
return Content.Chunk.EOF;
if (errorChunk != null)
return errorChunk;
if (persistentFailure != null)
return persistentFailure;
return null;
}
readClosed = current.isLast();
@ -195,7 +195,12 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
return current;
// If the chunk is not reference counted, we can succeed it now and return a chunk with a noop release.
current.succeeded();
if (current instanceof AsyncChunk asyncChunk)
asyncChunk.succeeded();
if (Content.Chunk.isFailure(current))
return current;
return current.isLast() ? Content.Chunk.EOF : Content.Chunk.EMPTY;
}
@ -208,7 +213,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
if (this.demandCallback != null)
throw new IllegalStateException("demand pending");
this.demandCallback = Objects.requireNonNull(demandCallback);
invoke = !chunks.isEmpty() || readClosed || errorChunk != null;
invoke = !chunks.isEmpty() || readClosed || persistentFailure != null;
}
if (invoke)
invoker.run(this::invokeDemandCallback);
@ -241,22 +246,35 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
@Override
public void fail(Throwable failure)
{
List<AsyncChunk> drained;
List<Content.Chunk> drained;
try (AutoLock.WithCondition condition = lock.lock())
{
if (readClosed)
return;
if (errorChunk != null)
if (persistentFailure != null)
return;
errorChunk = Content.Chunk.from(failure);
persistentFailure = Content.Chunk.from(failure);
drained = List.copyOf(chunks);
chunks.clear();
condition.signal();
}
drained.forEach(ac -> ac.failed(failure));
drained.forEach(c ->
{
if (c instanceof AsyncChunk ac)
ac.failed(failure);
});
invoker.run(this::invokeDemandCallback);
}
@Override
public void fail(Throwable failure, boolean last)
{
if (last)
fail(failure);
else
offer(Content.Chunk.from(failure, false));
}
public int count()
{
try (AutoLock ignored = lock.lock())

View File

@ -54,8 +54,12 @@ public class ContentSourceInputStream extends InputStream
{
if (chunk != null)
{
if (chunk instanceof Content.Chunk.Error error)
throw IO.rethrow(error.getCause());
if (Content.Chunk.isFailure(chunk))
{
Content.Chunk c = chunk;
chunk = null;
throw IO.rethrow(c.getFailure());
}
ByteBuffer byteBuffer = chunk.getByteBuffer();
if (chunk.isLast() && !byteBuffer.hasRemaining())
@ -96,8 +100,8 @@ public class ContentSourceInputStream extends InputStream
@Override
public void close()
{
// If we have already reached a real EOF or an error, close is a noop.
if (chunk == Content.Chunk.EOF || chunk instanceof Content.Chunk.Error)
// If we have already reached a real EOF or a persistent failure, close is a noop.
if (chunk == Content.Chunk.EOF || Content.Chunk.isFailure(chunk, true))
return;
// If we have a chunk here, then it needs to be released

View File

@ -125,10 +125,10 @@ public class ContentSourcePublisher implements Flow.Publisher<Content.Chunk>
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
terminate();
subscriber.onError(error.getCause());
subscriber.onError(chunk.getFailure());
return;
}

View File

@ -60,10 +60,10 @@ public abstract class ContentSourceTransformer implements Content.Source
return null;
}
if (rawChunk instanceof Content.Chunk.Error)
if (Content.Chunk.isFailure(rawChunk))
return rawChunk;
if (transformedChunk instanceof Content.Chunk.Error)
if (Content.Chunk.isFailure(transformedChunk))
return transformedChunk;
transformedChunk = process(rawChunk);
@ -142,13 +142,14 @@ public abstract class ContentSourceTransformer implements Content.Source
* <p>The input chunk is released as soon as this method returns, so
* implementations that must hold onto the input chunk must arrange to call
* {@link Content.Chunk#retain()} and its correspondent {@link Content.Chunk#release()}.</p>
* <p>Implementations should return an {@link Content.Chunk.Error error chunk} in case
* <p>Implementations should return an {@link Content.Chunk} with non-null
* {@link Content.Chunk#getFailure()} in case
* of transformation errors.</p>
* <p>Exceptions thrown by this method are equivalent to returning an error chunk.</p>
* <p>Implementations of this method may return:</p>
* <ul>
* <li>{@code null}, if more input chunks are necessary to produce an output chunk</li>
* <li>the {@code inputChunk} itself, typically in case of {@link Content.Chunk.Error}s,
* <li>the {@code inputChunk} itself, typically in case of non-null {@link Content.Chunk#getFailure()},
* or when no transformation is required</li>
* <li>a new {@link Content.Chunk} derived from {@code inputChunk}.</li>
* </ul>

View File

@ -40,7 +40,7 @@ public class InputStreamContentSource implements Content.Source
private final ByteBufferPool bufferPool;
private int bufferSize = 4096;
private Runnable demandCallback;
private Content.Chunk.Error errorChunk;
private Content.Chunk errorChunk;
private boolean closed;
public InputStreamContentSource(InputStream inputStream)

View File

@ -46,7 +46,7 @@ public class PathContentSource implements Content.Source
private SeekableByteChannel channel;
private long totalRead;
private Runnable demandCallback;
private Content.Chunk.Error errorChunk;
private Content.Chunk errorChunk;
public PathContentSource(Path path)
{

View File

@ -16,9 +16,13 @@ package org.eclipse.jetty.io.internal;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ContentCopier extends IteratingNestedCallback
{
private static final Logger LOG = LoggerFactory.getLogger(ContentCopier.class);
private final Content.Source source;
private final Content.Sink sink;
private final Content.Chunk.Processor chunkProcessor;
@ -56,8 +60,15 @@ public class ContentCopier extends IteratingNestedCallback
if (chunkProcessor != null && chunkProcessor.process(current, this))
return Action.SCHEDULED;
if (current instanceof Error error)
throw error.getCause();
if (Content.Chunk.isFailure(current))
{
if (current.isLast())
throw current.getFailure();
if (LOG.isDebugEnabled())
LOG.debug("ignored transient failure", current.getFailure());
succeeded();
return Action.SCHEDULED;
}
sink.write(current.isLast(), current.getByteBuffer(), this);
return Action.SCHEDULED;

View File

@ -44,9 +44,9 @@ public class ContentSourceByteBuffer implements Runnable
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
promise.failed(error.getCause());
promise.failed(chunk.getFailure());
return;
}

View File

@ -41,9 +41,9 @@ public class ContentSourceConsumer implements Invocable.Task
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
callback.failed(error.getCause());
callback.failed(chunk.getFailure());
return;
}

View File

@ -42,9 +42,9 @@ public class ContentSourceString
content.demand(this::convert);
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
promise.failed(error.getCause());
promise.failed(chunk.getFailure());
return;
}
text.append(chunk.getByteBuffer());

View File

@ -33,7 +33,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -101,7 +100,7 @@ public class AsyncContentTest
// We must read the error.
chunk = async.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
// Offering more should fail.
CountDownLatch failLatch = new CountDownLatch(1);
@ -209,14 +208,14 @@ public class AsyncContentTest
assertThat(chunk.release(), is(true));
callback1.assertNoFailureWithSuccesses(1);
Exception error1 = new Exception("test1");
async.fail(error1);
Exception failure1 = new Exception("test1");
async.fail(failure1);
chunk = async.read();
assertSame(error1, ((Content.Chunk.Error)chunk).getCause());
assertSame(failure1, chunk.getFailure());
callback2.assertSingleFailureSameInstanceNoSuccess(error1);
callback3.assertSingleFailureSameInstanceNoSuccess(error1);
callback2.assertSingleFailureSameInstanceNoSuccess(failure1);
callback3.assertSingleFailureSameInstanceNoSuccess(failure1);
}
}

View File

@ -15,8 +15,10 @@ package org.eclipse.jetty.io;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@ -27,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -49,13 +52,14 @@ import org.junit.jupiter.params.provider.MethodSource;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ContentSourceTest
{
@ -237,7 +241,7 @@ public class ContentSourceTest
// We must read the error.
chunk = source.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
}
@ParameterizedTest
@ -259,7 +263,7 @@ public class ContentSourceTest
source.fail(new CancellationException());
Content.Chunk chunk = source.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
CountDownLatch latch = new CountDownLatch(1);
source.demand(latch::countDown);
@ -285,7 +289,7 @@ public class ContentSourceTest
});
chunk = source.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
}
@Test
@ -560,4 +564,63 @@ public class ContentSourceTest
{
}
}
@Test
public void testAsyncContentWithWarnings()
{
AsyncContent content = new AsyncContent();
Content.Sink.write(content, false, "One", Callback.NOOP);
content.fail(new TimeoutException("test"), false);
Content.Sink.write(content, true, "Two", Callback.NOOP);
Content.Chunk chunk = content.read();
assertFalse(chunk.isLast());
assertFalse(Content.Chunk.isFailure(chunk));
assertThat(BufferUtil.toString(chunk.getByteBuffer()), is("One"));
chunk = content.read();
assertFalse(chunk.isLast());
assertTrue(Content.Chunk.isFailure(chunk));
assertThat(chunk.getFailure(), instanceOf(TimeoutException.class));
chunk = content.read();
assertTrue(chunk.isLast());
assertFalse(Content.Chunk.isFailure(chunk));
assertThat(BufferUtil.toString(chunk.getByteBuffer()), is("Two"));
}
@Test
public void testAsyncContentWithWarningsAsInputStream() throws Exception
{
AsyncContent content = new AsyncContent();
Content.Sink.write(content, false, "One", Callback.NOOP);
content.fail(new TimeoutException("test"), false);
Content.Sink.write(content, true, "Two", Callback.NOOP);
InputStream in = Content.Source.asInputStream(content);
byte[] buffer = new byte[1024];
int len = in.read(buffer);
assertThat(len, is(3));
assertThat(new String(buffer, 0, 3, StandardCharsets.ISO_8859_1), is("One"));
try
{
int ignored = in.read();
fail();
}
catch (IOException ioe)
{
assertThat(ioe.getCause(), instanceOf(TimeoutException.class));
}
len = in.read(buffer);
assertThat(len, is(3));
assertThat(new String(buffer, 0, 3, StandardCharsets.ISO_8859_1), is("Two"));
len = in.read(buffer);
assertThat(len, is(-1));
}
}

View File

@ -34,7 +34,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -259,11 +258,11 @@ public class ContentSourceTransformerTest
chunk.release();
chunk = transformer.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
// Trying to read again returns the error again.
chunk = transformer.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
// Make sure that the source is failed.
assertEquals(0, source.count());
@ -284,11 +283,11 @@ public class ContentSourceTransformerTest
chunk.release();
chunk = transformer.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
// Trying to read again returns the error again.
chunk = transformer.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
}
@Test
@ -306,11 +305,11 @@ public class ContentSourceTransformerTest
source.fail(new IOException());
chunk = transformer.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
// Trying to read again returns the error again.
chunk = transformer.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertTrue(Content.Chunk.isFailure(chunk, true));
}
private static class WordSplitLowCaseTransformer extends ContentSourceTransformer

View File

@ -160,9 +160,9 @@ public class FormFields extends CompletableFuture<Fields> implements Runnable
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
completeExceptionally(error.getCause());
completeExceptionally(chunk.getFailure());
return;
}

View File

@ -26,7 +26,7 @@ import org.eclipse.jetty.util.StaticException;
/**
* A HttpStream is an abstraction that together with {@link MetaData.Request}, represents the
* flow of data from and to a single request and response cycle. It is roughly analogous to the
* Stream within a HTTP/2 connection, in that a connection can have many streams, each used once
* Stream within an HTTP/2 connection, in that a connection can have many streams, each used once
* and each representing a single request and response exchange.
*/
public interface HttpStream extends Callback
@ -42,7 +42,7 @@ public interface HttpStream extends Callback
/**
* @return an ID unique within the lifetime scope of the associated protocol connection.
* This may be a protocol ID (eg HTTP/2 stream ID) or it may be unrelated to the protocol.
* This may be a protocol ID (e.g. HTTP/2 stream ID) or it may be unrelated to the protocol.
*/
String getId();
@ -50,7 +50,7 @@ public interface HttpStream extends Callback
* <p>Reads a chunk of content, with the same semantic as {@link Content.Source#read()}.</p>
* <p>This method is called from the implementation of {@link Request#read()}.</p>
*
* @return a chunk of content, possibly an {@link Chunk.Error error} or {@code null}.
* @return a chunk of content, possibly with non-null {@link Chunk#getFailure()} or {@code null}.
*/
Content.Chunk read();
@ -125,8 +125,8 @@ public interface HttpStream extends Callback
content.release();
// if the input failed, then fail the stream for same reason
if (content instanceof Chunk.Error error)
return error.getCause();
if (Content.Chunk.isFailure(content))
return content.getFailure();
if (content.isLast())
return null;

View File

@ -69,12 +69,14 @@ import org.eclipse.jetty.util.thread.Invocable;
* return true;
* }
*
* if (chunk instanceof Content.Chunk.Error error)
* if (Content.Chunk.isError(chunk))
* {
* Throwable failure = error.getCause();
*
* // Handle errors.
* // Mark the handling as complete, either generating a custom
* // If the chunk is not last, then the error can be ignored and reading can be tried again.
* // Otherwise, if the chunk is last, or we do not wish to ignore a non-last error, then
* // mark the handling as complete, either generating a custom
* // response and succeeding the callback, or failing the callback.
* callback.failed(failure);
* return true;

View File

@ -342,9 +342,9 @@ public class DelayedHandler extends Handler.Wrapper
getRequest().demand(this::readAndParse);
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
_formData.completeExceptionally(error.getCause());
_formData.completeExceptionally(chunk.getFailure());
return;
}
_formData.parse(chunk);

View File

@ -229,7 +229,7 @@ public abstract class EventsHandler extends Handler.Wrapper
* {@link Request#read()}).
*
* @param request the request object. The {@code read()}, {@code demand(Runnable)} and {@code fail(Throwable)} methods must not be called by the listener.
* @param chunk a potentially null request content chunk, including {@link org.eclipse.jetty.io.Content.Chunk.Error}
* @param chunk a potentially null request content chunk, including {@link org.eclipse.jetty.io.Content.Chunk#isFailure(Content.Chunk) error}
* and {@link org.eclipse.jetty.http.Trailers} chunks.
* If a reference to the chunk (or its {@link ByteBuffer}) is kept,
* then {@link Content.Chunk#retain()} must be called.

View File

@ -268,7 +268,7 @@ public class StatisticsHandler extends EventsHandler
protected class MinimumDataRateRequest extends Request.Wrapper
{
private Content.Chunk.Error _errorContent;
private Content.Chunk _errorContent;
private MinimumDataRateRequest(Request request)
{

View File

@ -159,7 +159,7 @@ public class GzipRequest extends Request.Wrapper
_chunk = inputChunk;
if (_chunk == null)
return null;
if (_chunk instanceof Content.Chunk.Error)
if (Content.Chunk.isFailure(_chunk))
return _chunk;
if (_chunk.isLast() && !_chunk.hasRemaining())
return Content.Chunk.EOF;

View File

@ -119,7 +119,7 @@ public class HttpChannelState implements HttpChannel, Components
/**
* Failure passed to {@link #onFailure(Throwable)}
*/
private Content.Chunk.Error _failure;
private Content.Chunk _failure;
/**
* Listener for {@link #onFailure(Throwable)} events
*/
@ -400,9 +400,9 @@ public class HttpChannelState implements HttpChannel, Components
{
_failure = Content.Chunk.from(x);
}
else if (ExceptionUtil.areNotAssociated(_failure.getCause(), x) && _failure.getCause().getClass() != x.getClass())
else if (ExceptionUtil.areNotAssociated(_failure.getFailure(), x) && _failure.getFailure().getClass() != x.getClass())
{
_failure.getCause().addSuppressed(x);
_failure.getFailure().addSuppressed(x);
}
// If not handled, then we just fail the request callback
@ -1243,8 +1243,8 @@ public class HttpChannelState implements HttpChannel, Components
protected Throwable getFailure(HttpChannelState httpChannelState)
{
Content.Chunk.Error failure = httpChannelState._failure;
return failure == null ? null : failure.getCause();
Content.Chunk failure = httpChannelState._failure;
return failure == null ? null : failure.getFailure();
}
/**
@ -1694,7 +1694,7 @@ public class HttpChannelState implements HttpChannel, Components
protected void onError(Runnable task, Throwable failure)
{
ChannelRequest request;
Content.Chunk.Error error;
Content.Chunk error;
boolean callbackCompleted;
try (AutoLock ignore = _lock.lock())
{
@ -1726,9 +1726,9 @@ public class HttpChannelState implements HttpChannel, Components
{
// We are already in error, so we will not handle this one,
// but we will add as suppressed if we have not seen it already.
Throwable cause = error.getCause();
Throwable cause = error.getFailure();
if (ExceptionUtil.areNotAssociated(cause, failure))
error.getCause().addSuppressed(failure);
error.getFailure().addSuppressed(failure);
}
}
}

View File

@ -1098,8 +1098,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
{
BadMessageException bad = new BadMessageException("Early EOF");
if (stream._chunk instanceof Error error)
error.getCause().addSuppressed(bad);
if (Content.Chunk.isFailure(stream._chunk))
stream._chunk.getFailure().addSuppressed(bad);
else
{
if (stream._chunk != null)

View File

@ -657,9 +657,9 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
request.demand(this);
return;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
callback.failed(error.getCause());
callback.failed(chunk.getFailure());
return;
}
// copy buffer

View File

@ -710,9 +710,9 @@ public class GracefulHandlerTest
}
}
LOG.debug("chunk = {}", chunk);
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
Response.writeError(request, response, callback, error.getCause());
Response.writeError(request, response, callback, chunk.getFailure());
return true;
}
bytesRead += chunk.remaining();

View File

@ -66,7 +66,6 @@ import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -1214,8 +1213,8 @@ public class HttpChannelTest
Request rq = handling.get().getRequest();
Content.Chunk chunk = rq.read();
assertTrue(chunk.isLast());
assertInstanceOf(Content.Chunk.Error.class, chunk);
assertThat(((Content.Chunk.Error)chunk).getCause(), sameInstance(failure));
assertTrue(Content.Chunk.isFailure(chunk, true));
assertThat(chunk.getFailure(), sameInstance(failure));
CountDownLatch demand = new CountDownLatch(1);
// Callback serialized until after onError task

View File

@ -542,10 +542,10 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
continue;
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
earlyEOFException.countDown();
throw IO.rethrow(error.getCause());
throw IO.rethrow(chunk.getFailure());
}
if (chunk.hasRemaining())

View File

@ -105,9 +105,9 @@ public class DumpHandler extends Handler.Abstract
}
}
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
callback.failed(error.getCause());
callback.failed(chunk.getFailure());
return true;
}

View File

@ -100,9 +100,9 @@ public class StatisticsHandlerTest
return true;
}
if (chunk instanceof Content.Chunk.Error errorContent)
if (Content.Chunk.isFailure(chunk))
{
callback.failed(errorContent.getCause());
callback.failed(chunk.getFailure());
return true;
}

View File

@ -284,8 +284,8 @@ public class ThreadLimitHandlerTest
request.demand(this);
return;
}
if (chunk instanceof Error error)
throw error.getCause();
if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();
if (chunk.hasRemaining())
read.addAndGet(chunk.remaining());

View File

@ -63,7 +63,6 @@ import org.junit.jupiter.params.provider.MethodSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -256,8 +255,8 @@ public class HttpClientTest extends AbstractTest
continue;
}
}
if (chunk instanceof Content.Chunk.Error error)
throw IO.rethrow(error.getCause());
if (Content.Chunk.isFailure(chunk))
throw IO.rethrow(chunk.getFailure());
total += chunk.remaining();
if (total >= sleep)
@ -941,7 +940,7 @@ public class HttpClientTest extends AbstractTest
assertThat(chunks2.stream().mapToInt(c -> c.getByteBuffer().remaining()).sum(), is(totalBytes));
assertThat(chunks3.stream().mapToInt(c -> c.getByteBuffer().remaining()).sum(), is(0));
assertThat(chunks3.size(), is(1));
assertThat(chunks3.get(0), instanceOf(Content.Chunk.Error.class));
assertTrue(Content.Chunk.isFailure(chunks3.get(0), true));
chunks1.forEach(Content.Chunk::release);
chunks2.forEach(Content.Chunk::release);
@ -983,7 +982,7 @@ public class HttpClientTest extends AbstractTest
assertThat(chunks3Latch.await(5, TimeUnit.SECONDS), is(true));
assertThat(chunks3.stream().mapToInt(c -> c.getByteBuffer().remaining()).sum(), is(0));
assertThat(chunks3.size(), is(1));
assertThat(chunks3.get(0), instanceOf(Content.Chunk.Error.class));
assertTrue(Content.Chunk.isFailure(chunks3.get(0), true));
chunks1.forEach(Content.Chunk::release);
chunks2.forEach(Content.Chunk::release);

View File

@ -130,8 +130,8 @@ public class ServerTimeoutsTest extends AbstractTest
// Reads should yield the idle timeout.
Content.Chunk chunk = requestRef.get().read();
assertThat(chunk, instanceOf(Content.Chunk.Error.class));
Throwable cause = ((Content.Chunk.Error)chunk).getCause();
assertTrue(Content.Chunk.isFailure(chunk, true));
Throwable cause = chunk.getFailure();
assertThat(cause, instanceOf(TimeoutException.class));
// Complete the callback as the error listener promised.

View File

@ -257,9 +257,9 @@ public class ProxyServlet extends AbstractProxyServlet
public Content.Chunk read()
{
Content.Chunk chunk = super.read();
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
onClientRequestFailure(request, proxyRequest, response, error.getCause());
onClientRequestFailure(request, proxyRequest, response, chunk.getFailure());
}
else
{

View File

@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
class AsyncContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
private static final Content.Chunk.Error RECYCLED_ERROR_CHUNK = Content.Chunk.from(new StaticException("ContentProducer has been recycled"));
private static final Content.Chunk RECYCLED_ERROR_CHUNK = Content.Chunk.from(new StaticException("ContentProducer has been recycled"), true);
final AutoLock _lock;
private final ServletChannel _servletChannel;
@ -101,10 +101,10 @@ class AsyncContentProducer implements ContentProducer
public boolean isError()
{
assertLocked();
boolean error = _chunk instanceof Content.Chunk.Error;
boolean failure = Content.Chunk.isFailure(_chunk);
if (LOG.isDebugEnabled())
LOG.debug("isError = {} {}", error, this);
return error;
LOG.debug("isFailure = {} {}", failure, this);
return failure;
}
@Override

View File

@ -255,14 +255,14 @@ public class HttpInput extends ServletInputStream implements Runnable
return read;
}
if (chunk instanceof Content.Chunk.Error errorChunk)
if (Content.Chunk.isFailure(chunk))
{
Throwable error = errorChunk.getCause();
Throwable failure = chunk.getFailure();
if (LOG.isDebugEnabled())
LOG.debug("read error={} {}", error, this);
if (error instanceof IOException)
throw (IOException)error;
throw new IOException(error);
LOG.debug("read failure={} {}", failure, this);
if (failure instanceof IOException)
throw (IOException)failure;
throw new IOException(failure);
}
if (LOG.isDebugEnabled())
@ -343,14 +343,14 @@ public class HttpInput extends ServletInputStream implements Runnable
return;
}
if (chunk instanceof Content.Chunk.Error errorChunk)
if (Content.Chunk.isFailure(chunk))
{
Throwable error = errorChunk.getCause();
Throwable failure = chunk.getFailure();
if (LOG.isDebugEnabled())
LOG.debug("running error={} {}", error, this);
LOG.debug("running failure={} {}", failure, this);
// TODO is this necessary to add here?
_servletChannel.getServletContextResponse().getHeaders().add(HttpFields.CONNECTION_CLOSE);
_readListener.onError(error);
_readListener.onError(failure);
}
else if (chunk.isLast() && !chunk.hasRemaining())
{

View File

@ -572,8 +572,8 @@ public class HttpInput extends ServletInputStream implements Runnable
{
public static Content asChunk(org.eclipse.jetty.io.Content.Chunk chunk)
{
if (chunk instanceof org.eclipse.jetty.io.Content.Chunk.Error error)
return new ErrorContent(error.getCause());
if (org.eclipse.jetty.io.Content.Chunk.isFailure(chunk))
return new ErrorContent(chunk.getFailure());
if (chunk.isLast() && !chunk.hasRemaining())
return new EofContent();
Content content = new Content(chunk.getByteBuffer())

View File

@ -257,9 +257,9 @@ public class ProxyServlet extends AbstractProxyServlet
public Content.Chunk read()
{
Content.Chunk chunk = super.read();
if (chunk instanceof Content.Chunk.Error error)
if (Content.Chunk.isFailure(chunk))
{
onClientRequestFailure(request, proxyRequest, response, error.getCause());
onClientRequestFailure(request, proxyRequest, response, chunk.getFailure());
}
else
{

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.ee9.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.ee9.websocket.client.WebSocketClient;
import org.eclipse.jetty.ee9.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.BufferUtil;