changes from review

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2023-01-31 14:29:42 +11:00
parent 719c60d70d
commit 8133dd6c9e
6 changed files with 16 additions and 35 deletions

View File

@ -32,7 +32,6 @@ import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.io.content.ChunksContentSource;
import org.eclipse.jetty.io.content.PathContentSource;
@ -119,7 +118,7 @@ public class MultiPart
* <p>A part has an optional name, an optional fileName,
* optional headers and an optional content.</p>
*/
public abstract static class Part
public abstract static class Part implements Closeable
{
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");
@ -204,6 +203,7 @@ public class MultiPart
* <a href="https://datatracker.ietf.org/doc/html/rfc7578#section-4.6">RFC 7578, section 4.6</a>.</p>
*
* @return the content of this part as a new {@link Content.Source}
* @see #getContentSource()
*/
public abstract Content.Source newContentSource();
@ -279,6 +279,7 @@ public class MultiPart
Files.delete(this.path);
}
@Override
public void close()
{
fail(SENTINEL_CLOSE_EXCEPTION);
@ -354,15 +355,18 @@ public class MultiPart
@Override
public Content.Source newContentSource()
{
return new ChunksContentSource(content.stream().map(c ->
Content.Chunk.asChunk(c.getByteBuffer().slice(), c.isLast(), Retainable.NOOP)).toList());
List<Content.Chunk> newChunks = content.stream()
.map(chunk -> Content.Chunk.asChunk(chunk.getByteBuffer().slice(), chunk.isLast(), chunk))
.peek(Content.Chunk::retain)
.toList();
return new ChunksContentSource(newChunks);
}
@Override
public void close()
{
super.close();
content.forEach(Retainable::release);
content.forEach(Content.Chunk::release);
}
@Override

View File

@ -24,8 +24,6 @@ import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A {@link CompletableFuture} that is completed when a multipart/byteranges
@ -56,8 +54,6 @@ import org.slf4j.LoggerFactory;
*/
public class MultiPartByteRanges extends CompletableFuture<MultiPartByteRanges.Parts>
{
private static final Logger LOG = LoggerFactory.getLogger(MultiPartByteRanges.class);
private final PartsListener listener = new PartsListener();
private final MultiPart.Parser parser;

View File

@ -386,7 +386,8 @@ public class MultiPartFormData extends CompletableFuture<MultiPartFormData.Parts
}
catch (Throwable e)
{
LOG.warn("Errors deleting multipart tmp files", e);
if (LOG.isDebugEnabled())
LOG.debug("Could not close part {}", p, e);
}
}
}
@ -565,7 +566,7 @@ public class MultiPartFormData extends CompletableFuture<MultiPartFormData.Parts
}
for (MultiPart.Part part : toFail)
{
part.close();
part.fail(cause);
}
close();
delete();

View File

@ -26,26 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public interface Retainable
{
Retainable NOOP = new Retainable()
{
@Override
public boolean canRetain()
{
return true;
}
@Override
public void retain()
{
}
@Override
public boolean release()
{
return true;
}
};
/**
* <p>Returns whether this resource is referenced counted by calls to {@link #retain()}
* and {@link #release()}.</p>

View File

@ -110,10 +110,10 @@ public interface HttpStream extends Callback
{
long consumedRequestContentBytes = 0;
long maxUnconsumedRequestContentBytes = httpConfig.getMaxUnconsumedRequestContentBytes();
while (consumedRequestContentBytes < maxUnconsumedRequestContentBytes)
while (maxUnconsumedRequestContentBytes < 0 || consumedRequestContentBytes < maxUnconsumedRequestContentBytes)
{
// We can always just read again here as EOF and Error content will be persistently returned.
Content.Chunk content = stream.read();
Chunk content = stream.read();
// if we cannot read to EOF then fail the stream rather than wait for unconsumed content
if (content == null)
@ -124,7 +124,7 @@ public interface HttpStream extends Callback
content.release();
// if the input failed, then fail the stream for same reason
if (content instanceof Content.Chunk.Error error)
if (content instanceof Chunk.Error error)
return error.getCause();
if (content.isLast())

View File

@ -291,7 +291,7 @@ public class DelayedHandler extends Handler.Wrapper
{
// We must execute here as even though we have consumed all the input, we are probably
// invoked in a demand runnable that is serialized with any write callbacks that might be done in process
getRequest().getContext().execute(super::process);
getRequest().getContext().execute(this::process);
}
else
{