HADOOP-18189 S3APrefetchingInputStream to support status probes when closed (#5036)

Contributed by Viraj Jasani
This commit is contained in:
Viraj Jasani 2022-10-19 06:38:11 -07:00 committed by GitHub
parent 6207ac47e0
commit 8aa04b0b24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 9 deletions

View File

@ -1164,6 +1164,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
@VisibleForTesting
public S3AInputStreamStatistics getS3AStreamStatistics() { public S3AInputStreamStatistics getS3AStreamStatistics() {
return streamStatistics; return streamStatistics;
} }

View File

@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
@ -56,6 +57,21 @@ public class S3APrefetchingInputStream
*/ */
private S3ARemoteInputStream inputStream; private S3ARemoteInputStream inputStream;
/**
* To be only used by synchronized getPos().
*/
private long lastReadCurrentPos = 0;
/**
* To be only used by getIOStatistics().
*/
private IOStatistics ioStatistics = null;
/**
* To be only used by getS3AStreamStatistics().
*/
private S3AInputStreamStatistics inputStreamStatistics = null;
/** /**
* Initializes a new instance of the {@code S3APrefetchingInputStream} class. * Initializes a new instance of the {@code S3APrefetchingInputStream} class.
* *
@ -115,14 +131,20 @@ public class S3APrefetchingInputStream
} }
/** /**
* Gets the current position. * Gets the current position. If the underlying S3 input stream is closed,
* it returns the last read current position from the underlying stream. If the
* current position was never read and the underlying input stream is closed,
* this would return 0.
* *
* @return the current position. * @return the current position.
* @throws IOException if there is an IO error during this operation. * @throws IOException if there is an IO error during this operation.
*/ */
@Override @Override
public synchronized long getPos() throws IOException { public synchronized long getPos() throws IOException {
return isClosed() ? 0 : inputStream.getPos(); if (!isClosed()) {
lastReadCurrentPos = inputStream.getPos();
}
return lastReadCurrentPos;
} }
/** /**
@ -215,11 +237,12 @@ public class S3APrefetchingInputStream
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
@VisibleForTesting
public S3AInputStreamStatistics getS3AStreamStatistics() { public S3AInputStreamStatistics getS3AStreamStatistics() {
if (isClosed()) { if (!isClosed()) {
return null; inputStreamStatistics = inputStream.getS3AStreamStatistics();
} }
return inputStream.getS3AStreamStatistics(); return inputStreamStatistics;
} }
/** /**
@ -229,10 +252,10 @@ public class S3APrefetchingInputStream
*/ */
@Override @Override
public IOStatistics getIOStatistics() { public IOStatistics getIOStatistics() {
if (isClosed()) { if (!isClosed()) {
return null; ioStatistics = inputStream.getIOStatistics();
} }
return inputStream.getIOStatistics(); return ioStatistics;
} }
protected boolean isClosed() { protected boolean isClosed() {
@ -249,7 +272,6 @@ public class S3APrefetchingInputStream
@Override @Override
public boolean seekToNewSource(long targetPos) throws IOException { public boolean seekToNewSource(long targetPos) throws IOException {
throwIfClosed();
return false; return false;
} }

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
@ -240,4 +242,56 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
} }
} }
/**
 * Checks that the prefetching input stream still answers status probes
 * after it has been closed.
 *
 * <p>The test writes a small file, reads from it in two places, and records
 * the position, the IO statistics and the S3A stream statistics. It then
 * closes the stream, repeats the three probes, and asserts that each probe
 * is still non-null / non-zero and equal to the value seen before the close.
 * Finally it asserts that {@code seekToNewSource()} on the closed stream
 * returns {@code false}.
 *
 * @throws Throwable on any failure while writing, reading or probing.
 */
@Test
public void testStatusProbesAfterClosingStream() throws Throwable {
  describe("When the underlying input stream is closed, the prefetch input stream"
      + " should still support some status probes");

  byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
  Path smallFile = methodPath();
  ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

  // try-with-resources releases the stream even when a read or an assertion
  // fails before the explicit close() further down. On the normal path the
  // stream is closed a second time on exit; Closeable.close() is specified
  // to have no effect on an already-closed stream.
  try (FSDataInputStream in = getFileSystem().open(smallFile)) {
    byte[] buffer = new byte[SMALL_FILE_SIZE];
    in.read(buffer, 0, S_1K * 4);
    in.seek(S_1K * 12);
    in.read(buffer, 0, S_1K * 4);

    // status probes while the stream is still open
    long pos = in.getPos();
    IOStatistics ioStats = in.getIOStatistics();
    S3AInputStreamStatistics inputStreamStatistics =
        ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

    assertNotNull("Prefetching input IO stats should not be null", ioStats);
    assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
    assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
        pos);

    // the explicit close is the behaviour under test; everything below
    // runs against a closed stream
    in.close();

    // status probes after closing the input stream
    long newPos = in.getPos();
    IOStatistics newIoStats = in.getIOStatistics();
    S3AInputStreamStatistics newInputStreamStatistics =
        ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

    assertNotNull("Prefetching input IO stats should not be null", newIoStats);
    assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
    assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
        newPos);

    // compare status probes after closing of the stream with status probes done before
    // closing the stream
    assertEquals("Position retrieved through stream before and after closing should match", pos,
        newPos);
    assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
        newIoStats);
    assertEquals("Stream stats retrieved through stream before and after closing should match",
        inputStreamStatistics, newInputStreamStatistics);

    assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
  }
}
} }