Converted writeTrailers to a static method (#8940)

Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2022-11-29 15:08:36 +11:00 committed by GitHub
parent 7ddfbcc8d8
commit eec5e69079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 59 additions and 21 deletions

View File

@ -415,7 +415,7 @@ public class TrailersTest extends AbstractTest
{
HttpFields.Mutable trailers = HttpFields.build();
response.setTrailersSupplier(() -> trailers);
Content.copy(request, response, response::writeTrailers, callback);
Content.copy(request, response, Response.newTrailersChunkProcessor(response), callback);
}
});

View File

@ -21,7 +21,6 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.eclipse.jetty.io.content.ContentSinkOutputStream;
@ -80,12 +79,12 @@ public class Content
*
* @param source the source to copy from
* @param sink the sink to copy to
* @param chunkHandler a (possibly {@code null}) predicate to handle the current chunk and its callback
* @param chunkProcessor a (possibly {@code null}) predicate to handle the current chunk and its callback
* @param callback the callback to notify when the copy is complete
*/
public static void copy(Source source, Sink sink, BiPredicate<Chunk, Callback> chunkHandler, Callback callback)
public static void copy(Source source, Sink sink, Chunk.Processor chunkProcessor, Callback callback)
{
new ContentCopier(source, sink, chunkHandler, callback).iterate();
new ContentCopier(source, sink, chunkProcessor, callback).iterate();
}
/**
@ -683,5 +682,21 @@ public class Content
return true;
}
}
/**
* <p>Implementations of this interface may process {@link Chunk}s being copied by the
* {@link Content#copy(Source, Sink, Processor, Callback)} method, so that
* {@link Chunk}s of unknown types can be copied.
* @see Content#copy(Source, Sink, Processor, Callback)
*/
interface Processor
{
/**
* @param chunk The chunk to be considered for processing.
* @param callback The callback that will be called once the accepted chunk is processed.
* @return True if the chunk will be process and the callback will be called (or may have already been called), false otherwise.
*/
boolean process(Chunk chunk, Callback callback);
}
}
}

View File

@ -13,8 +13,6 @@
package org.eclipse.jetty.io.internal;
import java.util.function.BiPredicate;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingNestedCallback;
@ -23,16 +21,16 @@ public class ContentCopier extends IteratingNestedCallback
{
private final Content.Source source;
private final Content.Sink sink;
private final BiPredicate<Content.Chunk, Callback> chunkHandler;
private final Content.Chunk.Processor chunkProcessor;
private Content.Chunk current;
private boolean terminated;
public ContentCopier(Content.Source source, Content.Sink sink, BiPredicate<Content.Chunk, Callback> chunkHandler, Callback callback)
public ContentCopier(Content.Source source, Content.Sink sink, Content.Chunk.Processor chunkProcessor, Callback callback)
{
super(callback);
this.source = source;
this.sink = sink;
this.chunkHandler = chunkHandler;
this.chunkProcessor = chunkProcessor;
}
@Override
@ -55,7 +53,7 @@ public class ContentCopier extends IteratingNestedCallback
return Action.IDLE;
}
if (chunkHandler != null && chunkHandler.test(current, this))
if (chunkProcessor != null && chunkProcessor.process(current, this))
return Action.SCHEDULED;
if (current instanceof Error error)

View File

@ -65,24 +65,49 @@ public interface Response extends Content.Sink
CompletableFuture<Void> writeInterim(int status, HttpFields headers);
// TODO: make it static, otherwise we must override it in Wrapper.
default boolean writeTrailers(Content.Chunk chunk, Callback ignored)
/**
* <p>Returns a chunk processor suitable to be passed to the
* {@link Content#copy(Content.Source, Content.Sink, Content.Chunk.Processor, Callback)}
* method, that will handle {@link Trailers} chunks
* by adding their fields to the {@link HttpFields} supplied by
* {@link Response#getTrailersSupplier()}.</p>
* <p>This is specifically useful for writing trailers that have been received via
* the {@link Content.Source#read()} API, for example when echoing a request to a response:</p>
* <pre>
* Content.copy(request, response, Response.asTrailerChunkHandler(response), callback);
* </pre>
* @param response The response for which to process a trailers chunk.
* If the {@link Response#setTrailersSupplier(Supplier)}
* method has not been called prior to this method, then a noop processor is returned.
* @return A chunk processor that will add trailer chunks to the response's trailer supplied fields.
* @see Content#copy(Content.Source, Content.Sink, Content.Chunk.Processor, Callback)
* @see Trailers
*/
static Content.Chunk.Processor newTrailersChunkProcessor(Response response)
{
Supplier<HttpFields> supplier = response.getTrailersSupplier();
if (supplier == null)
return (chunk, callback) -> false;
return (chunk, callback) ->
{
if (chunk instanceof Trailers trailers)
{
HttpFields requestTrailers = trailers.getTrailers();
if (requestTrailers != null)
{
Supplier<HttpFields> supplier = getTrailersSupplier();
if (supplier != null)
{
// Call supplier in lambda to get latest responseTrailers
HttpFields responseTrailers = supplier.get();
if (responseTrailers instanceof HttpFields.Mutable mutable)
{
mutable.add(requestTrailers);
callback.succeeded();
return true;
}
}
}
return false;
};
}
@SuppressWarnings("unchecked")

View File

@ -48,7 +48,7 @@ public class EchoHandler extends Handler.Processor.NonBlocking
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, contentLength);
if (contentLength > 0 || contentLength == -1 && request.getHeaders().contains(HttpHeader.TRANSFER_ENCODING))
Content.copy(request, response, response::writeTrailers, callback);
Content.copy(request, response, Response.newTrailersChunkProcessor(response), callback);
else
callback.succeeded();
}