Jetty-12 Improved Content testing and javadoc (#8263)

Improved Content javadoc
Fixed ContentSourceTest that was consuming the same ByteBufferSource multiple times.
Using ByteBufferPool.NOOP instead of allocating NoopByteBufferPool.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2022-07-08 22:52:28 +10:00 committed by GitHub
parent 1031bf1374
commit 676833e57e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 582 additions and 120 deletions

View File

@ -0,0 +1,196 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import java.io.IOException;
import java.io.PrintStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.SerializedInvoker;
public class HandlerDocs
{
// Work in progress
public static class HelloHandler extends Handler.Abstract
{
@Override
public Request.Processor handle(Request request) throws Exception
{
return (req, response, callback) ->
{
response.setStatus(200);
response.getHeaders().add(HttpHeader.CONTENT_LENGTH, "text/plain");
response.write(true, BufferUtil.toBuffer("Hello World\n"), callback);
};
}
}
public static class HelloHandler2 extends Handler.Abstract
{
@Override
public Request.Processor handle(Request request) throws Exception
{
return this::process;
}
private void process(Request request, Response response, Callback callback)
{
response.setStatus(200);
response.getHeaders().add(HttpHeader.CONTENT_LENGTH, "text/plain");
response.write(true, BufferUtil.toBuffer("Hello World\n"), callback);
}
}
public static class HelloHandler3 extends Handler.Processor.NonBlocking
{
@Override
public void process(Request request, Response response, Callback callback)
{
response.setStatus(200);
response.getHeaders().add(HttpHeader.CONTENT_LENGTH, "text/plain");
response.write(true, BufferUtil.toBuffer("Hello World\n"), callback);
}
}
public static class HelloHandler35 extends Handler.Processor.NonBlocking
{
@Override
public void process(Request request, Response response, Callback callback) throws IOException
{
response.setStatus(200);
response.getHeaders().add(HttpHeader.CONTENT_LENGTH, "text/plain");
Blocker.Shared blocker = new Blocker.Shared();
try (Blocker.Callback cb = blocker.callback())
{
response.write(true, BufferUtil.toBuffer("Hello "), callback);
cb.block();
}
try (Blocker.Callback cb = blocker.callback())
{
response.write(true, BufferUtil.toBuffer("World\n"), callback);
cb.block();
}
callback.succeeded();
}
}
public static class HelloHandler4 extends Handler.Processor.Blocking
{
@Override
public void process(Request request, Response response, Callback callback) throws IOException
{
response.setStatus(200);
response.getHeaders().add(HttpHeader.CONTENT_LENGTH, "text/plain");
try (PrintStream out = new PrintStream(Content.Sink.asOutputStream(response)))
{
out.print("Hello ");
out.println("World");
callback.succeeded();
}
catch (Throwable t)
{
callback.failed(t);
}
}
}
public static class HelloHandler5 extends Handler.Processor.NonBlocking
{
@Override
public void process(Request request, Response response, Callback callback) throws IOException
{
response.setStatus(200);
response.getHeaders().add(HttpHeader.CONTENT_LENGTH, "text/plain");
new HelloWorldPublisher().subscribe(Content.Sink.asSubscriber(response, callback));
}
}
public static class HelloWorldPublisher implements Flow.Publisher<Content.Chunk>
{
@Override
public void subscribe(Flow.Subscriber<? super Content.Chunk> subscriber)
{
final SerializedInvoker invoker = new SerializedInvoker();
final Queue<Content.Chunk> chunks = new LinkedList<>(List.of(
Content.Chunk.from(BufferUtil.toBuffer("Hello "), false),
Content.Chunk.from(BufferUtil.toBuffer("World "), false),
Content.Chunk.EOF));
subscriber.onSubscribe(new Flow.Subscription()
{
@Override
public void request(long n)
{
while (n-- > 0 && !chunks.isEmpty())
invoker.run(() -> subscriber.onNext(chunks.poll()));
}
@Override
public void cancel()
{
subscriber.onNext(Content.Chunk.from(new IOException("Cancelled")));
}
});
}
}
public static class DescriminatingGreeterHandler extends Handler.Processor.NonBlocking
{
@Override
public Request.Processor handle(Request request) throws Exception
{
if (HttpMethod.GET.is(request.getMethod()) &&
"greeting".equals(request.getPathInContext()))
return this;
return null;
}
@Override
public void process(Request request, Response response, Callback callback)
{
response.setStatus(200);
response.getHeaders().add(HttpHeader.CONTENT_LENGTH, "text/plain");
response.write(true, BufferUtil.toBuffer("Hello World\n"), callback);
}
}
public static class EchoHandler extends Handler.Processor.NonBlocking
{
@Override
public void process(Request request, Response response, Callback callback)
{
response.setStatus(200);
response.getHeaders().put(HttpHeader.CONTENT_TYPE, request.getHeaders().get(HttpHeader.CONTENT_TYPE));
long contentLength = request.getHeaders().getLongField(HttpHeader.CONTENT_LENGTH);
if (contentLength >= 0)
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, contentLength);
Content.copy(request, response, callback);
}
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.client.util;
import java.io.InputStream;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.content.InputStreamContentSource;
/**
@ -32,7 +33,7 @@ public class InputStreamRequestContent extends InputStreamContentSource implemen
public InputStreamRequestContent(InputStream stream)
{
this("application/octet-stream", stream);
this("application/octet-stream", stream, null);
}
public InputStreamRequestContent(InputStream stream, int bufferSize)
@ -40,18 +41,23 @@ public class InputStreamRequestContent extends InputStreamContentSource implemen
this("application/octet-stream", stream, bufferSize);
}
public InputStreamRequestContent(String contentType, InputStream stream)
{
super(stream);
this.contentType = contentType;
}
public InputStreamRequestContent(String contentType, InputStream stream, int bufferSize)
{
this(contentType, stream);
this(contentType, stream, null);
setBufferSize(bufferSize);
}
public InputStreamRequestContent(String contentType, InputStream stream)
{
this(contentType, stream, null);
}
public InputStreamRequestContent(String contentType, InputStream stream, ByteBufferPool bufferPool)
{
super(stream, bufferPool);
this.contentType = contentType;
}
@Override
public String getContentType()
{

View File

@ -24,7 +24,6 @@ import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.api.Test;
@ -54,7 +53,7 @@ public class DataGenerateParseTest
byteBuffer.get(inputBytes);
DataFrame input = new DataFrame(ByteBuffer.wrap(inputBytes), true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(ByteBufferPool.NOOP);
new MessageGenerator(null, 8192, true).generate(lease, 0, input, null);
List<DataFrame> frames = new ArrayList<>();

View File

@ -22,7 +22,6 @@ import org.eclipse.jetty.http3.internal.generator.ControlGenerator;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -35,7 +34,7 @@ public class GoAwayGenerateParseTest
{
GoAwayFrame input = GoAwayFrame.CLIENT_GRACEFUL;
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(ByteBufferPool.NOOP);
new ControlGenerator(true).generate(lease, 0, input, null);
List<GoAwayFrame> frames = new ArrayList<>();

View File

@ -30,7 +30,6 @@ import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -48,7 +47,7 @@ public class HeadersGenerateParseTest
HeadersFrame input = new HeadersFrame(new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, fields), true);
QpackEncoder encoder = new QpackEncoder(instructions -> {}, 100);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(ByteBufferPool.NOOP);
new MessageGenerator(encoder, 8192, true).generate(lease, 0, input, null);
QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192);

View File

@ -23,7 +23,6 @@ import org.eclipse.jetty.http3.internal.generator.ControlGenerator;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -47,7 +46,7 @@ public class SettingsGenerateParseTest
{
SettingsFrame input = new SettingsFrame(settings);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(ByteBufferPool.NOOP);
new ControlGenerator(true).generate(lease, 0, input, null);
List<SettingsFrame> frames = new ArrayList<>();

View File

@ -21,7 +21,6 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.hamcrest.Matcher;
@ -33,7 +32,7 @@ public class QpackTestUtil
{
public static ByteBuffer toBuffer(Instruction... instructions)
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(ByteBufferPool.NOOP);
for (Instruction instruction : instructions)
{
instruction.encode(lease);
@ -56,8 +55,7 @@ public class QpackTestUtil
public static ByteBuffer toBuffer(List<Instruction> instructions)
{
NullByteBufferPool bufferPool = new NullByteBufferPool();
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(bufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(ByteBufferPool.NOOP);
instructions.forEach(i -> i.encode(lease));
assertThat(lease.getSize(), is(instructions.size()));
ByteBuffer combinedBuffer = BufferUtil.allocate(Math.toIntExact(lease.getTotalLength()), false);

View File

@ -43,7 +43,7 @@ public class ByteBufferAccumulator implements AutoCloseable
public ByteBufferAccumulator(ByteBufferPool bufferPool, boolean direct)
{
_bufferPool = (bufferPool == null) ? new NullByteBufferPool() : bufferPool;
_bufferPool = (bufferPool == null) ? ByteBufferPool.NOOP : bufferPool;
_direct = direct;
}

View File

@ -36,7 +36,7 @@ public class ByteBufferOutputStream2 extends OutputStream
public ByteBufferOutputStream2(ByteBufferPool bufferPool, boolean direct)
{
_accumulator = new ByteBufferAccumulator((bufferPool == null) ? new NullByteBufferPool() : bufferPool, direct);
_accumulator = new ByteBufferAccumulator((bufferPool == null) ? ByteBufferPool.NOOP : bufferPool, direct);
}
public ByteBufferPool getByteBufferPool()

View File

@ -27,6 +27,8 @@ import org.eclipse.jetty.util.BufferUtil;
*/
public interface ByteBufferPool
{
ByteBufferPool NOOP = new NoopByteBufferPool();
/**
* <p>Requests a {@link ByteBuffer} of the given size.</p>
* <p>The returned buffer may have a bigger capacity than the size being requested.</p>

View File

@ -19,8 +19,10 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.eclipse.jetty.io.content.ContentSinkOutputStream;
import org.eclipse.jetty.io.content.ContentSinkSubscriber;
@ -446,7 +448,7 @@ public class Content
*/
static Chunk from(ByteBuffer byteBuffer, boolean last)
{
return from(byteBuffer, last, null);
return new ByteBufferChunk(byteBuffer, last);
}
/**
@ -459,7 +461,20 @@ public class Content
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Runnable releaser)
{
return new ByteBufferChunk(byteBuffer, last, releaser);
return new ByteBufferChunk.ReleasedByRunnable(byteBuffer, last, Objects.requireNonNull(releaser));
}
/**
* <p>Creates a last/non-last Chunk with the given ByteBuffer.</p>
*
* @param byteBuffer the ByteBuffer with the bytes of this Chunk
* @param last whether the Chunk is the last one
* @param releaser the code to run when this Chunk is released
* @return a new Chunk
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Consumer<ByteBuffer> releaser)
{
return new ByteBufferChunk.ReleasedByConsumer(byteBuffer, last, Objects.requireNonNull(releaser));
}
/**
@ -577,7 +592,7 @@ public class Content
/**
* <p>Returns whether this Chunk is a <em>terminal</em> chunk.</p>
* <p>A terminal chunk is either an {@link Error error chunk},
* or a Chunk that {@link #isLast() is last} and has no remaining
* or a Chunk that {@link #isLast()} is true and has no remaining
* bytes.</p>
*
* @return whether this Chunk is a terminal chunk

View File

@ -17,7 +17,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
public class NullByteBufferPool implements ByteBufferPool
public class NoopByteBufferPool implements ByteBufferPool
{
private final RetainableByteBufferPool _retainableByteBufferPool = RetainableByteBufferPool.from(this);

View File

@ -27,6 +27,13 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
/**
* <p>A {@link Content.Source} that is also a {@link Content.Sink}.
* Content written to the {@link Content.Sink} is converted to a {@link Content.Chunk}
* and made available to calls to the {@link #read()} method. If necessary, any
* {@link Runnable} passed to the {@link #demand(Runnable)} method is invoked once
* content is written to the {@link Content.Sink}.</p>
*/
public class AsyncContent implements Content.Sink, Content.Source, Closeable
{
private static final int UNDETERMINED_LENGTH = -2;
@ -65,6 +72,12 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
{
failure = errorChunk.getCause();
}
else if (chunk instanceof Content.Chunk.Error error)
{
errorChunk = error;
failure = errorChunk.getCause();
wasEmpty = chunks.isEmpty();
}
else
{
wasEmpty = chunks.isEmpty();
@ -79,7 +92,7 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
}
if (failure != null)
callback.failed(failure);
else if (wasEmpty)
if (wasEmpty)
invoker.run(this::invokeDemandCallback);
}
@ -154,8 +167,6 @@ public class AsyncContent implements Content.Sink, Content.Source, Closeable
return errorChunk;
return null;
}
// TODO if we read an Chunk.Error we should remember it to fulfill the read() contract
// where further reads should return the same error chunk.
readClosed = current.chunk().isLast();
if (chunks.isEmpty())
l.signal();

View File

@ -23,6 +23,11 @@ import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
/**
* <p>A {@link Content.Source} backed by one or more {@link ByteBuffer}s.
* The buffers passed in the constructor are made available as {@link Content.Chunk}s
* via {@link #read()}. Any calls to {@link #demand(Runnable)} are immediately satisfied.</p>
*/
public class ByteBufferContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();

View File

@ -19,8 +19,16 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
/**
* <p>An {@link OutputStream} backed by a {@link Content.Sink}.
* Any content written to this {@link OutputStream} is written
* to the {@link Content.Sink#write(boolean, ByteBuffer, Callback)}
* with a callback that blocks the caller until it is succeeded or
* failed.</p>
*/
public class ContentSinkOutputStream extends OutputStream
{
private final Blocker.Shared _blocking = new Blocker.Shared();

View File

@ -13,11 +13,18 @@
package org.eclipse.jetty.io.content;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Callback;
/**
* <p>A {@link Flow.Subscriber} that wraps a {@link Content.Sink}.
* Content delivered to the {@link #onNext(Content.Chunk)} method is
* written to {@link Content.Sink#write(boolean, ByteBuffer, Callback)} and the chunk
* is released once the write collback is succeeded or failed.</p>
*/
public class ContentSinkSubscriber implements Flow.Subscriber<Content.Chunk>
{
private final Content.Sink sink;

View File

@ -21,6 +21,13 @@ import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.IO;
/**
* <p>An {@link InputStream} that is backed by a {@link Content.Source}.
* The read methods are implemented by calling {@link Content.Source#read()}.
* Any {@link Content.Chunk}s read are released once all their content
* has been read.
* </p>
*/
public class ContentSourceInputStream extends InputStream
{
private final Blocker.Shared blocking = new Blocker.Shared();
@ -89,10 +96,23 @@ public class ContentSourceInputStream extends InputStream
@Override
public void close()
{
if (chunk != null)
chunk.release();
// If we have already reached a real EOF or an error, close is a noop.
if (chunk == Content.Chunk.EOF || chunk instanceof Content.Chunk.Error)
return;
chunk = Content.Chunk.EOF;
// If we have a chunk here, then it needs to be released
if (chunk != null)
{
chunk.release();
// if the chunk was a last chunk (but not an instanceof EOF), then nothing more to do
if (chunk.isLast())
return;
}
// This is an abnormal close before EOF
Throwable closed = new IOException("closed before EOF");
chunk = Content.Chunk.from(closed);
content.fail(closed);
}
}

View File

@ -19,6 +19,13 @@ import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.MathUtils;
import org.eclipse.jetty.util.thread.AutoLock;
/**
* <p>Wraps a {@link Content.Source} as a {@link Flow.Publisher}.
* When content is requested via {@link Flow.Subscription#request(long)}, it is
* read from the passed {@link Content.Source} and passed to {@link Flow.Subscriber#onNext(Object)}.
* If no content is available, then the {@link Content.Source#demand(Runnable)} method is used to
* ultimately call {@link Flow.Subscriber#onNext(Object)} once content is available.</p>
*/
public class ContentSourcePublisher implements Flow.Publisher<Content.Chunk>
{
private final Content.Source content;

View File

@ -17,6 +17,18 @@ import java.util.Objects;
import org.eclipse.jetty.io.Content;
/**
* <p>
* This abstract {@link Content.Source} wraps another {@link Content.Source} and implementors need only to provide
* the {@link #transform(Content.Chunk)} method, which is used to transform {@link Content.Chunk} read from the
* wrapped source.
* </p>
* <p>
* The {@link #demand(Runnable)} conversation is passed directly to the wrapped {@link Content.Source}, which means
* that transformations that may fully consume bytes read can result in a null return from {@link Content.Source#read()}
* even after a callback to the demand {@link Runnable} (as per spurious invocation in {@link Content.Source#demand(Runnable)}.
* </p>
*/
public abstract class ContentSourceTransformer implements Content.Source
{
private final Content.Source rawSource;
@ -112,5 +124,22 @@ public abstract class ContentSourceTransformer implements Content.Source
}
}
/**
* Content chunk transformation method.
* <p>
* This method is called during a {@link Content.Source#read()} to transform a raw chunk to a chunk that
* will be returned from the read call. The caller of {@link Content.Source#read()} method is always
* responsible for calling {@link Content.Chunk#release()} on the returned chunk, which may be:
* <ul>
* <li>the <code>rawChunk</code>. This is typically done for {@link Content.Chunk.Error}s,
* when {@link Content.Chunk#isLast()} is true, or if no transformation is required.</li>
* <li>a new (or predefined) {@link Content.Chunk} derived from the <code>rawChunk</code>. The transform is
* responsible for calling {@link Content.Chunk#release()} on the <code>rawChunk</code>, either during the call
* to {@link Content.Source#read()} or subsequently.</li>
* <li>null if the <code>rawChunk</code> is fully consumed and/or requires additional chunks to be transformed.</li>
* </ul>
* @param rawChunk A chunk read from the wrapped {@link Content.Source}. It is always non null.
* @return The transformed chunk or null.
*/
protected abstract Content.Chunk transform(Content.Chunk rawChunk);
}

View File

@ -16,24 +16,42 @@ package org.eclipse.jetty.io.content;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.NoopByteBufferPool;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
/**
* <p>
* A {@link Content.Source} that is backed by an {@link InputStream}.
* Data is read from the {@link InputStream} into a buffer that is optionally acquired
* from a {@link ByteBufferPool}, and converted to a {@link Content.Chunk} that is
* returned from {@link #read()}. If no {@link ByteBufferPool} is provided, then
* a {@link NoopByteBufferPool} is used.
* </p>
*/
public class InputStreamContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final InputStream inputStream;
private final ByteBufferPool bufferPool;
private int bufferSize = 4096;
private Runnable demandCallback;
private Content.Chunk.Error errorChunk;
private boolean closed;
public InputStreamContentSource(InputStream inputStream)
{
this(inputStream, null);
}
public InputStreamContentSource(InputStream inputStream, ByteBufferPool bufferPool)
{
this.inputStream = inputStream;
this.bufferPool = bufferPool == null ? ByteBufferPool.NOOP : bufferPool;
}
public int getBufferSize()
@ -59,8 +77,8 @@ public class InputStreamContentSource implements Content.Source
try
{
byte[] buffer = new byte[getBufferSize()];
int read = inputStream.read(buffer);
ByteBuffer buffer = bufferPool.acquire(getBufferSize(), false);
int read = inputStream.read(buffer.array(), buffer.arrayOffset(), buffer.capacity());
if (read < 0)
{
close();
@ -68,8 +86,8 @@ public class InputStreamContentSource implements Content.Source
}
else
{
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, read);
return Content.Chunk.from(byteBuffer, false);
buffer.limit(read);
return Content.Chunk.from(buffer, false, bufferPool::release);
}
}
catch (Throwable x)

View File

@ -21,6 +21,15 @@ import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.IO;
/**
* <p>
* A {@link Content.Source} backed by an {@link OutputStream}.
* Any bytes written to the {@link OutputStream} returned by {@link #getOutputStream()}
* is converted to a {@link Content.Chunk} and returned from {@link #read()}. If
* necessary, any {@link Runnable} passed to {@link #demand(Runnable)} is invoked.
* </p>
* @see AsyncContent
*/
public class OutputStreamContentSource implements Content.Source
{
private final AsyncContent async = new AsyncContent();

View File

@ -29,6 +29,9 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
/**
* <p>A {@link Content.Source} that provides the file content of the passed {@link Path}</p>
*/
public class PathContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
@ -143,7 +146,7 @@ public class PathContentSource implements Content.Source
if (last)
IO.close(channel);
return Content.Chunk.from(byteBuffer, last, () -> release(byteBuffer));
return Content.Chunk.from(byteBuffer, last, this::release);
}
@Override

View File

@ -16,13 +16,14 @@ package org.eclipse.jetty.io.internal;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.BufferUtil;
public class ByteBufferChunk implements Content.Chunk
{
public static final ByteBufferChunk EMPTY = new ByteBufferChunk(BufferUtil.EMPTY_BUFFER, false, null)
public static final ByteBufferChunk EMPTY = new ByteBufferChunk(BufferUtil.EMPTY_BUFFER, false)
{
@Override
public String toString()
@ -30,7 +31,7 @@ public class ByteBufferChunk implements Content.Chunk
return "%s[EMPTY]".formatted(ByteBufferChunk.class.getSimpleName());
}
};
public static final ByteBufferChunk EOF = new ByteBufferChunk(BufferUtil.EMPTY_BUFFER, true, null)
public static final ByteBufferChunk EOF = new ByteBufferChunk(BufferUtil.EMPTY_BUFFER, true)
{
@Override
public String toString()
@ -41,13 +42,11 @@ public class ByteBufferChunk implements Content.Chunk
private final ByteBuffer byteBuffer;
private final boolean last;
private final AtomicReference<Runnable> releaser;
public ByteBufferChunk(ByteBuffer byteBuffer, boolean last, Runnable releaser)
public ByteBufferChunk(ByteBuffer byteBuffer, boolean last)
{
this.byteBuffer = Objects.requireNonNull(byteBuffer);
this.last = last;
this.releaser = releaser == null ? null : new AtomicReference<>(releaser);
}
public ByteBuffer getByteBuffer()
@ -62,12 +61,6 @@ public class ByteBufferChunk implements Content.Chunk
public void release()
{
if (releaser != null)
{
Runnable runnable = releaser.getAndSet(null);
if (runnable != null)
runnable.run();
}
}
@Override
@ -80,4 +73,40 @@ public class ByteBufferChunk implements Content.Chunk
BufferUtil.toDetailString(getByteBuffer())
);
}
public static class ReleasedByRunnable extends ByteBufferChunk
{
private final AtomicReference<Runnable> releaser;
public ReleasedByRunnable(ByteBuffer byteBuffer, boolean last, Runnable releaser)
{
super(byteBuffer, last);
this.releaser = new AtomicReference<>(releaser);
}
public void release()
{
Runnable runnable = releaser.getAndSet(null);
if (runnable != null)
runnable.run();
}
}
public static class ReleasedByConsumer extends ByteBufferChunk
{
private final AtomicReference<Consumer<ByteBuffer>> releaser;
public ReleasedByConsumer(ByteBuffer byteBuffer, boolean last, Consumer<ByteBuffer> releaser)
{
super(byteBuffer, last);
this.releaser = new AtomicReference<>(releaser);
}
public void release()
{
Consumer<ByteBuffer> consumer = releaser.getAndSet(null);
if (consumer != null)
consumer.accept(getByteBuffer());
}
}
}

View File

@ -16,12 +16,17 @@ package org.eclipse.jetty.io;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.content.AsyncContent;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;
import org.junit.jupiter.api.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -29,21 +34,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class AsyncContentSourceTest
{
// TODO make an OutputStreamContentSource version of this test
// TODO add a test to actually read some content!
@Test
public void testOfferInvokesDemandCallback() throws Exception
{
AsyncContent async = new AsyncContent();
try (AsyncContent async = new AsyncContent())
{
CountDownLatch latch = new CountDownLatch(1);
async.demand(latch::countDown);
assertFalse(latch.await(250, TimeUnit.MILLISECONDS));
CountDownLatch latch = new CountDownLatch(1);
async.demand(latch::countDown);
assertFalse(latch.await(500, TimeUnit.MILLISECONDS));
async.write(Content.Chunk.from(UTF_8.encode("one"), false), Callback.NOOP);
async.write(Content.Chunk.from(UTF_8.encode("one"), false), Callback.NOOP);
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(latch.await(5, TimeUnit.SECONDS));
Content.Chunk chunk = async.read();
assertNotNull(chunk);
Content.Chunk chunk = async.read();
assertNotNull(chunk);
}
}
@Test
@ -53,7 +63,7 @@ public class AsyncContentSourceTest
CountDownLatch latch = new CountDownLatch(1);
async.demand(latch::countDown);
assertFalse(latch.await(500, TimeUnit.MILLISECONDS));
assertFalse(latch.await(250, TimeUnit.MILLISECONDS));
async.close();
@ -67,27 +77,52 @@ public class AsyncContentSourceTest
@Test
public void testFailInvokesDemandCallback() throws Exception
{
AsyncContent async = new AsyncContent();
async.write(Content.Chunk.from(UTF_8.encode("one"), false), Callback.NOOP);
try (AsyncContent async = new AsyncContent())
{
async.write(Content.Chunk.from(UTF_8.encode("one"), false), Callback.NOOP);
Content.Chunk chunk = async.read();
assertNotNull(chunk);
Content.Chunk chunk = async.read();
assertNotNull(chunk);
CountDownLatch latch = new CountDownLatch(1);
async.demand(latch::countDown);
assertFalse(latch.await(500, TimeUnit.MILLISECONDS));
CountDownLatch latch = new CountDownLatch(1);
async.demand(latch::countDown);
assertFalse(latch.await(250, TimeUnit.MILLISECONDS));
async.fail(new CancellationException());
async.fail(new CancellationException());
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(latch.await(5, TimeUnit.SECONDS));
// We must read the error.
chunk = async.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
// We must read the error.
chunk = async.read();
assertInstanceOf(Content.Chunk.Error.class, chunk);
// Offering more should fail.
CountDownLatch failLatch = new CountDownLatch(1);
async.write(Content.Chunk.EMPTY, Callback.from(Callback.NOOP::succeeded, x -> failLatch.countDown()));
assertTrue(failLatch.await(5, TimeUnit.SECONDS));
// Offering more should fail.
CountDownLatch failLatch = new CountDownLatch(1);
async.write(Content.Chunk.EMPTY, Callback.from(Callback.NOOP::succeeded, x -> failLatch.countDown()));
assertTrue(failLatch.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testWriteErrorChunk() throws Exception
{
try (AsyncContent async = new AsyncContent())
{
CountDownLatch latch = new CountDownLatch(1);
async.demand(latch::countDown);
assertFalse(latch.await(250, TimeUnit.MILLISECONDS));
Throwable error = new Throwable("test");
AtomicReference<Throwable> callback = new AtomicReference<>();
async.write(Content.Chunk.from(error), Callback.from(Invocable.NOOP, callback::set));
assertThat(callback.get(), sameInstance(error));
assertTrue(latch.await(5, TimeUnit.SECONDS));
Content.Chunk chunk = async.read();
assertNotNull(chunk);
assertThat(chunk, instanceOf(Content.Chunk.Error.class));
assertThat(((Content.Chunk.Error)chunk).getCause(), sameInstance(error));
}
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.io;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -47,6 +48,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -65,15 +67,21 @@ public class ContentSourceTest
asyncSource.write(Content.Chunk.from(UTF_8.encode("two"), false), Callback.NOOP);
}
Content.Source byteBufferSource = new ByteBufferContentSource(UTF_8.encode("one"), UTF_8.encode("two"));
ByteBufferContentSource byteBufferSource = new ByteBufferContentSource(UTF_8.encode("one"), UTF_8.encode("two"));
Content.Source.Transformer transformerSource = new Content.Source.Transformer(byteBufferSource)
Content.Source.Transformer transformerSource = new Content.Source.Transformer(new ByteBufferContentSource(UTF_8.encode("one"), UTF_8.encode("two")))
{
@Override
protected Content.Chunk transform(Content.Chunk rawChunk)
{
return rawChunk;
}
@Override
public String toString()
{
return "Content.Source.Transformer@%x".formatted(hashCode());
}
};
Path tmpDir = MavenTestingUtils.getTargetTestingPath();
@ -83,31 +91,95 @@ public class ContentSourceTest
PathContentSource pathSource = new PathContentSource(path);
pathSource.setBufferSize(3);
InputStreamContentSource inputSource = new InputStreamContentSource(new ContentSourceInputStream(byteBufferSource));
InputStreamContentSource inputSource = new InputStreamContentSource(new ByteArrayInputStream("onetwo".getBytes(UTF_8)));
// TODO
// OutputStreamContentSource outputSource = new OutputStreamContentSource();
// try (OutputStream stream = outputSource.getOutputStream())
// {
// stream.write("one".getBytes(UTF_8));
// stream.write("two".getBytes(UTF_8));
// }
InputStreamContentSource inputSource2 =
new InputStreamContentSource(new ContentSourceInputStream(new ByteBufferContentSource(UTF_8.encode("one"), UTF_8.encode("two"))));
return List.of(asyncSource, byteBufferSource, transformerSource, pathSource, inputSource/*, outputSource*/);
return List.of(asyncSource, byteBufferSource, transformerSource, pathSource, inputSource, inputSource2);
}
/** Get the next chunk, blocking if necessary
* @param source The source to get the next chunk from
* @return A non null chunk
*/
public static Content.Chunk nextChunk(Content.Source source)
{
Content.Chunk chunk = source.read();
if (chunk != null)
return chunk;
FuturePromise<Content.Chunk> next = new FuturePromise<>();
Runnable getNext = new Runnable()
{
@Override
public void run()
{
Content.Chunk chunk = source.read();
if (chunk == null)
source.demand(this);
next.succeeded(chunk);
}
};
source.demand(getNext);
try
{
return next.get();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
@ParameterizedTest
@MethodSource("all")
public void testDemandReadDemandDoesNotRecurse(Content.Source source)
public void testRead(Content.Source source) throws Exception
{
AtomicBoolean processed = new AtomicBoolean();
StringBuilder builder = new StringBuilder();
CountDownLatch eof = new CountDownLatch(1);
source.demand(new Runnable()
{
@Override
public void run()
{
while (true)
{
Content.Chunk chunk = source.read();
if (chunk == null)
{
source.demand(this);
break;
}
if (chunk.hasRemaining())
builder.append(BufferUtil.toString(chunk.getByteBuffer()));
chunk.release();
if (chunk.isLast())
{
eof.countDown();
break;
}
}
}
});
assertTrue(eof.await(10, TimeUnit.SECONDS));
assertThat(builder.toString(), is("onetwo"));
}
@ParameterizedTest
@MethodSource("all")
public void testDemandReadDemandDoesNotRecurse(Content.Source source) throws Exception
{
CountDownLatch processed = new CountDownLatch(1);
AtomicBoolean recursion = new AtomicBoolean();
source.demand(new Runnable()
{
@Override
public void run()
{
processed.set(true);
processed.countDown();
assertTrue(recursion.compareAndSet(false, true));
@ -121,21 +193,19 @@ public class ContentSourceTest
assertTrue(recursion.compareAndSet(true, false));
}
});
assertTrue(processed.get());
assertTrue(processed.await(10, TimeUnit.SECONDS));
}
@ParameterizedTest
@MethodSource("all")
public void testDemandDemandThrows(Content.Source source)
public void testDemandDemandThrows(Content.Source source) throws Exception
{
AtomicBoolean processed = new AtomicBoolean();
CountDownLatch processed = new CountDownLatch(1);
source.demand(new Runnable()
{
@Override
public void run()
{
processed.set(true);
Content.Chunk chunk = source.read();
assertNotNull(chunk);
@ -146,16 +216,17 @@ public class ContentSourceTest
// Second demand after the first must throw.
assertThrows(IllegalStateException.class, () -> source.demand(this));
}
processed.countDown();
}
});
assertTrue(processed.get());
assertTrue(processed.await(10, TimeUnit.SECONDS));
}
@ParameterizedTest
@MethodSource("all")
public void testReadFailReadReturnsError(Content.Source source)
public void testReadFailReadReturnsError(Content.Source source) throws Exception
{
Content.Chunk chunk = source.read();
Content.Chunk chunk = nextChunk(source);
assertNotNull(chunk);
source.fail(new CancellationException());
@ -169,12 +240,7 @@ public class ContentSourceTest
@MethodSource("all")
public void testReadLastDemandInvokesDemandCallback(Content.Source source) throws Exception
{
while (true)
{
Content.Chunk chunk = source.read();
if (chunk.isLast())
break;
}
Content.Source.consumeAll(source);
CountDownLatch latch = new CountDownLatch(1);
source.demand(latch::countDown);
@ -199,9 +265,13 @@ public class ContentSourceTest
@ParameterizedTest
@MethodSource("all")
public void testDemandCallbackThrows(Content.Source source)
public void testDemandCallbackThrows(Content.Source source) throws Exception
{
Content.Chunk chunk = source.read();
// TODO fix for OSCS
// if (source instanceof OutputStreamContentSource)
// return;
Content.Chunk chunk = nextChunk(source);
assertNotNull(chunk);
source.demand(() ->

View File

@ -126,7 +126,7 @@ public class GzipRequest extends Request.WrapperProcessor
ByteBuffer decodedBuffer = _decoder.decode(_chunk);
if (BufferUtil.hasContent(decodedBuffer))
return Content.Chunk.from(decodedBuffer, _chunk.isLast() && !_chunk.hasRemaining(), () -> _decoder.release(decodedBuffer));
return Content.Chunk.from(decodedBuffer, _chunk.isLast() && !_chunk.hasRemaining(), _decoder::release);
return _chunk.isLast() ? Content.Chunk.EOF : null;
}
finally

View File

@ -16,17 +16,15 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.util.thread.Scheduler;
public class MockConnector extends AbstractConnector
{
private static final ByteBufferPool BUFFER_POOL = new NullByteBufferPool();
private final Server _server;
public MockConnector(Server server)
{
super(server, server.getThreadPool(), server.getBean(Scheduler.class), BUFFER_POOL, 0);
super(server, server.getThreadPool(), server.getBean(Scheduler.class), ByteBufferPool.NOOP, 0);
_server = server;
}

View File

@ -18,7 +18,7 @@ import java.util.Arrays;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.io.NoopByteBufferPool;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@ -45,7 +45,7 @@ public class BufferedResponseHandlerTest
public void setUp() throws Exception
{
_server = new Server();
_server.addBean(new NullByteBufferPool()); // Avoid giving larger buffers than requested
_server.addBean(new NoopByteBufferPool()); // Avoid giving larger buffers than requested
HttpConfiguration config = new HttpConfiguration();
config.setOutputBufferSize(1024);
config.setOutputAggregationSize(256);

View File

@ -29,11 +29,13 @@ package org.eclipse.jetty.util.thread;
*/
public interface Invocable
{
Runnable NOOP = () -> {};
ThreadLocal<Boolean> __nonBlocking = new ThreadLocal<>();
/**
* <p>The behavior of an {@link Invocable} when it is invoked.</p>
* <p>Typically, {@link Runnable}s or {@link Callback}s declare their
* <p>Typically, {@link Runnable}s or {@link org.eclipse.jetty.util.Callback}s declare their
* invocation type; this information is then used by the code that should
* invoke the {@code Runnable} or {@code Callback} to decide whether to
* invoke it directly, or submit it to a thread pool to be invoked by

View File

@ -21,7 +21,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.io.NoopByteBufferPool;
import org.eclipse.jetty.toolchain.test.Hex;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
@ -42,7 +42,7 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes
public BlockingQueue<ByteBuffer> binaryMessages = new LinkedBlockingDeque<>();
public BlockingQueue<String> events = new LinkedBlockingDeque<>();
private final ByteBufferPool bufferPool = new NullByteBufferPool();
private final ByteBufferPool bufferPool = new NoopByteBufferPool();
private final MethodHandle wholeTextHandle;
private final MethodHandle wholeBinaryHandle;
private MessageSink messageSink;

View File

@ -31,7 +31,7 @@ import jakarta.servlet.ServletException;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.io.NoopByteBufferPool;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
@ -71,7 +71,7 @@ public class HttpOutputTest
_server = new Server();
_contextHandler = new ContextHandler(_server, "/");
_server.addBean(new NullByteBufferPool());
_server.addBean(new NoopByteBufferPool());
HttpConnectionFactory http = new HttpConnectionFactory();
http.getHttpConfiguration().setRequestHeaderSize(1024);

View File

@ -16,14 +16,12 @@ package org.eclipse.jetty.ee9.nested;
import java.io.IOException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.Scheduler;
public class MockConnector extends AbstractConnector
{
private static final ByteBufferPool BUFFER_POOL = new NullByteBufferPool();
private final Server _server;
public MockConnector()
@ -33,7 +31,7 @@ public class MockConnector extends AbstractConnector
public MockConnector(Server server)
{
super(server, server.getThreadPool(), server.getBean(Scheduler.class), BUFFER_POOL, 0);
super(server, server.getThreadPool(), server.getBean(Scheduler.class), ByteBufferPool.NOOP, 0);
_server = server;
}

View File

@ -21,7 +21,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.io.NoopByteBufferPool;
import org.eclipse.jetty.toolchain.test.Hex;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
@ -42,7 +42,7 @@ public class OutgoingMessageCapture extends CoreSession.Empty implements CoreSes
public BlockingQueue<ByteBuffer> binaryMessages = new LinkedBlockingDeque<>();
public BlockingQueue<String> events = new LinkedBlockingDeque<>();
private final ByteBufferPool bufferPool = new NullByteBufferPool();
private final ByteBufferPool bufferPool = new NoopByteBufferPool();
private final MethodHandle wholeTextHandle;
private final MethodHandle wholeBinaryHandle;
private MessageSink messageSink;