Issue #3906 - Handling SeekableByteChannel.setPosition() Exception

+ UnsupportedOperationException now triggers a fallback seek mode

Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
Joakim Erdfelt 2019-07-31 14:36:16 -05:00
parent 6b07bd23c8
commit c2f76a6be5
6 changed files with 250 additions and 28 deletions

View File

@ -64,7 +64,7 @@ public class HttpContentRangeWriter
if (channel instanceof SeekableByteChannel) if (channel instanceof SeekableByteChannel)
{ {
SeekableByteChannel seekableByteChannel = (SeekableByteChannel)channel; SeekableByteChannel seekableByteChannel = (SeekableByteChannel)channel;
return new SeekableByteChannelRangeWriter(seekableByteChannel); return new SeekableByteChannelRangeWriter(seekableByteChannel, () -> (SeekableByteChannel)content.getReadableByteChannel());
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())

View File

@ -28,13 +28,29 @@ import org.eclipse.jetty.util.IO;
public class SeekableByteChannelRangeWriter implements RangeWriter public class SeekableByteChannelRangeWriter implements RangeWriter
{ {
private final SeekableByteChannel channel; public static final int NO_PROGRESS_LIMIT = 3;
public interface ChannelSupplier
{
SeekableByteChannel newSeekableByteChannel() throws IOException;
}
private final ChannelSupplier channelSupplier;
private final int bufSize; private final int bufSize;
private final ByteBuffer buffer; private final ByteBuffer buffer;
private SeekableByteChannel channel;
private long pos;
private boolean defaultSeekMode = true;
public SeekableByteChannelRangeWriter(SeekableByteChannel seekableByteChannel) public SeekableByteChannelRangeWriter(SeekableByteChannelRangeWriter.ChannelSupplier channelSupplier)
{ {
this.channel = seekableByteChannel; this(null, channelSupplier);
}
public SeekableByteChannelRangeWriter(SeekableByteChannel initialChannel, SeekableByteChannelRangeWriter.ChannelSupplier channelSupplier)
{
this.channel = initialChannel;
this.channelSupplier = channelSupplier;
this.bufSize = IO.bufferSize; this.bufSize = IO.bufferSize;
this.buffer = BufferUtil.allocate(this.bufSize); this.buffer = BufferUtil.allocate(this.bufSize);
} }
@ -42,13 +58,16 @@ public class SeekableByteChannelRangeWriter implements RangeWriter
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
this.channel.close(); if (this.channel != null)
{
this.channel.close();
}
} }
@Override @Override
public void writeTo(OutputStream outputStream, long skipTo, long length) throws IOException public void writeTo(OutputStream outputStream, long skipTo, long length) throws IOException
{ {
this.channel.position(skipTo); skipTo(skipTo);
// copy from channel to output stream // copy from channel to output stream
long readTotal = 0; long readTotal = 0;
@ -61,6 +80,87 @@ public class SeekableByteChannelRangeWriter implements RangeWriter
BufferUtil.flipToFlush(buffer, 0); BufferUtil.flipToFlush(buffer, 0);
BufferUtil.writeTo(buffer, outputStream); BufferUtil.writeTo(buffer, outputStream);
readTotal += readLen; readTotal += readLen;
pos += readLen;
}
}
private void skipTo(long skipTo) throws IOException
{
if (channel == null)
{
channel = channelSupplier.newSeekableByteChannel();
pos = 0;
}
if (defaultSeekMode)
{
try
{
if (channel.position() != skipTo)
{
channel.position(skipTo);
pos = skipTo;
return;
}
}
catch (UnsupportedOperationException e)
{
defaultSeekMode = false;
fallbackSkipTo(skipTo);
}
}
else
{
// Fallback mode
fallbackSkipTo(skipTo);
}
}
private void fallbackSkipTo(long skipTo) throws IOException
{
if (skipTo < pos)
{
channel.close();
channel = channelSupplier.newSeekableByteChannel();
pos = 0;
}
if (pos < skipTo)
{
long skipSoFar = pos;
long actualSkipped;
int noProgressLoopLimit = NO_PROGRESS_LIMIT;
// loop till we reach desired point, break out on lack of progress.
while (noProgressLoopLimit > 0 && skipSoFar < skipTo)
{
BufferUtil.clearToFill(buffer);
int len = (int)Math.min(bufSize, (skipTo - skipSoFar));
buffer.limit(len);
actualSkipped = channel.read(buffer);
if (actualSkipped == 0)
{
noProgressLoopLimit--;
}
else if (actualSkipped > 0)
{
skipSoFar += actualSkipped;
noProgressLoopLimit = NO_PROGRESS_LIMIT;
}
else
{
// negative values means the stream was closed or reached EOF
// either way, we've hit a state where we can no longer
// fulfill the requested range write.
throw new IOException("EOF reached before SeekableByteChannel skip destination");
}
}
if (noProgressLoopLimit <= 0)
{
throw new IOException("No progress made to reach SeekableByteChannel skip position " + (skipTo - pos));
}
pos = skipTo;
} }
} }
} }

View File

@ -21,10 +21,16 @@ package org.eclipse.jetty.server.resource;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SeekableByteChannel; import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.eclipse.jetty.toolchain.test.FS; import org.eclipse.jetty.toolchain.test.FS;
@ -32,6 +38,7 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.resource.PathResource; import org.eclipse.jetty.util.resource.PathResource;
import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.resource.Resource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -43,6 +50,16 @@ import static org.hamcrest.Matchers.is;
public class RangeWriterTest public class RangeWriterTest
{ {
public static final String DATA = "01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYZ!@#$%^&*()_+/.,[]"; public static final String DATA = "01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYZ!@#$%^&*()_+/.,[]";
private static FileSystem zipfs;
@AfterAll
public static void closeZipFs() throws IOException
{
if (zipfs != null)
{
zipfs.close();
}
}
public static Path initDataFile() throws IOException public static Path initDataFile() throws IOException
{ {
@ -59,21 +76,47 @@ public class RangeWriterTest
return dataFile; return dataFile;
} }
public static Stream<Arguments> impls() throws IOException private static Path initZipFsDataFile() throws URISyntaxException, IOException
{ {
Resource resource = new PathResource(initDataFile()); Path exampleJar = MavenTestingUtils.getTestResourcePathFile("example.jar");
URI uri = new URI("jar", exampleJar.toUri().toASCIIString(), null);
Map<String, Object> env = new HashMap<>();
env.put("multi-release", "runtime");
if (zipfs != null)
{
// close prior one
zipfs.close();
}
zipfs = FileSystems.newFileSystem(uri, env);
Path rootPath = zipfs.getRootDirectories().iterator().next();
return rootPath.resolve("data.dat");
}
public static Stream<Arguments> impls() throws IOException, URISyntaxException
{
Resource realFileSystemResource = new PathResource(initDataFile());
Resource nonDefaultFileSystemResource = new PathResource(initZipFsDataFile());
return Stream.of( return Stream.of(
Arguments.of(new ByteBufferRangeWriter(BufferUtil.toBuffer(resource, true))), Arguments.of("Traditional / Direct Buffer", new ByteBufferRangeWriter(BufferUtil.toBuffer(realFileSystemResource, true))),
Arguments.of(new ByteBufferRangeWriter(BufferUtil.toBuffer(resource, false))), Arguments.of("Traditional / Indirect Buffer", new ByteBufferRangeWriter(BufferUtil.toBuffer(realFileSystemResource, false))),
Arguments.of(new SeekableByteChannelRangeWriter((SeekableByteChannel)resource.getReadableByteChannel())), Arguments.of("Traditional / SeekableByteChannel", new SeekableByteChannelRangeWriter(() -> (SeekableByteChannel)realFileSystemResource.getReadableByteChannel())),
Arguments.of(new InputStreamRangeWriter(() -> resource.getInputStream())) Arguments.of("Traditional / InputStream", new InputStreamRangeWriter(() -> realFileSystemResource.getInputStream())),
Arguments.of("Non-Default FS / Direct Buffer", new ByteBufferRangeWriter(BufferUtil.toBuffer(nonDefaultFileSystemResource, true))),
Arguments.of("Non-Default FS / Indirect Buffer", new ByteBufferRangeWriter(BufferUtil.toBuffer(nonDefaultFileSystemResource, false))),
Arguments.of("Non-Default FS / SeekableByteChannel", new SeekableByteChannelRangeWriter(() -> (SeekableByteChannel)nonDefaultFileSystemResource.getReadableByteChannel())),
Arguments.of("Non-Default FS / InputStream", new InputStreamRangeWriter(() -> nonDefaultFileSystemResource.getInputStream()))
); );
} }
@ParameterizedTest @ParameterizedTest(name = "[{index}] {0}")
@MethodSource("impls") @MethodSource("impls")
public void testSimpleRange(RangeWriter rangeWriter) throws IOException public void testSimpleRange(String description, RangeWriter rangeWriter) throws IOException
{ {
ByteArrayOutputStream outputStream; ByteArrayOutputStream outputStream;
@ -82,9 +125,9 @@ public class RangeWriterTest
assertThat("Range: 10 (len=50)", new String(outputStream.toByteArray(), UTF_8), is(DATA.substring(10, 60))); assertThat("Range: 10 (len=50)", new String(outputStream.toByteArray(), UTF_8), is(DATA.substring(10, 60)));
} }
@ParameterizedTest @ParameterizedTest(name = "[{index}] {0}")
@MethodSource("impls") @MethodSource("impls")
public void testSameRange_MultipleTimes(RangeWriter rangeWriter) throws IOException public void testSameRange_MultipleTimes(String description, RangeWriter rangeWriter) throws IOException
{ {
ByteArrayOutputStream outputStream; ByteArrayOutputStream outputStream;
@ -97,9 +140,9 @@ public class RangeWriterTest
assertThat("Range(b): 10 (len=50)", new String(outputStream.toByteArray(), UTF_8), is(DATA.substring(10, 60))); assertThat("Range(b): 10 (len=50)", new String(outputStream.toByteArray(), UTF_8), is(DATA.substring(10, 60)));
} }
@ParameterizedTest @ParameterizedTest(name = "[{index}] {0}")
@MethodSource("impls") @MethodSource("impls")
public void testMultipleRanges_Ordered(RangeWriter rangeWriter) throws IOException public void testMultipleRanges_Ordered(String description, RangeWriter rangeWriter) throws IOException
{ {
ByteArrayOutputStream outputStream; ByteArrayOutputStream outputStream;
@ -116,9 +159,9 @@ public class RangeWriterTest
assertThat("Range(b): 55 (len=10)", new String(outputStream.toByteArray(), UTF_8), is(DATA.substring(55, 55 + 10))); assertThat("Range(b): 55 (len=10)", new String(outputStream.toByteArray(), UTF_8), is(DATA.substring(55, 55 + 10)));
} }
@ParameterizedTest @ParameterizedTest(name = "[{index}] {0}")
@MethodSource("impls") @MethodSource("impls")
public void testMultipleRanges_Overlapping(RangeWriter rangeWriter) throws IOException public void testMultipleRanges_Overlapping(String description, RangeWriter rangeWriter) throws IOException
{ {
ByteArrayOutputStream outputStream; ByteArrayOutputStream outputStream;
@ -135,9 +178,9 @@ public class RangeWriterTest
assertThat("Range(b): 20 (len=20)", new String(outputStream.toByteArray(), UTF_8), is(DATA.substring(20, 20 + 20))); assertThat("Range(b): 20 (len=20)", new String(outputStream.toByteArray(), UTF_8), is(DATA.substring(20, 20 + 20)));
} }
@ParameterizedTest @ParameterizedTest(name = "[{index}] {0}")
@MethodSource("impls") @MethodSource("impls")
public void testMultipleRanges_ReverseOrder(RangeWriter rangeWriter) throws IOException public void testMultipleRanges_ReverseOrder(String description, RangeWriter rangeWriter) throws IOException
{ {
ByteArrayOutputStream outputStream; ByteArrayOutputStream outputStream;

Binary file not shown.

View File

@ -388,7 +388,6 @@ public class PathResource extends Resource
@Override @Override
public InputStream getInputStream() throws IOException public InputStream getInputStream() throws IOException
{ {
// TODO: investigate if SPARSE use for default FileSystem usages is worth it
return Files.newInputStream(path, StandardOpenOption.READ); return Files.newInputStream(path, StandardOpenOption.READ);
} }
@ -401,7 +400,11 @@ public class PathResource extends Resource
@Override @Override
public ReadableByteChannel getReadableByteChannel() throws IOException public ReadableByteChannel getReadableByteChannel() throws IOException
{ {
// TODO: investigate if SPARSE use for default FileSystem usages is worth it return newSeekableByteChannel();
}
public SeekableByteChannel newSeekableByteChannel() throws IOException
{
return Files.newByteChannel(path, StandardOpenOption.READ); return Files.newByteChannel(path, StandardOpenOption.READ);
} }
@ -588,7 +591,7 @@ public class PathResource extends Resource
try (SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ)) try (SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ))
{ {
ByteBuffer buffer = BufferUtil.allocate(IO.bufferSize); ByteBuffer buffer = BufferUtil.allocate(IO.bufferSize);
channel.position(start); skipTo(channel, buffer, start);
// copy from channel to output stream // copy from channel to output stream
long readTotal = 0; long readTotal = 0;
@ -605,6 +608,57 @@ public class PathResource extends Resource
} }
} }
private void skipTo(SeekableByteChannel channel, ByteBuffer buffer, long skipTo) throws IOException
{
try
{
if (channel.position() != skipTo)
{
channel.position(skipTo);
}
}
catch (UnsupportedOperationException e)
{
final int NO_PROGRESS_LIMIT = 3;
if (skipTo > 0)
{
long pos = 0;
long readLen;
int noProgressLoopLimit = NO_PROGRESS_LIMIT;
// loop till we reach desired point, break out on lack of progress.
while (noProgressLoopLimit > 0 && pos < skipTo)
{
BufferUtil.clearToFill(buffer);
int len = (int)Math.min(IO.bufferSize, (skipTo - pos));
buffer.limit(len);
readLen = channel.read(buffer);
if (readLen == 0)
{
noProgressLoopLimit--;
}
else if (readLen > 0)
{
pos += readLen;
noProgressLoopLimit = NO_PROGRESS_LIMIT;
}
else
{
// negative values means the stream was closed or reached EOF
// either way, we've hit a state where we can no longer
// fulfill the requested range write.
throw new IOException("EOF reached before SeekableByteChannel skip destination");
}
}
if (noProgressLoopLimit <= 0)
{
throw new IOException("No progress made to reach SeekableByteChannel skip position " + skipTo);
}
}
}
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.util.resource; package org.eclipse.jetty.util.resource;
import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -33,6 +34,7 @@ import java.util.Map;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
@ -46,7 +48,6 @@ public class PathResourceTest
Path exampleJar = MavenTestingUtils.getTestResourcePathFile("example.jar"); Path exampleJar = MavenTestingUtils.getTestResourcePathFile("example.jar");
URI uri = new URI("jar", exampleJar.toUri().toASCIIString(), null); URI uri = new URI("jar", exampleJar.toUri().toASCIIString(), null);
System.err.println("URI = " + uri);
Map<String, Object> env = new HashMap<>(); Map<String, Object> env = new HashMap<>();
env.put("multi-release", "runtime"); env.put("multi-release", "runtime");
@ -71,7 +72,6 @@ public class PathResourceTest
Path exampleJar = MavenTestingUtils.getTestResourcePathFile("example.jar"); Path exampleJar = MavenTestingUtils.getTestResourcePathFile("example.jar");
URI uri = new URI("jar", exampleJar.toUri().toASCIIString(), null); URI uri = new URI("jar", exampleJar.toUri().toASCIIString(), null);
System.err.println("URI = " + uri);
Map<String, Object> env = new HashMap<>(); Map<String, Object> env = new HashMap<>();
env.put("multi-release", "runtime"); env.put("multi-release", "runtime");
@ -96,7 +96,6 @@ public class PathResourceTest
Path exampleJar = MavenTestingUtils.getTestResourcePathFile("example.jar"); Path exampleJar = MavenTestingUtils.getTestResourcePathFile("example.jar");
URI uri = new URI("jar", exampleJar.toUri().toASCIIString(), null); URI uri = new URI("jar", exampleJar.toUri().toASCIIString(), null);
System.err.println("URI = " + uri);
Map<String, Object> env = new HashMap<>(); Map<String, Object> env = new HashMap<>();
env.put("multi-release", "runtime"); env.put("multi-release", "runtime");
@ -112,6 +111,32 @@ public class PathResourceTest
} }
} }
@Test
public void testNonDefaultFileSystem_WriteTo() throws URISyntaxException, IOException
{
Path exampleJar = MavenTestingUtils.getTestResourcePathFile("example.jar");
URI uri = new URI("jar", exampleJar.toUri().toASCIIString(), null);
Map<String, Object> env = new HashMap<>();
env.put("multi-release", "runtime");
try (FileSystem zipfs = FileSystems.newFileSystem(uri, env))
{
Path manifestPath = zipfs.getPath("/META-INF/MANIFEST.MF");
assertThat(manifestPath, is(not(nullValue())));
PathResource resource = new PathResource(manifestPath);
try (ByteArrayOutputStream out = new ByteArrayOutputStream())
{
resource.writeTo(out, 2, 10);
String actual = new String(out.toByteArray(), UTF_8);
String expected = "nifest-Ver";
assertThat("writeTo(out, 2, 10)", actual, is(expected));
}
}
}
@Test @Test
public void testDefaultFileSystem_GetFile() throws Exception public void testDefaultFileSystem_GetFile() throws Exception
{ {