HADOOP-18378. Implement lazy seek in S3A prefetching. (#4955)

Make S3APrefetchingInputStream.seek() completely lazy. Calls to seek() will not affect the current buffer nor interfere with prefetching, until read() is called.

This change allows various usage patterns to benefit from prefetching, e.g. when calling readFully(position, buffer) in a loop for contiguous positions the intermediate internal calls to seek() will be noops and prefetching will have the same performance as in a sequential read.

Contributed by Alessandro Passaro.
This commit is contained in:
Alessandro Passaro 2022-10-06 12:00:41 +01:00 committed by Steve Loughran
parent bb08c90228
commit 0f1a3f23a5
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
8 changed files with 222 additions and 137 deletions

View File

@ -116,7 +116,7 @@ public final class FilePosition {
readOffset,
"readOffset",
startOffset,
startOffset + bufferData.getBuffer().limit() - 1);
startOffset + bufferData.getBuffer().limit());
data = bufferData;
buffer = bufferData.getBuffer().duplicate();
@ -182,7 +182,7 @@ public final class FilePosition {
*/
public boolean isWithinCurrentBuffer(long pos) {
throwIfInvalidBuffer();
long bufferEndOffset = bufferStartOffset + buffer.limit() - 1;
long bufferEndOffset = bufferStartOffset + buffer.limit();
return (pos >= bufferStartOffset) && (pos <= bufferEndOffset);
}

View File

@ -26,6 +26,7 @@ import org.junit.Test;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -43,6 +44,7 @@ public class TestFilePosition extends AbstractHadoopTestBase {
new FilePosition(10, 5);
new FilePosition(5, 10);
new FilePosition(10, 5).setData(data, 3, 4);
new FilePosition(10, 10).setData(data, 3, 13);
// Verify it throws correctly.
@ -94,11 +96,11 @@ public class TestFilePosition extends AbstractHadoopTestBase {
"'readOffset' must not be negative", () -> pos.setData(data, 4, -4));
intercept(IllegalArgumentException.class,
"'readOffset' (15) must be within the range [4, 13]",
"'readOffset' (15) must be within the range [4, 14]",
() -> pos.setData(data, 4, 15));
intercept(IllegalArgumentException.class,
"'readOffset' (3) must be within the range [4, 13]",
"'readOffset' (3) must be within the range [4, 14]",
() -> pos.setData(data, 4, 3));
}
@ -192,4 +194,31 @@ public class TestFilePosition extends AbstractHadoopTestBase {
}
assertTrue(pos.bufferFullyRead());
}
@Test
public void testBounds() {
int bufferSize = 8;
long fileSize = bufferSize;
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
BufferData data = new BufferData(0, buffer);
FilePosition pos = new FilePosition(fileSize, bufferSize);
long eofOffset = fileSize;
pos.setData(data, 0, eofOffset);
assertThat(pos.isWithinCurrentBuffer(eofOffset))
.describedAs("EOF offset %d should be within the current buffer", eofOffset)
.isTrue();
assertThat(pos.absolute())
.describedAs("absolute() should return the EOF offset")
.isEqualTo(eofOffset);
assertThat(pos.setAbsolute(eofOffset))
.describedAs("setAbsolute() should return true on the EOF offset %d", eofOffset)
.isTrue();
assertThat(pos.absolute())
.describedAs("absolute() should return the EOF offset")
.isEqualTo(eofOffset);
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManager;
import org.apache.hadoop.fs.impl.prefetch.BufferData;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.FilePosition;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@ -84,46 +85,6 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
fileSize);
}
/**
* Moves the current read position so that the next read will occur at {@code pos}.
*
* @param pos the next read will take place at this position.
*
* @throws IllegalArgumentException if pos is outside of the range [0, file size].
*/
@Override
public void seek(long pos) throws IOException {
throwIfClosed();
throwIfInvalidSeek(pos);
// The call to setAbsolute() returns true if the target position is valid and
// within the current block. Therefore, no additional work is needed when we get back true.
if (!getFilePosition().setAbsolute(pos)) {
LOG.info("seek({})", getOffsetStr(pos));
// We could be here in two cases:
// -- the target position is invalid:
// We ignore this case here as the next read will return an error.
// -- it is valid but outside of the current block.
if (getFilePosition().isValid()) {
// There are two cases to consider:
// -- the seek was issued after this buffer was fully read.
// In this case, it is very unlikely that this buffer will be needed again;
// therefore we release the buffer without caching.
// -- if we are jumping out of the buffer before reading it completely then
// we will likely need this buffer again (as observed empirically for Parquet)
// therefore we issue an async request to cache this buffer.
if (!getFilePosition().bufferFullyRead()) {
blockManager.requestCaching(getFilePosition().data());
} else {
blockManager.release(getFilePosition().data());
}
getFilePosition().invalidate();
blockManager.cancelPrefetches();
}
setSeekTargetPos(pos);
}
}
@Override
public void close() throws IOException {
// Close the BlockManager first, cancelling active prefetches,
@ -139,36 +100,45 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
return false;
}
if (getFilePosition().isValid() && getFilePosition()
.buffer()
.hasRemaining()) {
return true;
}
long readPos;
int prefetchCount;
if (getFilePosition().isValid()) {
// A sequential read results in a prefetch.
readPos = getFilePosition().absolute();
prefetchCount = numBlocksToPrefetch;
} else {
// A seek invalidates the current position.
// We prefetch only 1 block immediately after a seek operation.
readPos = getSeekTargetPos();
prefetchCount = 1;
}
long readPos = getNextReadPos();
if (!getBlockData().isValidOffset(readPos)) {
return false;
}
if (getFilePosition().isValid()) {
if (getFilePosition().bufferFullyRead()) {
blockManager.release(getFilePosition().data());
// Determine whether this is an out of order read.
FilePosition filePosition = getFilePosition();
boolean outOfOrderRead = !filePosition.setAbsolute(readPos);
if (!outOfOrderRead && filePosition.buffer().hasRemaining()) {
// Use the current buffer.
return true;
}
if (filePosition.isValid()) {
// We are jumping out of the current buffer. There are two cases to consider:
if (filePosition.bufferFullyRead()) {
// This buffer was fully read:
// it is very unlikely that this buffer will be needed again;
// therefore we release the buffer without caching.
blockManager.release(filePosition.data());
} else {
blockManager.requestCaching(getFilePosition().data());
// We will likely need this buffer again (as observed empirically for Parquet)
// therefore we issue an async request to cache this buffer.
blockManager.requestCaching(filePosition.data());
}
filePosition.invalidate();
}
int prefetchCount;
if (outOfOrderRead) {
LOG.debug("lazy-seek({})", getOffsetStr(readPos));
blockManager.cancelPrefetches();
// We prefetch only 1 block immediately after a seek operation.
prefetchCount = 1;
} else {
// A sequential read results in a prefetch.
prefetchCount = numBlocksToPrefetch;
}
int toBlockNumber = getBlockData().getBlockNumber(readPos);
@ -186,7 +156,7 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
.trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ),
() -> blockManager.get(toBlockNumber));
getFilePosition().setData(data, startOffset, readPos);
filePosition.setData(data, startOffset, readPos);
return true;
}
@ -197,7 +167,7 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
}
StringBuilder sb = new StringBuilder();
sb.append(String.format("fpos = (%s)%n", getFilePosition()));
sb.append(String.format("%s%n", super.toString()));
sb.append(blockManager.toString());
return sb.toString();
}

View File

@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.impl.prefetch.BufferData;
import org.apache.hadoop.fs.impl.prefetch.FilePosition;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@ -86,7 +87,12 @@ public class S3AInMemoryInputStream extends S3ARemoteInputStream {
return false;
}
if (!getFilePosition().isValid()) {
FilePosition filePosition = getFilePosition();
if (filePosition.isValid()) {
// Update current position (lazy seek).
filePosition.setAbsolute(getNextReadPos());
} else {
// Read entire file into buffer.
buffer.clear();
int numBytesRead =
getReader().read(buffer, 0, buffer.capacity());
@ -94,9 +100,9 @@ public class S3AInMemoryInputStream extends S3ARemoteInputStream {
return false;
}
BufferData data = new BufferData(0, buffer);
getFilePosition().setData(data, 0, getSeekTargetPos());
filePosition.setData(data, 0, getNextReadPos());
}
return getFilePosition().buffer().hasRemaining();
return filePosition.buffer().hasRemaining();
}
}

View File

@ -77,12 +77,16 @@ public abstract class S3ARemoteInputStream
private volatile boolean closed;
/**
* Current position within the file.
* Internal position within the file. Updated lazily
* after a seek before a read.
*/
private FilePosition fpos;
/** The target of the most recent seek operation. */
private long seekTargetPos;
/**
* This is the actual position within the file, used by
* lazy seek to decide whether to seek on the next read or not.
*/
private long nextReadPos;
/** Information about each block of the mapped S3 file. */
private BlockData blockData;
@ -146,7 +150,7 @@ public abstract class S3ARemoteInputStream
this.remoteObject = getS3File();
this.reader = new S3ARemoteObjectReader(remoteObject);
this.seekTargetPos = 0;
this.nextReadPos = 0;
}
/**
@ -212,7 +216,8 @@ public abstract class S3ARemoteInputStream
public int available() throws IOException {
throwIfClosed();
if (!ensureCurrentBuffer()) {
// Update the current position in the current buffer, if possible.
if (!fpos.setAbsolute(nextReadPos)) {
return 0;
}
@ -228,11 +233,7 @@ public abstract class S3ARemoteInputStream
public long getPos() throws IOException {
throwIfClosed();
if (fpos.isValid()) {
return fpos.absolute();
} else {
return seekTargetPos;
}
return nextReadPos;
}
/**
@ -247,10 +248,7 @@ public abstract class S3ARemoteInputStream
throwIfClosed();
throwIfInvalidSeek(pos);
if (!fpos.setAbsolute(pos)) {
fpos.invalidate();
seekTargetPos = pos;
}
nextReadPos = pos;
}
/**
@ -268,7 +266,7 @@ public abstract class S3ARemoteInputStream
throwIfClosed();
if (remoteObject.size() == 0
|| seekTargetPos >= remoteObject.size()) {
|| nextReadPos >= remoteObject.size()) {
return -1;
}
@ -276,6 +274,7 @@ public abstract class S3ARemoteInputStream
return -1;
}
nextReadPos++;
incrementBytesRead(1);
return fpos.buffer().get() & 0xff;
@ -315,7 +314,7 @@ public abstract class S3ARemoteInputStream
}
if (remoteObject.size() == 0
|| seekTargetPos >= remoteObject.size()) {
|| nextReadPos >= remoteObject.size()) {
return -1;
}
@ -334,6 +333,7 @@ public abstract class S3ARemoteInputStream
ByteBuffer buf = fpos.buffer();
int bytesToRead = Math.min(numBytesRemaining, buf.remaining());
buf.get(buffer, offset, bytesToRead);
nextReadPos += bytesToRead;
incrementBytesRead(bytesToRead);
offset += bytesToRead;
numBytesRemaining -= bytesToRead;
@ -367,12 +367,8 @@ public abstract class S3ARemoteInputStream
return closed;
}
protected long getSeekTargetPos() {
return seekTargetPos;
}
protected void setSeekTargetPos(long pos) {
seekTargetPos = pos;
protected long getNextReadPos() {
return nextReadPos;
}
protected BlockData getBlockData() {
@ -443,6 +439,18 @@ public abstract class S3ARemoteInputStream
return false;
}
@Override
public String toString() {
if (isClosed()) {
return "closed";
}
StringBuilder sb = new StringBuilder();
sb.append(String.format("nextReadPos = (%d)%n", nextReadPos));
sb.append(String.format("fpos = (%s)", fpos));
return sb.toString();
}
protected void throwIfClosed() throws IOException {
if (closed) {
throw new IOException(
@ -453,6 +461,8 @@ public abstract class S3ARemoteInputStream
protected void throwIfInvalidSeek(long pos) throws EOFException {
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
} else if (pos > this.getBlockData().getFileSize()) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
}
}

View File

@ -158,7 +158,7 @@ The buffer for the block furthest from the current block is released.
Once a buffer has been acquired by `CachingBlockManager`, if the buffer is in a *READY* state, it is
returned.
This means that data was already read into this buffer asynchronously by a prefetch.
If it's state is *BLANK* then data is read into it using
If its state is *BLANK* then data is read into it using
`S3Reader.read(ByteBuffer buffer, long offset, int size).`
For the second read call, `in.read(buffer, 0, 8MB)`, since the block sizes are of 8MB and only 5MB
@ -170,7 +170,10 @@ The number of blocks to be prefetched is determined by `fs.s3a.prefetch.block.co
##### Random Reads
If the caller makes the following calls:
The `CachingInputStream` also caches prefetched blocks. This happens when `read()` is issued
after a `seek()` outside the current block, but the current block still has not been fully read.
For example, consider the following calls:
```
in.read(buffer, 0, 5MB)
@ -180,13 +183,14 @@ in.seek(2MB)
in.read(buffer, 0, 4MB)
```
The `CachingInputStream` also caches prefetched blocks.
This happens when a `seek()` is issued for outside the current block and the current block still has
not been fully read.
For the above read sequence, after the `seek(10MB)` call is issued, block 0 has not been read
completely so the subsequent call to `read()` will cache it, on the assumption that the caller
will probably want to read from it again.
For the above read sequence, when the `seek(10MB)` call is issued, block 0 has not been read
completely so cache it as the caller will probably want to read from it again.
After `seek(2MB)` is called, the position is back inside block 0. The next read can now be
satisfied from the locally cached block file, which is typically orders of magnitude faster
than a network based read.
When `seek(2MB)` is called, the position is back inside block 0.
The next read can now be satisfied from the locally cached block file, which is typically orders of
magnitude faster than a network based read.
NB: `seek()` is implemented lazily, so it only keeps track of the new position but does not
otherwise affect the internal state of the stream. Only when a `read()` is issued, it will call
the `ensureCurrentBuffer()` method and fetch a new block if required.

View File

@ -32,8 +32,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
@ -41,7 +39,13 @@ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
@ -120,20 +124,54 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead));
bytesRead += buffer.length;
// Blocks are fully read, no blocks should be cached
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
0);
}
// Assert that first block is read synchronously, following blocks are prefetched
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
numBlocks - 1);
verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
}
// Verify that once stream is closed, all memory is freed
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
assertThatStatisticMaximum(ioStats,
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
}
@Test
public void testReadLargeFileFullyLazySeek() throws Throwable {
describe("read a large file using readFully(position,buffer,offset,length),"
+ " uses S3ACachingInputStream");
IOStatistics ioStats;
openFS();
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
ioStats = in.getIOStatistics();
byte[] buffer = new byte[S_1M * 10];
long bytesRead = 0;
while (bytesRead < largeFileSize) {
in.readFully(bytesRead, buffer, 0, (int) Math.min(buffer.length,
largeFileSize - bytesRead));
bytesRead += buffer.length;
// Blocks are fully read, no blocks should be cached
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
0);
}
// Assert that first block is read synchronously, following blocks are prefetched
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
numBlocks - 1);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
}
// Verify that once stream is closed, all memory is freed
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
assertThatStatisticMaximum(ioStats,
ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
}
@Test
@ -147,24 +185,31 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
byte[] buffer = new byte[blockSize];
// Don't read the block completely so it gets cached on seek
// Don't read block 0 completely so it gets cached on read after seek
in.read(buffer, 0, blockSize - S_1K * 10);
in.seek(blockSize + S_1K * 10);
// Backwards seek, will use cached block
// Seek to block 2 and read all of it
in.seek(blockSize * 2);
in.read(buffer, 0, blockSize);
// Seek to block 4 but don't read: noop.
in.seek(blockSize * 4);
// Backwards seek, will use cached block 0
in.seek(S_1K * 5);
in.read();
verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1);
// block 0 is cached when we seek to block 1, block 1 is cached as it is being prefetched
// when we seek out of block 0, see cancelPrefetches()
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 2);
// Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
// Blocks 0, 1, 3 were not fully read, so remain in the file cache
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4);
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2);
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3);
}
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
assertThatStatisticMaximum(ioStats,
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
}
@Test
@ -184,14 +229,14 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);
verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0);
// The buffer pool is not used
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
// no prefetch ops, so no action_executor_acquired
assertThatStatisticMaximum(ioStats,
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.prefetch;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -35,6 +36,7 @@ import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
/**
@ -97,7 +99,7 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
private void testRead0SizedFileHelper(S3ARemoteInputStream inputStream,
int bufferSize)
throws Exception {
assertEquals(0, inputStream.available());
assertAvailable(0, inputStream);
assertEquals(-1, inputStream.read());
assertEquals(-1, inputStream.read());
@ -121,8 +123,8 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
private void testReadHelper(S3ARemoteInputStream inputStream, int bufferSize)
throws Exception {
assertEquals(bufferSize, inputStream.available());
assertEquals(0, inputStream.read());
assertAvailable(bufferSize - 1, inputStream);
assertEquals(1, inputStream.read());
byte[] buffer = new byte[2];
@ -170,12 +172,14 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
int bufferSize,
int fileSize)
throws Exception {
assertAvailable(0, inputStream);
assertEquals(0, inputStream.getPos());
inputStream.seek(7);
assertEquals(7, inputStream.getPos());
inputStream.seek(bufferSize);
assertAvailable(0, inputStream);
assertEquals(bufferSize, inputStream.getPos());
inputStream.seek(0);
assertAvailable(0, inputStream);
assertEquals(bufferSize, inputStream.available());
for (int i = 0; i < fileSize; i++) {
assertEquals(i, inputStream.read());
}
@ -187,11 +191,20 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
}
}
// Can seek to the EOF: read() will then return -1.
inputStream.seek(fileSize);
assertEquals(-1, inputStream.read());
// Test invalid seeks.
ExceptionAsserts.assertThrows(
EOFException.class,
FSExceptionMessages.NEGATIVE_SEEK,
() -> inputStream.seek(-1));
ExceptionAsserts.assertThrows(
EOFException.class,
FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
() -> inputStream.seek(fileSize + 1));
}
@Test
@ -217,7 +230,7 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
assertEquals(7, inputStream.getPos());
inputStream.seek(0);
assertEquals(bufferSize, inputStream.available());
assertAvailable(0, inputStream);
for (int i = 0; i < fileSize; i++) {
assertEquals(i, inputStream.read());
}
@ -251,9 +264,10 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
private void testCloseHelper(S3ARemoteInputStream inputStream, int bufferSize)
throws Exception {
assertEquals(bufferSize, inputStream.available());
assertAvailable(0, inputStream);
assertEquals(0, inputStream.read());
assertEquals(1, inputStream.read());
assertAvailable(bufferSize - 2, inputStream);
inputStream.close();
@ -276,4 +290,11 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
// Verify a second close() does not throw.
inputStream.close();
}
private static void assertAvailable(int expected, InputStream inputStream)
throws IOException {
assertThat(inputStream.available())
.describedAs("Check available bytes on stream %s", inputStream)
.isEqualTo(expected);
}
}