changes from review
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
8133dd6c9e
commit
88c6f20199
|
@ -120,7 +120,7 @@ public class MultiPart
|
|||
*/
|
||||
public abstract static class Part implements Closeable
|
||||
{
|
||||
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");
|
||||
private static final Throwable CLOSE_EXCEPTION = new StaticException("Closed");
|
||||
|
||||
private final String name;
|
||||
private final String fileName;
|
||||
|
@ -202,7 +202,7 @@ public class MultiPart
|
|||
* as specified in
|
||||
* <a href="https://datatracker.ietf.org/doc/html/rfc7578#section-4.6">RFC 7578, section 4.6</a>.</p>
|
||||
*
|
||||
* @return the content of this part as a new {@link Content.Source}
|
||||
* @return the content of this part as a new {@link Content.Source} or null if the content cannot be consumed multiple times.
|
||||
* @see #getContentSource()
|
||||
*/
|
||||
public abstract Content.Source newContentSource();
|
||||
|
@ -258,7 +258,6 @@ public class MultiPart
|
|||
*/
|
||||
public void writeTo(Path path) throws IOException
|
||||
{
|
||||
this.temporary = false;
|
||||
if (this.path == null)
|
||||
{
|
||||
try (OutputStream out = Files.newOutputStream(path))
|
||||
|
@ -266,10 +265,12 @@ public class MultiPart
|
|||
IO.copy(Content.Source.asInputStream(newContentSource()), out);
|
||||
}
|
||||
this.path = path;
|
||||
this.temporary = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
this.path = Files.move(this.path, path, StandardCopyOption.REPLACE_EXISTING);
|
||||
this.temporary = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -282,7 +283,7 @@ public class MultiPart
|
|||
@Override
|
||||
public void close()
|
||||
{
|
||||
fail(SENTINEL_CLOSE_EXCEPTION);
|
||||
fail(CLOSE_EXCEPTION);
|
||||
}
|
||||
|
||||
public void fail(Throwable t)
|
||||
|
@ -349,6 +350,7 @@ public class MultiPart
|
|||
public ChunksPart(String name, String fileName, HttpFields fields, List<Content.Chunk> content)
|
||||
{
|
||||
super(name, fileName, fields);
|
||||
content.forEach(Content.Chunk::retain);
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
|
@ -356,8 +358,7 @@ public class MultiPart
|
|||
public Content.Source newContentSource()
|
||||
{
|
||||
List<Content.Chunk> newChunks = content.stream()
|
||||
.map(chunk -> Content.Chunk.asChunk(chunk.getByteBuffer().slice(), chunk.isLast(), chunk))
|
||||
.peek(Content.Chunk::retain)
|
||||
.map(chunk -> Content.Chunk.from(chunk.getByteBuffer().slice(), chunk.isLast()))
|
||||
.toList();
|
||||
return new ChunksContentSource(newChunks);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.Retainable;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
|
||||
/**
|
||||
|
@ -294,6 +295,7 @@ public class MultiPartByteRanges extends CompletableFuture<MultiPartByteRanges.P
|
|||
public void onPart(String name, String fileName, HttpFields headers)
|
||||
{
|
||||
parts.add(new MultiPart.ChunksPart(name, fileName, headers, List.copyOf(partChunks)));
|
||||
partChunks.forEach(Content.Chunk::release);
|
||||
partChunks.clear();
|
||||
}
|
||||
|
||||
|
@ -313,16 +315,20 @@ public class MultiPartByteRanges extends CompletableFuture<MultiPartByteRanges.P
|
|||
|
||||
private void fail(Throwable cause)
|
||||
{
|
||||
List<MultiPart.Part> toFail;
|
||||
List<MultiPart.Part> partsToFail;
|
||||
List<Content.Chunk> partChunksToFail;
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
if (failure != null)
|
||||
return;
|
||||
failure = cause;
|
||||
toFail = new ArrayList<>(parts);
|
||||
partsToFail = new ArrayList<>(parts);
|
||||
parts.clear();
|
||||
partChunksToFail = new ArrayList<>(partChunks);
|
||||
partChunks.clear();
|
||||
}
|
||||
toFail.forEach(MultiPart.Part::close);
|
||||
partsToFail.forEach(p -> p.fail(cause));
|
||||
partChunksToFail.forEach(Retainable::release);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import java.util.Objects;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.Retainable;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.QuotedStringTokenizer;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -380,15 +382,7 @@ public class MultiPartFormData extends CompletableFuture<MultiPartFormData.Parts
|
|||
{
|
||||
for (MultiPart.Part p : parts)
|
||||
{
|
||||
try
|
||||
{
|
||||
p.close();
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Could not close part {}", p, e);
|
||||
}
|
||||
IO.close(p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -523,6 +517,7 @@ public class MultiPartFormData extends CompletableFuture<MultiPartFormData.Parts
|
|||
memoryFileSize = 0;
|
||||
filePath = null;
|
||||
fileChannel = null;
|
||||
partChunks.forEach(Content.Chunk::release);
|
||||
partChunks.clear();
|
||||
// Store the new part.
|
||||
try (AutoLock ignored = lock.lock())
|
||||
|
@ -555,19 +550,20 @@ public class MultiPartFormData extends CompletableFuture<MultiPartFormData.Parts
|
|||
|
||||
private void fail(Throwable cause)
|
||||
{
|
||||
List<MultiPart.Part> toFail;
|
||||
List<MultiPart.Part> partsToFail;
|
||||
List<Content.Chunk> partChunksToFail;
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
if (failure != null)
|
||||
return;
|
||||
failure = cause;
|
||||
toFail = new ArrayList<>(parts);
|
||||
partsToFail = new ArrayList<>(parts);
|
||||
parts.clear();
|
||||
partChunksToFail = new ArrayList<>(partChunks);
|
||||
partChunks.clear();
|
||||
}
|
||||
for (MultiPart.Part part : toFail)
|
||||
{
|
||||
part.fail(cause);
|
||||
}
|
||||
partsToFail.forEach(p -> p.fail(cause));
|
||||
partChunksToFail.forEach(Retainable::release);
|
||||
close();
|
||||
delete();
|
||||
}
|
||||
|
|
|
@ -51,7 +51,6 @@ import static org.hamcrest.Matchers.containsString;
|
|||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class MultiPartCaptureTest
|
||||
{
|
||||
|
@ -255,8 +254,7 @@ public class MultiPartCaptureTest
|
|||
assertThat("Part[" + expected.name + "]", parts, is(notNullValue()));
|
||||
MultiPart.Part part = parts.get(0);
|
||||
String charset = getCharsetFromContentType(part.getHeaders().get(HttpHeader.CONTENT_TYPE), defaultCharset);
|
||||
assertTrue(part.getContentSource().rewind());
|
||||
String partContent = Content.Source.asString(part.getContentSource(), Charset.forName(charset));
|
||||
String partContent = Content.Source.asString(part.newContentSource(), Charset.forName(charset));
|
||||
assertThat("Part[" + expected.name + "].contents", partContent, containsString(expected.value));
|
||||
}
|
||||
|
||||
|
@ -276,8 +274,7 @@ public class MultiPartCaptureTest
|
|||
assertThat("Part[" + expected.name + "]", parts, is(notNullValue()));
|
||||
MultiPart.Part part = parts.get(0);
|
||||
MessageDigest digest = MessageDigest.getInstance("SHA1");
|
||||
assertTrue(part.getContentSource().rewind());
|
||||
try (InputStream partInputStream = Content.Source.asInputStream(part.getContentSource());
|
||||
try (InputStream partInputStream = Content.Source.asInputStream(part.newContentSource());
|
||||
DigestOutputStream digester = new DigestOutputStream(OutputStream.nullOutputStream(), digest))
|
||||
{
|
||||
IO.copy(partInputStream, digester);
|
||||
|
|
|
@ -13,8 +13,10 @@
|
|||
|
||||
package org.eclipse.jetty.io.content;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
|
@ -38,6 +40,7 @@ public class ChunksContentSource implements Content.Source
|
|||
|
||||
public ChunksContentSource(Collection<Content.Chunk> chunks)
|
||||
{
|
||||
chunks.forEach(Content.Chunk::retain);
|
||||
this.chunks = chunks;
|
||||
this.length = chunks.stream().mapToLong(c -> c.getByteBuffer().remaining()).sum();
|
||||
}
|
||||
|
@ -113,11 +116,23 @@ public class ChunksContentSource implements Content.Source
|
|||
@Override
|
||||
public void fail(Throwable failure)
|
||||
{
|
||||
List<Content.Chunk> chunksToRelease;
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
if (terminated != null)
|
||||
return;
|
||||
terminated = Content.Chunk.from(failure);
|
||||
if (iterator != null)
|
||||
{
|
||||
chunksToRelease = new ArrayList<>();
|
||||
iterator.forEachRemaining(chunksToRelease::add);
|
||||
}
|
||||
else
|
||||
{
|
||||
chunksToRelease = List.copyOf(chunks);
|
||||
}
|
||||
}
|
||||
|
||||
chunksToRelease.forEach(Content.Chunk::release);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue