Merge remote-tracking branch 'origin/jetty-12.0.x' into jetty-12.1.x

This commit is contained in:
gregw 2024-06-25 09:32:38 +10:00
commit 4228f8e76b
17 changed files with 818 additions and 227 deletions

View File

@ -47,13 +47,17 @@ public class PathRequestContent extends PathContentSource implements Request.Con
public PathRequestContent(String contentType, Path filePath, int bufferSize) throws IOException
{
this(contentType, filePath, null);
setBufferSize(bufferSize);
this(contentType, filePath, new ByteBufferPool.Sized(null, false, bufferSize));
}
public PathRequestContent(String contentType, Path filePath, ByteBufferPool bufferPool) throws IOException
{
super(filePath, bufferPool);
this(contentType, filePath, bufferPool instanceof ByteBufferPool.Sized sized ? sized : new ByteBufferPool.Sized(bufferPool));
}
public PathRequestContent(String contentType, Path filePath, ByteBufferPool.Sized sizedBufferPool)
{
super(filePath, sizedBufferPool);
this.contentType = contentType;
}

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.MultiPart;
import org.eclipse.jetty.http.MultiPartFormData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
@ -285,8 +286,7 @@ public class MultiPartRequestContentTest extends AbstractHttpClientServerTest
});
MultiPartRequestContent multiPart = new MultiPartRequestContent();
PathRequestContent content = new PathRequestContent(contentType, tmpPath, client.getByteBufferPool());
content.setUseDirectByteBuffers(client.isUseOutputDirectByteBuffers());
PathRequestContent content = new PathRequestContent(contentType, tmpPath, new ByteBufferPool.Sized(client.getByteBufferPool(), client.isUseOutputDirectByteBuffers(), -1));
multiPart.addPart(new MultiPart.ContentSourcePart(name, tmpPath.getFileName().toString(), null, content));
multiPart.close();
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())

View File

@ -36,8 +36,8 @@ import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.ChunksContentSource;
import org.eclipse.jetty.io.content.PathContentSource;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.QuotedStringTokenizer;
@ -306,7 +306,7 @@ public class MultiPart
Path path = getPath();
if (path != null)
{
Files.delete(path);
Files.deleteIfExists(path);
try (AutoLock ignored = lock.lock())
{
this.path = null;
@ -338,7 +338,7 @@ public class MultiPart
if (source != null)
source.fail(t);
if (path != null)
Files.delete(path);
Files.deleteIfExists(path);
}
catch (Throwable x)
{
@ -475,7 +475,8 @@ public class MultiPart
@Override
public Content.Source newContentSource()
{
return new PathContentSource(getPath());
// TODO: use a ByteBuffer pool and direct ByteBuffers?
return new ByteChannelContentSource.PathContentSource(getPath());
}
@Override

View File

@ -15,8 +15,6 @@ package org.eclipse.jetty.http;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
@ -26,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.IOResources;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.ContentSourceCompletableFuture;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.thread.AutoLock;
@ -165,46 +164,14 @@ public class MultiPartByteRanges
}
/**
* <p>A specialized {@link org.eclipse.jetty.io.content.PathContentSource}
* <p>A specialized {@link org.eclipse.jetty.io.content.ByteChannelContentSource.PathContentSource}
* whose content is sliced by a byte range.</p>
*/
public static class PathContentSource extends org.eclipse.jetty.io.content.PathContentSource
public static class PathContentSource extends ByteChannelContentSource.PathContentSource
{
private final ByteRange byteRange;
private long toRead;
public PathContentSource(Path path, ByteRange byteRange)
{
super(path);
this.byteRange = byteRange;
}
@Override
protected SeekableByteChannel open() throws IOException
{
SeekableByteChannel channel = super.open();
channel.position(byteRange.first());
toRead = byteRange.getLength();
return channel;
}
@Override
protected int read(SeekableByteChannel channel, ByteBuffer byteBuffer) throws IOException
{
int read = super.read(channel, byteBuffer);
if (read <= 0)
return read;
read = (int)Math.min(read, toRead);
toRead -= read;
byteBuffer.position(read);
return read;
}
@Override
protected boolean isReadComplete(long read)
{
return read == byteRange.getLength();
super(new ByteBufferPool.Sized(null), path, byteRange.first(), byteRange.getLength());
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.eclipse.jetty.util.BufferUtil;
@ -92,6 +93,56 @@ public interface ByteBufferPool
}
}
/**
* A ByteBufferPool with an additional no-args {@link #acquire()} method to obtain a buffer of a
* preconfigured specific size and type.
*/
class Sized extends Wrapper
{
private final boolean _direct;
private final int _size;
/**
* Create a sized pool for non direct buffers of a default size from a wrapped pool.
* @param wrapped The actual {@link ByteBufferPool}
*/
public Sized(ByteBufferPool wrapped)
{
this(wrapped, false, -1);
}
/**
* Create a sized pool for a give directness and size from a wrapped pool.
* @param wrapped The actual {@link ByteBufferPool}
* @param direct {@code true} for direct buffers.
* @param size The specified size in bytes of the buffer, or -1 for a default
*/
public Sized(ByteBufferPool wrapped, boolean direct, int size)
{
super(Objects.requireNonNullElse(wrapped, NON_POOLING));
_direct = direct;
_size = size > 0 ? size : 4096;
}
public boolean isDirect()
{
return _direct;
}
public int getSize()
{
return _size;
}
/**
* @return A {@link RetainableByteBuffer} suitable for the specified preconfigured size and type.
*/
public RetainableByteBuffer acquire()
{
return getWrapped().acquire(_size, _direct);
}
}
/**
* <p>A {@link ByteBufferPool} that does not pool its
* {@link RetainableByteBuffer}s.</p>

View File

@ -21,8 +21,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.InputStreamContentSource;
import org.eclipse.jetty.io.content.PathContentSource;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
@ -148,13 +148,7 @@ public class IOResources
Path path = resource.getPath();
if (path != null)
{
PathContentSource pathContentSource = new PathContentSource(path, bufferPool);
if (bufferSize > 0)
{
pathContentSource.setBufferSize(bufferSize);
pathContentSource.setUseDirectByteBuffers(direct);
}
return pathContentSource;
return new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path);
}
if (resource instanceof MemoryResource memoryResource)
{
@ -198,13 +192,7 @@ public class IOResources
Path path = resource.getPath();
if (path != null)
{
RangedPathContentSource contentSource = new RangedPathContentSource(path, bufferPool, first, length);
if (bufferSize > 0)
{
contentSource.setBufferSize(bufferSize);
contentSource.setUseDirectByteBuffers(direct);
}
return contentSource;
return new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(bufferPool, direct, bufferSize), path, first, length);
}
// Try an optimization for MemoryResource.
@ -217,13 +205,7 @@ public class IOResources
InputStream inputStream = resource.newInputStream();
if (inputStream == null)
throw new IllegalArgumentException("Resource does not support InputStream: " + resource);
RangedInputStreamContentSource contentSource = new RangedInputStreamContentSource(inputStream, bufferPool, first, length);
if (bufferSize > 0)
{
contentSource.setBufferSize(bufferSize);
contentSource.setUseDirectByteBuffers(direct);
}
return contentSource;
return new RangedInputStreamContentSource(inputStream, new ByteBufferPool.Sized(bufferPool, direct, bufferSize), first, length);
}
catch (IOException e)
{
@ -441,57 +423,6 @@ public class IOResources
}
}
/**
* <p>A specialized {@link PathContentSource}
* whose content is sliced by a byte range.</p>
*/
private static class RangedPathContentSource extends PathContentSource
{
private final long first;
private final long length;
private long toRead;
public RangedPathContentSource(Path path, ByteBufferPool bufferPool, long first, long length)
{
super(path, bufferPool);
// TODO perform sanity checks on first and length?
this.first = first;
this.length = length;
}
@Override
protected SeekableByteChannel open() throws IOException
{
SeekableByteChannel channel = super.open();
if (first > -1)
channel.position(first);
toRead = length;
return channel;
}
@Override
protected int read(SeekableByteChannel channel, ByteBuffer byteBuffer) throws IOException
{
int read = super.read(channel, byteBuffer);
if (read <= 0)
return read;
read = (int)Math.min(read, toRead);
if (read > -1)
{
toRead -= read;
byteBuffer.position(read);
}
return read;
}
@Override
protected boolean isReadComplete(long read)
{
return read == length;
}
}
/**
* <p>A specialized {@link InputStreamContentSource}
* whose content is sliced by a byte range.</p>

View File

@ -0,0 +1,297 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io.content;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
/**
* <p>A {@link Content.Source} backed by a {@link ByteChannel}.
* Any calls to {@link #demand(Runnable)} are immediately satisfied.</p>
*/
public class ByteChannelContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker _invoker = new SerializedInvoker();
private final ByteBufferPool.Sized _byteBufferPool;
private ByteChannel _byteChannel;
private final long _offset;
private final long _length;
private RetainableByteBuffer _buffer;
private long _totalRead;
private Runnable demandCallback;
private Content.Chunk _terminal;
public ByteChannelContentSource(SeekableByteChannel seekableByteChannel, long offset, long length)
{
this(new ByteBufferPool.Sized(null), seekableByteChannel, offset, length);
}
public ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, SeekableByteChannel seekableByteChannel, long offset, long length)
{
this(byteBufferPool, (ByteChannel)seekableByteChannel, offset, length);
if (offset >= 0 && seekableByteChannel != null)
{
try
{
seekableByteChannel.position(offset);
}
catch (IOException e)
{
// lock not needed in constructor
lockedSetTerminal(Content.Chunk.from(e, true));
}
}
}
public ByteChannelContentSource(ByteChannel byteChannel)
{
this(new ByteBufferPool.Sized(null), byteChannel, -1L, -1L);
}
public ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel)
{
this(byteBufferPool, byteChannel, -1L, -1L);
}
private ByteChannelContentSource(ByteBufferPool.Sized byteBufferPool, ByteChannel byteChannel, long offset, long length)
{
_byteBufferPool = Objects.requireNonNull(byteBufferPool);
_byteChannel = byteChannel;
_offset = offset < 0 ? 0 : offset;
_length = length;
}
protected ByteChannel open() throws IOException
{
return _byteChannel;
}
@Override
public void demand(Runnable demandCallback)
{
try (AutoLock ignored = lock.lock())
{
if (this.demandCallback != null)
throw new IllegalStateException("demand pending");
this.demandCallback = demandCallback;
}
_invoker.run(this::invokeDemandCallback);
}
private void invokeDemandCallback()
{
Runnable demandCallback;
try (AutoLock ignored = lock.lock())
{
demandCallback = this.demandCallback;
this.demandCallback = null;
}
if (demandCallback != null)
ExceptionUtil.run(demandCallback, this::fail);
}
protected void lockedSetTerminal(Content.Chunk terminal)
{
if (_terminal == null)
_terminal = Objects.requireNonNull(terminal);
else
ExceptionUtil.addSuppressedIfNotAssociated(_terminal.getFailure(), terminal.getFailure());
IO.close(_byteChannel);
if (_buffer != null)
_buffer.release();
_buffer = null;
}
private void lockedEnsureOpenOrTerminal()
{
if (_terminal == null && (_byteChannel == null || !_byteChannel.isOpen()))
{
try
{
_byteChannel = open();
if (_byteChannel == null || !_byteChannel.isOpen())
lockedSetTerminal(Content.Chunk.from(new ClosedChannelException(), true));
else if (_offset >= 0 && _byteChannel instanceof SeekableByteChannel seekableByteChannel)
seekableByteChannel.position(_offset);
}
catch (IOException e)
{
lockedSetTerminal(Content.Chunk.from(e, true));
}
}
}
@Override
public Content.Chunk read()
{
try (AutoLock ignored = lock.lock())
{
lockedEnsureOpenOrTerminal();
if (_terminal != null)
return _terminal;
if (_buffer == null)
{
_buffer = _byteBufferPool.acquire();
}
else if (_buffer.isRetained())
{
_buffer.release();
_buffer = _byteBufferPool.acquire();
}
try
{
ByteBuffer byteBuffer = _buffer.getByteBuffer();
BufferUtil.clearToFill(byteBuffer);
if (_length >= 0)
byteBuffer.limit((int)Math.min(_buffer.capacity(), _length - _totalRead));
int read = _byteChannel.read(byteBuffer);
BufferUtil.flipToFlush(byteBuffer, 0);
if (read == 0)
return null;
if (read > 0)
{
_totalRead += read;
_buffer.retain();
if (_length < 0 || _totalRead < _length)
return Content.Chunk.asChunk(byteBuffer, false, _buffer);
Content.Chunk last = Content.Chunk.asChunk(byteBuffer, true, _buffer);
lockedSetTerminal(Content.Chunk.EOF);
return last;
}
lockedSetTerminal(Content.Chunk.EOF);
}
catch (Throwable t)
{
lockedSetTerminal(Content.Chunk.from(t, true));
}
}
return _terminal;
}
@Override
public void fail(Throwable failure)
{
try (AutoLock ignored = lock.lock())
{
lockedSetTerminal(Content.Chunk.from(failure, true));
}
}
@Override
public long getLength()
{
return _length;
}
@Override
public boolean rewind()
{
try (AutoLock ignored = lock.lock())
{
// We can remove terminal condition for a rewind that is likely to occur
if (_terminal != null && !Content.Chunk.isFailure(_terminal) && (_byteChannel == null || _byteChannel instanceof SeekableByteChannel))
_terminal = null;
lockedEnsureOpenOrTerminal();
if (_terminal != null || _byteChannel == null || !_byteChannel.isOpen())
return false;
if (_offset >= 0 && _byteChannel instanceof SeekableByteChannel seekableByteChannel)
{
try
{
seekableByteChannel.position(_offset);
_totalRead = 0;
return true;
}
catch (Throwable t)
{
lockedSetTerminal(Content.Chunk.from(t, true));
}
}
return false;
}
}
/**
* A {@link ByteChannelContentSource} for a {@link Path}
* @deprecated To be replaced by an updated {@link org.eclipse.jetty.io.content.PathContentSource} in 12.1.0
*/
@Deprecated(forRemoval = true, since = "12.0.11")
public static class PathContentSource extends ByteChannelContentSource
{
private final Path _path;
public PathContentSource(Path path)
{
super(new ByteBufferPool.Sized(null), null, 0, size(path));
_path = path;
}
public PathContentSource(ByteBufferPool.Sized byteBufferPool, Path path)
{
super(byteBufferPool, null, 0, size(path));
_path = path;
}
public PathContentSource(ByteBufferPool.Sized byteBufferPool, Path path, long offset, long length)
{
super(byteBufferPool, null, offset, length);
_path = path;
}
public Path getPath()
{
return _path;
}
@Override
protected ByteChannel open() throws IOException
{
return Files.newByteChannel(_path, StandardOpenOption.READ);
}
private static long size(Path path)
{
try
{
return Files.size(path);
}
catch (IOException e)
{
return -1L;
}
}
}
}

View File

@ -20,6 +20,7 @@ import java.util.List;
import java.util.Objects;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
@ -111,19 +112,7 @@ public class ChunksContentSource implements Content.Source
this.demandCallback = null;
}
if (demandCallback != null)
runDemandCallback(demandCallback);
}
private void runDemandCallback(Runnable demandCallback)
{
try
{
demandCallback.run();
}
catch (Throwable x)
{
fail(x);
}
ExceptionUtil.run(demandCallback, this::fail);
}
@Override

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.io.content;
import java.util.Objects;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.thread.SerializedInvoker;
/**
@ -114,19 +115,7 @@ public abstract class ContentSourceTransformer implements Content.Source
Runnable demandCallback = this.demandCallback;
this.demandCallback = null;
if (demandCallback != null)
runDemandCallback(demandCallback);
}
private void runDemandCallback(Runnable demandCallback)
{
try
{
demandCallback.run();
}
catch (Throwable x)
{
fail(x);
}
ExceptionUtil.run(demandCallback, this::fail);
}
private Content.Chunk process(Content.Chunk rawChunk)

View File

@ -21,6 +21,7 @@ import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
@ -39,44 +40,65 @@ public class InputStreamContentSource implements Content.Source
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final InputStream inputStream;
private final ByteBufferPool bufferPool;
private int bufferSize = 4096;
private boolean useDirectByteBuffers;
private ByteBufferPool.Sized bufferPool;
private Runnable demandCallback;
private Content.Chunk errorChunk;
private boolean closed;
public InputStreamContentSource(InputStream inputStream)
{
this(inputStream, null);
this(inputStream, new ByteBufferPool.Sized(null));
}
public InputStreamContentSource(InputStream inputStream, ByteBufferPool bufferPool)
{
this(inputStream, bufferPool instanceof ByteBufferPool.Sized sized ? sized : new ByteBufferPool.Sized(bufferPool));
}
public InputStreamContentSource(InputStream inputStream, ByteBufferPool.Sized bufferPool)
{
this.inputStream = Objects.requireNonNull(inputStream);
this.bufferPool = bufferPool != null ? bufferPool : ByteBufferPool.NON_POOLING;
this.bufferPool = Objects.requireNonNull(bufferPool);
}
public int getBufferSize()
{
return bufferSize;
return bufferPool.getSize();
}
/**
* @param bufferSize The size of the buffer
* @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)}
*/
@Deprecated(forRemoval = true)
public void setBufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
try (AutoLock ignored = lock.lock())
{
if (bufferSize != bufferPool.getSize())
bufferPool = new ByteBufferPool.Sized(bufferPool.getWrapped(), bufferPool.isDirect(), bufferSize);
}
}
public boolean isUseDirectByteBuffers()
{
return useDirectByteBuffers;
return bufferPool.isDirect();
}
/**
* @param useDirectByteBuffers {@code true} if direct buffers will be used.
* @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)}
*/
@Deprecated(forRemoval = true, since = "12.0.11")
public void setUseDirectByteBuffers(boolean useDirectByteBuffers)
{
this.useDirectByteBuffers = useDirectByteBuffers;
try (AutoLock ignored = lock.lock())
{
if (useDirectByteBuffers != bufferPool.isDirect())
bufferPool = new ByteBufferPool.Sized(bufferPool.getWrapped(), useDirectByteBuffers, bufferPool.getSize());
}
}
@Override
public Content.Chunk read()
{
@ -88,7 +110,7 @@ public class InputStreamContentSource implements Content.Source
return Content.Chunk.EOF;
}
RetainableByteBuffer streamBuffer = bufferPool.acquire(getBufferSize(), useDirectByteBuffers);
RetainableByteBuffer streamBuffer = bufferPool.acquire();
try
{
ByteBuffer buffer = streamBuffer.getByteBuffer();
@ -147,19 +169,7 @@ public class InputStreamContentSource implements Content.Source
this.demandCallback = null;
}
if (demandCallback != null)
runDemandCallback(demandCallback);
}
private void runDemandCallback(Runnable demandCallback)
{
try
{
demandCallback.run();
}
catch (Throwable x)
{
fail(x);
}
ExceptionUtil.run(demandCallback, this::fail);
}
@Override

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.io.content;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
@ -27,6 +28,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
@ -36,13 +38,15 @@ import org.eclipse.jetty.util.thread.SerializedInvoker;
*/
public class PathContentSource implements Content.Source
{
// TODO in 12.1.x reimplement this class based on ByteChannelContentSource
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final Path path;
private final long length;
private final ByteBufferPool byteBufferPool;
private int bufferSize = 4096;
private boolean useDirectByteBuffers = true;
private int bufferSize;
private boolean useDirectByteBuffers;
private SeekableByteChannel channel;
private long totalRead;
private Runnable demandCallback;
@ -50,10 +54,26 @@ public class PathContentSource implements Content.Source
public PathContentSource(Path path)
{
this(path, null);
this(path, null, true, -1);
}
public PathContentSource(Path path, ByteBufferPool byteBufferPool)
{
this(path,
byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.getWrapped() : byteBufferPool,
byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.isDirect() : true,
byteBufferPool instanceof ByteBufferPool.Sized sized ? sized.getSize() : -1);
}
public PathContentSource(Path path, ByteBufferPool.Sized sizedBufferPool)
{
this(path,
sizedBufferPool == null ? null : sizedBufferPool.getWrapped(),
sizedBufferPool == null ? true : sizedBufferPool.isDirect(),
sizedBufferPool == null ? -1 : sizedBufferPool.getSize());
}
private PathContentSource(Path path, ByteBufferPool byteBufferPool, boolean direct, int bufferSize)
{
try
{
@ -63,7 +83,10 @@ public class PathContentSource implements Content.Source
throw new AccessDeniedException(path.toString());
this.path = path;
this.length = Files.size(path);
this.byteBufferPool = byteBufferPool != null ? byteBufferPool : ByteBufferPool.NON_POOLING;
this.useDirectByteBuffers = direct;
this.bufferSize = bufferSize > 0 ? bufferSize : 4096;
}
catch (IOException x)
{
@ -87,6 +110,11 @@ public class PathContentSource implements Content.Source
return bufferSize;
}
/**
* @param bufferSize The size of the buffer
* @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)}
*/
@Deprecated(forRemoval = true)
public void setBufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
@ -97,6 +125,11 @@ public class PathContentSource implements Content.Source
return useDirectByteBuffers;
}
/**
* @param useDirectByteBuffers {@code true} if direct buffers should be used
* @deprecated Use {@link InputStreamContentSource#InputStreamContentSource(InputStream, ByteBufferPool.Sized)}
*/
@Deprecated(forRemoval = true)
public void setUseDirectByteBuffers(boolean useDirectByteBuffers)
{
this.useDirectByteBuffers = useDirectByteBuffers;
@ -190,19 +223,7 @@ public class PathContentSource implements Content.Source
this.demandCallback = null;
}
if (demandCallback != null)
runDemandCallback(demandCallback);
}
private void runDemandCallback(Runnable demandCallback)
{
try
{
demandCallback.run();
}
catch (Throwable x)
{
fail(x);
}
ExceptionUtil.run(demandCallback, this::fail);
}
@Override

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.io.internal;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,6 +47,9 @@ public class ContentCopier extends IteratingNestedCallback
@Override
protected Action process() throws Throwable
{
if (current != null)
current.release();
if (terminated)
return Action.SUCCEEDED;
@ -53,36 +57,33 @@ public class ContentCopier extends IteratingNestedCallback
if (current == null)
{
source.demand(this::iterate);
return Action.IDLE;
source.demand(this::succeeded);
return Action.SCHEDULED;
}
if (chunkProcessor != null && chunkProcessor.process(current, this))
return Action.SCHEDULED;
terminated = current.isLast();
if (Content.Chunk.isFailure(current))
throw current.getFailure();
{
failed(current.getFailure());
return Action.SCHEDULED;
}
sink.write(current.isLast(), current.getByteBuffer(), this);
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
terminated = current.isLast();
current.release();
current = null;
super.succeeded();
}
@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable x)
{
if (current != null)
{
current.release();
current = null;
source.fail(x);
super.failed(x);
current = Content.Chunk.next(current);
}
ExceptionUtil.callAndThen(x, source::fail, super::onCompleteFailure);
}
}

View File

@ -22,6 +22,8 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CancellationException;
@ -36,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.content.AsyncContent;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ByteChannelContentSource;
import org.eclipse.jetty.io.content.ContentSourceInputStream;
import org.eclipse.jetty.io.content.ContentSourceTransformer;
import org.eclipse.jetty.io.content.InputStreamContentSource;
@ -47,6 +50,9 @@ import org.eclipse.jetty.util.CompletableTask;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -54,6 +60,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@ -67,7 +74,40 @@ import static org.junit.jupiter.api.Assertions.fail;
public class ContentSourceTest
{
private static ArrayByteBufferPool.Tracking byteBufferPool;
@BeforeEach
public void beforeEach()
{
byteBufferPool = new ArrayByteBufferPool.Tracking();
}
@AfterEach
public void afterEach()
{
if (!byteBufferPool.getLeaks().isEmpty())
byteBufferPool.dumpLeaks();
assertThat(byteBufferPool.getLeaks(), empty());
byteBufferPool.clear();
byteBufferPool = null;
}
public static List<Content.Source> all() throws Exception
{
return sources("all");
}
public static List<Content.Source> multi() throws Exception
{
return sources("multi");
}
public static List<Content.Source> rewind() throws Exception
{
return sources("rewind");
}
private static List<Content.Source> sources(String mode) throws Exception
{
AsyncContent asyncSource = new AsyncContent();
try (asyncSource)
@ -95,17 +135,61 @@ public class ContentSourceTest
Path tmpDir = MavenTestingUtils.getTargetTestingPath();
Files.createDirectories(tmpDir);
Path path = Files.createTempFile(tmpDir, ContentSourceTest.class.getSimpleName(), ".txt");
Files.writeString(path, "onetwo", StandardOpenOption.CREATE, StandardOpenOption.WRITE);
PathContentSource pathSource = new PathContentSource(path);
pathSource.setBufferSize(3);
Path path12 = Files.createTempFile(tmpDir, ContentSourceTest.class.getSimpleName(), ".txt");
Files.writeString(path12, "onetwo", StandardOpenOption.CREATE, StandardOpenOption.WRITE);
Path path0123 = Files.createTempFile(tmpDir, ContentSourceTest.class.getSimpleName(), ".txt");
Files.writeString(path0123, "zeroonetwothree", StandardOpenOption.CREATE, StandardOpenOption.WRITE);
PathContentSource path0 = new PathContentSource(path12, byteBufferPool);
PathContentSource path1 = new PathContentSource(path12, byteBufferPool);
path1.setBufferSize(3);
InputStreamContentSource inputSource = new InputStreamContentSource(new ByteArrayInputStream("onetwo".getBytes(UTF_8)));
InputStreamContentSource inputSource2 =
new InputStreamContentSource(new ContentSourceInputStream(new ByteBufferContentSource(UTF_8.encode("one"), UTF_8.encode("two"))));
return List.of(asyncSource, byteBufferSource, transformerSource, pathSource, inputSource, inputSource2);
ByteChannelContentSource bccs0 = new ByteChannelContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 1024), Files.newByteChannel(path12, StandardOpenOption.READ));
ByteChannelContentSource bccs1 = new ByteChannelContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 4096), Files.newByteChannel(path12, StandardOpenOption.READ), 0, 6);
ByteChannelContentSource bccs2 = new ByteChannelContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 8192), Files.newByteChannel(path0123, StandardOpenOption.READ), 4, 6);
ByteChannelContentSource bccs3 = new ByteChannelContentSource(new ByteBufferPool.Sized(null, false, 3), Files.newByteChannel(path0123, StandardOpenOption.READ), 4, 6);
ByteChannelContentSource.PathContentSource pcs0 = new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 1024), path12);
ByteChannelContentSource.PathContentSource pcs1 = new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(byteBufferPool, false, 1024), path0123, 4, 6);
ByteChannelContentSource.PathContentSource pcs2 = new ByteChannelContentSource.PathContentSource(new ByteBufferPool.Sized(null, false, 3), path12);
return switch (mode)
{
case "rewind" -> List.of(
byteBufferSource,
path1,
bccs3,
pcs2);
case "multi" -> List.of(
asyncSource,
byteBufferSource,
transformerSource,
path1,
inputSource,
inputSource2,
bccs3,
pcs2);
case "all" -> List.of(
asyncSource,
byteBufferSource,
transformerSource,
path0,
path1,
inputSource,
inputSource2,
bccs0,
bccs1,
bccs2,
bccs3,
pcs0,
pcs1,
pcs2);
default -> Collections.emptyList();
};
}
/**
@ -171,6 +255,104 @@ public class ContentSourceTest
assertThat(builder.toString(), is("onetwo"));
}
@ParameterizedTest
@MethodSource("rewind")
public void testReadRewindReadAll(Content.Source source) throws Exception
{
StringBuilder builder = new StringBuilder();
var task = new CompletableTask<>()
{
@Override
public void run()
{
while (true)
{
Content.Chunk chunk = source.read();
if (chunk == null)
{
source.demand(this);
break;
}
if (chunk.hasRemaining() && builder.isEmpty())
assertTrue(source.rewind());
if (chunk.hasRemaining())
builder.append(BufferUtil.toString(chunk.getByteBuffer()));
chunk.release();
if (chunk.isLast())
{
complete(null);
break;
}
}
}
};
source.demand(task);
task.get(10, TimeUnit.SECONDS);
assertThat(builder.toString(), is("oneonetwo"));
}
@ParameterizedTest
@MethodSource("rewind")
public void testReadAllRewindReadAll(Content.Source source) throws Exception
{
// A raw BCCS cannot be rewound if fully consumed, as it is not able to re-open a passed in channel
Assumptions.assumeTrue(!(source instanceof ByteChannelContentSource) || source instanceof ByteChannelContentSource.PathContentSource);
String first = Content.Source.asString(source);
assertThat(first, is("onetwo"));
source.rewind();
String second = Content.Source.asString(source);
assertThat(second, is("onetwo"));
}
@ParameterizedTest
@MethodSource("all")
public void testReadRetain(Content.Source source) throws Exception
{
List<Content.Chunk> chunks = new ArrayList<>();
var task = new CompletableTask<>()
{
@Override
public void run()
{
while (true)
{
Content.Chunk chunk = source.read();
if (chunk == null)
{
source.demand(this);
break;
}
if (chunk.hasRemaining())
chunks.add(chunk);
if (chunk.isLast())
{
complete(null);
break;
}
}
}
};
source.demand(task);
task.get(10, TimeUnit.SECONDS);
StringBuilder builder = new StringBuilder();
for (Content.Chunk chunk : chunks)
{
if (chunk.hasRemaining())
builder.append(BufferUtil.toString(chunk.getByteBuffer()));
chunk.release();
}
assertThat(builder.toString(), is("onetwo"));
}
@ParameterizedTest
@MethodSource("all")
public void testDemandReadDemandDoesNotRecurse(Content.Source source) throws Exception
@ -228,7 +410,7 @@ public class ContentSourceTest
}
@ParameterizedTest
@MethodSource("all")
@MethodSource("multi")
public void testReadFailReadReturnsError(Content.Source source) throws Exception
{
Content.Chunk chunk = nextChunk(source);
@ -242,6 +424,17 @@ public class ContentSourceTest
assertTrue(Content.Chunk.isFailure(chunk, true));
}
@ParameterizedTest
@MethodSource("all")
public void testFailReadReturnsError(Content.Source source) throws Exception
{
source.fail(new CancellationException());
// We must read the error.
Content.Chunk chunk = source.read();
assertTrue(Content.Chunk.isFailure(chunk, true));
}
@ParameterizedTest
@MethodSource("all")
public void testReadLastDemandInvokesDemandCallback(Content.Source source) throws Exception
@ -270,13 +463,9 @@ public class ContentSourceTest
}
@ParameterizedTest
@MethodSource("all")
public void testDemandCallbackThrows(Content.Source source) throws Exception
@MethodSource("multi")
public void testReadDemandCallbackThrows(Content.Source source) throws Exception
{
// TODO fix for OSCS
// if (source instanceof OutputStreamContentSource)
// return;
Content.Chunk chunk = nextChunk(source);
assertNotNull(chunk);
chunk.release();
@ -290,6 +479,19 @@ public class ContentSourceTest
assertTrue(Content.Chunk.isFailure(chunk, true));
}
@ParameterizedTest
@MethodSource("all")
public void testDemandCallbackThrows(Content.Source source) throws Exception
{
source.demand(() ->
{
throw new CancellationException();
});
Content.Chunk chunk = source.read();
assertTrue(Content.Chunk.isFailure(chunk, true));
}
@Test
public void testSimple()
{

View File

@ -16,21 +16,50 @@ package org.eclipse.jetty.io.internal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.TestSink;
import org.eclipse.jetty.io.TestSource;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.fail;
public class ContentCopierTest
{
@Test
public void testSimpleCopy() throws Exception
{
TimeoutException originalFailure = new TimeoutException("timeout");
TestSource originalSource = new TestSource(
Content.Chunk.from(BufferUtil.toBuffer("How "), false),
null,
Content.Chunk.from(BufferUtil.toBuffer("now "), false),
null,
Content.Chunk.from(BufferUtil.toBuffer("brown "), false),
Content.Chunk.from(BufferUtil.toBuffer("cow."), true)
);
Callback.Completable callback = new Callback.Completable();
TestSink resultSink = new TestSink();
ContentCopier contentCopier = new ContentCopier(originalSource, resultSink, null, callback);
contentCopier.iterate();
callback.get(5, TimeUnit.SECONDS);
StringBuilder result = new StringBuilder();
for (Content.Chunk chunk : resultSink.takeAccumulatedChunks())
result.append(BufferUtil.toString(chunk.getByteBuffer()));
assertThat(result.toString(), equalTo("How now brown cow."));
}
@Test
public void testTransientErrorsBecomeTerminalErrors() throws Exception
{

View File

@ -226,8 +226,6 @@ public class HttpClientStreamTest extends AbstractTest
@ParameterizedTest
@MethodSource("transports")
@Tag("DisableLeakTracking:client:H2")
@Tag("DisableLeakTracking:client:H2C")
public void testDownloadWithFailure(Transport transport) throws Exception
{
byte[] data = new byte[64 * 1024];

View File

@ -18,6 +18,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.eclipse.jetty.util.thread.Invocable;
@ -148,6 +149,16 @@ public class ExceptionUtil
throw as(type, throwable);
}
/** Check if two {@link Throwable}s are associated.
* @param t1 A Throwable or null
* @param t2 Another Throwable or null
* @return true iff the exceptions are not associated by being the same instance, sharing a cause or one suppressing the other.
*/
public static boolean areAssociated(Throwable t1, Throwable t2)
{
return t1 != null && t2 != null && !areNotAssociated(t1, t2);
}
/** Check if two {@link Throwable}s are associated.
* @param t1 A Throwable or null
* @param t2 Another Throwable or null
@ -256,15 +267,7 @@ public class ExceptionUtil
public void callAndCatch(Invocable.Callable task)
{
try
{
if (task != null)
task.call();
}
catch (Throwable t)
{
add(t);
}
ExceptionUtil.call(task, this::add);
}
}
@ -300,8 +303,102 @@ public class ExceptionUtil
return t1;
}
private ExceptionUtil()
public static void callAndThen(Throwable cause, Consumer<Throwable> first, Consumer<Throwable> second)
{
try
{
first.accept(cause);
}
catch (Throwable t)
{
addSuppressedIfNotAssociated(cause, t);
}
finally
{
second.accept(cause);
}
}
public static void callAndThen(Throwable cause, Consumer<Throwable> first, Runnable second)
{
try
{
first.accept(cause);
}
catch (Throwable t)
{
addSuppressedIfNotAssociated(cause, t);
}
finally
{
second.run();
}
}
public static void callAndThen(Runnable first, Runnable second)
{
try
{
first.run();
}
catch (Throwable t)
{
// ignored
}
finally
{
second.run();
}
}
/**
* Call a {@link Invocable.Callable} and handle failures
* @param callable The runnable to call
* @param failure The handling of failures
*/
public static void call(Invocable.Callable callable, Consumer<Throwable> failure)
{
try
{
callable.call();
}
catch (Throwable thrown)
{
try
{
failure.accept(thrown);
}
catch (Throwable alsoThrown)
{
ExceptionUtil.addSuppressedIfNotAssociated(alsoThrown, thrown);
ExceptionUtil.ifExceptionThrowUnchecked(alsoThrown);
}
}
}
/**
* Call a {@link Runnable} and handle failures
* @param runnable The runnable to call
* @param failure The handling of failures
*/
public static void run(Runnable runnable, Consumer<Throwable> failure)
{
try
{
runnable.run();
}
catch (Throwable thrown)
{
try
{
failure.accept(thrown);
}
catch (Throwable alsoThrown)
{
ExceptionUtil.addSuppressedIfNotAssociated(alsoThrown, thrown);
ExceptionUtil.ifExceptionThrowUnchecked(alsoThrown);
}
}
}
/**
@ -326,4 +423,8 @@ public class ExceptionUtil
throw new RuntimeException(e.getCause());
}
}
private ExceptionUtil()
{
}
}

View File

@ -200,7 +200,7 @@ public class ProcessWrapper implements AutoCloseable
String line;
while ((line = reader.readLine()) != null)
{
LOG.info(line);
LOG.debug(line);
logs.add(line);
}
}