Merge remote-tracking branch 'origin/jetty-12.0.x' into jetty-12.1.x

# Conflicts:
#	jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
#	jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ContextScopeListenerTest.java
This commit is contained in:
gregw 2024-06-25 15:19:32 +10:00
commit d818a98cba
9 changed files with 223 additions and 260 deletions

View File

@ -36,7 +36,6 @@ import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.ChunksContentSource;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
@ -476,7 +475,7 @@ public class MultiPart
public Content.Source newContentSource()
{
// TODO: use a ByteBuffer pool and direct ByteBuffers?
return new ByteChannelContentSource.PathContentSource(getPath());
return Content.Source.from(getPath());
}
@Override

View File

@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.IOResources;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.ContentSourceCompletableFuture;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.thread.AutoLock;
@ -164,14 +163,55 @@ public class MultiPartByteRanges
}
/**
* <p>A specialized {@link org.eclipse.jetty.io.content.ByteChannelContentSource.PathContentSource}
* whose content is sliced by a byte range.</p>
* <p>A specialized {@link Content.Source}
* whose {@link Path} content is sliced by a byte range.</p>
*
* @deprecated use {@link Content.Source#from(ByteBufferPool.Sized, Path, long, long)}
*/
public static class PathContentSource extends ByteChannelContentSource.PathContentSource
@Deprecated(forRemoval = true, since = "12.0.11")
public static class PathContentSource implements Content.Source
{
private final Content.Source contentSource;
public PathContentSource(Path path, ByteRange byteRange)
{
super(new ByteBufferPool.Sized(null), path, byteRange.first(), byteRange.getLength());
contentSource = Content.Source.from(null, path, byteRange.first(), byteRange.getLength());
}
@Override
public void demand(Runnable demandCallback)
{
contentSource.demand(demandCallback);
}
@Override
public void fail(Throwable failure)
{
contentSource.fail(failure);
}
@Override
public void fail(Throwable failure, boolean last)
{
contentSource.fail(failure, last);
}
@Override
public long getLength()
{
return contentSource.getLength();
}
@Override
public Content.Chunk read()
{
return contentSource.read();
}
@Override
public boolean rewind()
{
return contentSource.rewind();
}
}

View File

@ -47,6 +47,7 @@ import org.eclipse.jetty.util.BufferUtil;
public interface ByteBufferPool
{
ByteBufferPool NON_POOLING = new NonPooling();
ByteBufferPool.Sized SIZED_NON_POOLING = new Sized(ByteBufferPool.NON_POOLING);
/**
* <p>Acquires a {@link RetainableByteBuffer} from this pool.</p>

View File

@ -21,19 +21,24 @@ import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.ByteChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import org.eclipse.jetty.io.content.BufferedContentSink;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ContentSinkOutputStream;
import org.eclipse.jetty.io.content.ContentSinkSubscriber;
import org.eclipse.jetty.io.content.ContentSourceInputStream;
import org.eclipse.jetty.io.content.ContentSourcePublisher;
import org.eclipse.jetty.io.content.InputStreamContentSource;
import org.eclipse.jetty.io.internal.ByteBufferChunk;
import org.eclipse.jetty.io.internal.ByteChannelContentSource;
import org.eclipse.jetty.io.internal.ContentCopier;
import org.eclipse.jetty.io.internal.ContentSourceByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceConsumer;
@ -156,6 +161,137 @@ public class Content
*/
public interface Source
{
/**
* Create a {@code Content.Source} from zero or more {@link ByteBuffer}s
* @param byteBuffers The {@link ByteBuffer}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBuffer... byteBuffers)
{
return new ByteBufferContentSource(byteBuffers);
}
/**
* Create a {@code Content.Source} from a {@link Path}.
* @param path The {@link Path}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(Path path)
{
return from(null, path, 0, -1);
}
/**
* Create a {@code Content.Source} from a {@link Path}.
* @param path The {@link Path}s to use as the source.
* @param offset The offset in bytes from which to start the source
* @param length The length in bytes of the source.
* @return A {@code Content.Source}
*/
static Content.Source from(Path path, long offset, long length)
{
return from(null, path, offset, length);
}
/**
* Create a {@code Content.Source} from a {@link Path}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param path The {@link Path}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, Path path)
{
return from(byteBufferPool, path, 0, -1);
}
/**
* Create a {@code Content.Source} from a {@link Path}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param path The {@link Path}s to use as the source.
* @param offset The offset in bytes from which to start the source
* @param length The length in bytes of the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, Path path, long offset, long length)
{
return new ByteChannelContentSource.PathContentSource(byteBufferPool, path, offset, length);
}
/**
* Create a {@code Content.Source} from a {@link ByteChannel}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param byteChannel The {@link ByteChannel}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel)
{
return new ByteChannelContentSource(byteBufferPool, byteChannel);
}
/**
* Create a {@code Content.Source} from a {@link ByteChannel}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param seekableByteChannel The {@link ByteChannel}s to use as the source.
* @param offset The offset in bytes from which to start the source
* @param length The length in bytes of the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, SeekableByteChannel seekableByteChannel, long offset, long length)
{
return new ByteChannelContentSource(byteBufferPool, seekableByteChannel, offset, length);
}
static Content.Source from(InputStream inputStream)
{
return from(null, inputStream);
}
/**
* Create a {@code Content.Source} from a {@link Path}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param inputStream The {@link InputStream}s to use as the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, InputStream inputStream)
{
return new InputStreamContentSource(inputStream, byteBufferPool);
}
/**
* Create a {@code Content.Source} from a {@link Path}.
* @param byteBufferPool The {@link org.eclipse.jetty.io.ByteBufferPool.Sized} to use for any internal buffers.
* @param inputStream The {@link InputStream}s to use as the source.
* @param offset The offset in bytes from which to start the source
* @param length The length in bytes of the source.
* @return A {@code Content.Source}
*/
static Content.Source from(ByteBufferPool.Sized byteBufferPool, InputStream inputStream, long offset, long length)
{
return new InputStreamContentSource(inputStream, byteBufferPool)
{
private long skip = offset;
private long toRead = length;
@Override
protected int fillBufferFromInputStream(InputStream inputStream, byte[] buffer) throws IOException
{
if (skip > 0)
{
inputStream.skipNBytes(skip);
skip = 0;
}
if (toRead == 0)
return -1;
int toReadInt = (int)Math.min(Integer.MAX_VALUE, toRead);
int len = toReadInt > -1 ? Math.min(toReadInt, buffer.length) : buffer.length;
int read = inputStream.read(buffer, 0, len);
toRead -= read;
return read;
}
};
}
/**
* <p>Reads, non-blocking, the whole content source into a {@link ByteBuffer}.</p>
*

View File

@ -21,7 +21,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.InputStreamContentSource;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -148,7 +147,7 @@ public class IOResources
Path path = resource.getPath();
if (path != null)
{
return new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path);
return Content.Source.from(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path, 0, -1);
}
if (resource instanceof MemoryResource memoryResource)
{
@ -192,12 +191,12 @@ public class IOResources
Path path = resource.getPath();
if (path != null)
{
return new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path, first, length);
return Content.Source.from(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path, first, length);
}
// Try an optimization for MemoryResource.
if (resource instanceof MemoryResource memoryResource)
return new ByteBufferContentSource(ByteBuffer.wrap(memoryResource.getBytes()));
return Content.Source.from(ByteBuffer.wrap(memoryResource.getBytes()));
// Fallback to InputStream.
try
@ -205,7 +204,7 @@ public class IOResources
InputStream inputStream = resource.newInputStream();
if (inputStream == null)
throw new IllegalArgumentException("Resource does not support InputStream: " + resource);
return new RangedInputStreamContentSource(inputStream, new ByteBufferPool.Sized(bufferPool, direct, bufferSize), first, length);
return Content.Source.from(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), inputStream, first, length);
}
catch (IOException e)
{
@ -422,33 +421,4 @@ public class IOResources
super.onCompleteFailure(x);
}
}
/**
* <p>A specialized {@link InputStreamContentSource}
* whose content is sliced by a byte range.</p>
*/
private static class RangedInputStreamContentSource extends InputStreamContentSource
{
private long toRead;
public RangedInputStreamContentSource(InputStream inputStream, ByteBufferPool bufferPool, long first, long length) throws IOException
{
super(inputStream, bufferPool);
inputStream.skipNBytes(first);
// TODO perform sanity checks on length?
this.toRead = length;
}
@Override
protected int fillBufferFromInputStream(InputStream inputStream, byte[] buffer) throws IOException
{
if (toRead == 0)
return -1;
int toReadInt = (int)Math.min(Integer.MAX_VALUE, toRead);
int len = toReadInt > -1 ? Math.min(toReadInt, buffer.length) : buffer.length;
int read = inputStream.read(buffer, 0, len);
toRead -= read;
return read;
}
}
}

View File

@ -47,7 +47,7 @@ public class InputStreamContentSource implements Content.Source
public InputStreamContentSource(InputStream inputStream)
{
this(inputStream, new ByteBufferPool.Sized(null));
this(inputStream, null);
}
public InputStreamContentSource(InputStream inputStream, ByteBufferPool bufferPool)
@ -58,7 +58,7 @@ public class InputStreamContentSource implements Content.Source
public InputStreamContentSource(InputStream inputStream, ByteBufferPool.Sized bufferPool)
{
this.inputStream = Objects.requireNonNull(inputStream);
this.bufferPool = Objects.requireNonNull(bufferPool);
this.bufferPool = Objects.requireNonNullElse(bufferPool, ByteBufferPool.SIZED_NON_POOLING);
}
public int getBufferSize()

View File

@ -13,251 +13,73 @@
package org.eclipse.jetty.io.content;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
/**
* <p>A {@link Content.Source} that provides the file content of the passed {@link Path}.</p>
*/
public class PathContentSource implements Content.Source
{
// TODO in 12.1.x reimplement this class based on ByteChannelContentSource
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final Path path;
private final long length;
private final ByteBufferPool byteBufferPool;
private int bufferSize;
private boolean useDirectByteBuffers;
private SeekableByteChannel channel;
private long totalRead;
private Runnable demandCallback;
private Content.Chunk errorChunk;
private final Path _path;
private final Content.Source _source;
public PathContentSource(Path path)
{
this(path, null, true, -1);
this(path, null);
}
public PathContentSource(Path path, ByteBufferPool byteBufferPool)
{
this(path,
byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.getWrapped() : byteBufferPool,
byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.isDirect() : true,
byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.getSize() : -1);
this (path, byteBufferPool instanceof ByteBufferPool.Sized sized ? sized : new ByteBufferPool.Sized(byteBufferPool));
}
public PathContentSource(Path path, ByteBufferPool.Sized sizedBufferPool)
{
this(path,
sizedBufferPool == null ? null : sizedBufferPool.getWrapped(),
sizedBufferPool == null ? true : sizedBufferPool.isDirect(),
sizedBufferPool == null ? -1 : sizedBufferPool.getSize());
}
private PathContentSource(Path path, ByteBufferPool byteBufferPool, boolean direct, int bufferSize)
{
try
{
if (!Files.isRegularFile(path))
throw new NoSuchFileException(path.toString());
if (!Files.isReadable(path))
throw new AccessDeniedException(path.toString());
this.path = path;
this.length = Files.size(path);
this.byteBufferPool = byteBufferPool != null ? byteBufferPool : ByteBufferPool.NON_POOLING;
this.useDirectByteBuffers = direct;
this.bufferSize = bufferSize > 0 ? bufferSize : 4096;
}
catch (IOException x)
{
throw new UncheckedIOException(x);
}
_path = path;
_source = Content.Source.from(sizedBufferPool, path);
}
public Path getPath()
{
return path;
}
@Override
public long getLength()
{
return length;
}
public int getBufferSize()
{
return bufferSize;
}
/**
* @param bufferSize The size of the buffer
* @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)}
*/
@Deprecated(forRemoval = true)
public void setBufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
}
public boolean isUseDirectByteBuffers()
{
return useDirectByteBuffers;
}
/**
* @param useDirectByteBuffers {@code true} if direct buffers should be used
* @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)}
*/
@Deprecated(forRemoval = true)
public void setUseDirectByteBuffers(boolean useDirectByteBuffers)
{
this.useDirectByteBuffers = useDirectByteBuffers;
}
@Override
public Content.Chunk read()
{
SeekableByteChannel channel;
try (AutoLock ignored = lock.lock())
{
if (errorChunk != null)
return errorChunk;
if (this.channel == null)
{
try
{
this.channel = open();
}
catch (Throwable x)
{
return failure(x);
}
}
channel = this.channel;
}
if (!channel.isOpen())
return Content.Chunk.EOF;
RetainableByteBuffer retainableByteBuffer = byteBufferPool.acquire(getBufferSize(), isUseDirectByteBuffers());
ByteBuffer byteBuffer = retainableByteBuffer.getByteBuffer();
int read;
try
{
BufferUtil.clearToFill(byteBuffer);
read = read(channel, byteBuffer);
BufferUtil.flipToFlush(byteBuffer, 0);
}
catch (Throwable x)
{
retainableByteBuffer.release();
return failure(x);
}
if (read > 0)
totalRead += read;
boolean last = read == -1 || isReadComplete(totalRead);
if (last)
IO.close(channel);
return Content.Chunk.asChunk(byteBuffer, last, retainableByteBuffer);
}
protected SeekableByteChannel open() throws IOException
{
return Files.newByteChannel(path, StandardOpenOption.READ);
}
protected int read(SeekableByteChannel channel, ByteBuffer byteBuffer) throws IOException
{
return channel.read(byteBuffer);
}
protected boolean isReadComplete(long read)
{
return read == getLength();
return _path;
}
@Override
public void demand(Runnable demandCallback)
{
try (AutoLock ignored = lock.lock())
{
if (this.demandCallback != null)
throw new IllegalStateException("demand pending");
this.demandCallback = demandCallback;
}
invoker.run(this::invokeDemandCallback);
}
private void invokeDemandCallback()
{
Runnable demandCallback;
try (AutoLock ignored = lock.lock())
{
demandCallback = this.demandCallback;
this.demandCallback = null;
}
if (demandCallback != null)
ExceptionUtil.run(demandCallback, this::fail);
_source.demand(demandCallback);
}
@Override
public void fail(Throwable failure)
{
failure(failure);
_source.fail(failure);
}
private Content.Chunk failure(Throwable failure)
@Override
public void fail(Throwable failure, boolean last)
{
try (AutoLock ignored = lock.lock())
{
if (errorChunk == null)
{
errorChunk = Content.Chunk.from(failure);
IO.close(channel);
}
return errorChunk;
}
// Demands are always serviced immediately so there is no
// need to ask the invoker to run invokeDemandCallback here.
_source.fail(failure, last);
}
@Override
public long getLength()
{
return _source.getLength();
}
@Override
public Content.Chunk read()
{
return _source.read();
}
@Override
public boolean rewind()
{
try (AutoLock ignored = lock.lock())
{
IO.close(channel);
channel = null;
totalRead = 0;
demandCallback = null;
errorChunk = null;
}
return true;
return _source.rewind();
}
}

View File

@ -11,7 +11,7 @@
// ========================================================================
//
package org.eclipse.jetty.io.content;
package org.eclipse.jetty.io.internal;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -51,7 +51,7 @@ public class ByteChannelContentSource implements Content.Source
public ByteChannelContentSource(SeekableByteChannel seekableByteChannel, long offset, long length)
{
this(new ByteBufferPool.Sized(null), seekableByteChannel, offset, length);
this(null, seekableByteChannel, offset, length);
}
public ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, SeekableByteChannel seekableByteChannel, long offset, long length)
@ -73,7 +73,7 @@ public class ByteChannelContentSource implements Content.Source
public ByteChannelContentSource(ByteChannel byteChannel)
{
this(new ByteBufferPool.Sized(null), byteChannel, -1L, -1L);
this(null, byteChannel, -1L, -1L);
}
public ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel)
@ -83,7 +83,7 @@ public class ByteChannelContentSource implements Content.Source
private ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel, long offset, long length)
{
_byteBufferPool = Objects.requireNonNull(byteBufferPool);
_byteBufferPool = Objects.requireNonNullElse(byteBufferPool, ByteBufferPool.SIZED_NON_POOLING);
_byteChannel = byteChannel;
_offset = offset < 0 ? 0 : offset;
_length = length;
@ -246,28 +246,24 @@ public class ByteChannelContentSource implements Content.Source
/**
* A {@link ByteChannelContentSource} for a {@link Path}
* @deprecated To be replaced by an updated {@link org.eclipse.jetty.io.content.PathContentSource} in 12.1.0
*/
@Deprecated(forRemoval = true, since = "12.0.11")
public static class PathContentSource extends ByteChannelContentSource
{
private final Path _path;
public PathContentSource(Path path)
{
super(new ByteBufferPool.Sized(null), null, 0, size(path));
_path = path;
this(null, path, 0, -1);
}
public PathContentSource(ByteBufferPool.Sized byteBufferPool, Path path)
{
super(byteBufferPool, null, 0, size(path));
_path = path;
this(byteBufferPool, path, 0, -1);
}
public PathContentSource(ByteBufferPool.Sized byteBufferPool, Path path, long offset, long length)
{
super(byteBufferPool, null, offset, length);
super(byteBufferPool, null, offset, length < 0 ? size(path) : length);
_path = path;
}

View File

@ -38,11 +38,11 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.content.AsyncContent;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.ContentSourceInputStream;
import org.eclipse.jetty.io.content.ContentSourceTransformer;
import org.eclipse.jetty.io.content.InputStreamContentSource;
import org.eclipse.jetty.io.content.PathContentSource;
import org.eclipse.jetty.io.internal.ByteChannelContentSource;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -141,8 +141,7 @@ public class ContentSourceTest
Files.writeString(path0123, "zeroonetwothree", StandardOpenOption.CREATE, StandardOpenOption.WRITE);
PathContentSource path0 = new PathContentSource(path12, byteBufferPool);
PathContentSource path1 = new PathContentSource(path12, byteBufferPool);
path1.setBufferSize(3);
PathContentSource path1 = new PathContentSource(path12, new ByteBufferPool.Sized(byteBufferPool, false, 3));
InputStreamContentSource inputSource = new InputStreamContentSource(new ByteArrayInputStream("onetwo".getBytes(UTF_8)));
InputStreamContentSource inputSource2 =