diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 750358d9574..42178a49a47 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; @@ -37,82 +36,128 @@ public class S3AInputStream extends FSInputStream { private long pos; private boolean closed; private S3ObjectInputStream wrappedStream; - private FileSystem.Statistics stats; - private AmazonS3Client client; - private String bucket; - private String key; - private long contentLength; + private final FileSystem.Statistics stats; + private final AmazonS3Client client; + private final String bucket; + private final String key; + private final long contentLength; + private final String uri; public static final Logger LOG = S3AFileSystem.LOG; public static final long CLOSE_THRESHOLD = 4096; - public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client, - FileSystem.Statistics stats) { + // Used by lazy seek + private long nextReadPos; + + //Amount of data requested from the request + private long requestedStreamLen; + + public S3AInputStream(String bucket, String key, long contentLength, + AmazonS3Client client, FileSystem.Statistics stats) { this.bucket = bucket; this.key = key; this.contentLength = contentLength; this.client = client; this.stats = stats; this.pos = 0; + this.nextReadPos = 0; this.closed = false; this.wrappedStream = null; + this.uri = "s3a://" + this.bucket + "/" + this.key; } - private void openIfNeeded() throws IOException { - if (wrappedStream == null) { - reopen(0); - } - } - - private synchronized void reopen(long pos) throws IOException { + /** + * Opens up the stream at specified target position and for given length. + * + * @param targetPos target position + * @param length length requested + * @throws IOException + */ + private synchronized void reopen(long targetPos, long length) + throws IOException { + requestedStreamLen = (length < 0) ? this.contentLength : + Math.max(this.contentLength, (CLOSE_THRESHOLD + (targetPos + length))); if (wrappedStream != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Aborting old stream to open at pos " + pos); + LOG.debug("Closing the previous stream"); } - wrappedStream.abort(); + closeStream(requestedStreamLen); } - if (pos < 0) { - throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK - +" " + pos); + if (LOG.isDebugEnabled()) { + LOG.debug("Requesting for " + + "targetPos=" + targetPos + + ", length=" + length + + ", requestedStreamLen=" + requestedStreamLen + + ", streamPosition=" + pos + + ", nextReadPosition=" + nextReadPos + ); } - if (contentLength > 0 && pos > contentLength-1) { - throw new EOFException( - FSExceptionMessages.CANNOT_SEEK_PAST_EOF - + " " + pos); - } - - LOG.debug("Actually opening file " + key + " at pos " + pos); - - GetObjectRequest request = new GetObjectRequest(bucket, key); - request.setRange(pos, contentLength-1); - + GetObjectRequest request = new GetObjectRequest(bucket, key) + .withRange(targetPos, requestedStreamLen); wrappedStream = client.getObject(request).getObjectContent(); if (wrappedStream == null) { throw new IOException("Null IO stream"); } - this.pos = pos; + this.pos = targetPos; } @Override public synchronized long getPos() throws IOException { - return pos; + return (nextReadPos < 0) ? 0 : nextReadPos; } @Override - public synchronized void seek(long pos) throws IOException { + public synchronized void seek(long targetPos) throws IOException { checkNotClosed(); - if (this.pos == pos) { + // Do not allow negative seek + if (targetPos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + + " " + targetPos); + } + + if (this.contentLength <= 0) { return; } - LOG.debug( - "Reopening " + this.key + " to seek to new offset " + (pos - this.pos)); - reopen(pos); + // Lazy seek + nextReadPos = targetPos; + } + + /** + * Adjust the stream to a specific position. + * + * @param targetPos target seek position + * @param length length of content that needs to be read from targetPos + * @throws IOException + */ + private void seekInStream(long targetPos, long length) throws IOException { + checkNotClosed(); + if (wrappedStream == null) { + return; + } + + // compute how much more to skip + long diff = targetPos - pos; + if (targetPos > pos) { + if ((diff + length) <= wrappedStream.available()) { + // already available in buffer + pos += wrappedStream.skip(diff); + if (pos != targetPos) { + throw new IOException("Failed to seek to " + targetPos + + ". Current position " + pos); + } + return; + } + } + + // close the stream; if read the object will be opened at the new pos + closeStream(this.requestedStreamLen); + pos = targetPos; } @Override @@ -120,22 +165,40 @@ public class S3AInputStream extends FSInputStream { return false; } + /** + * Perform lazy seek and adjust stream to correct position for reading. + * + * @param targetPos position from where data should be read + * @param len length of the content that needs to be read + */ + private void lazySeek(long targetPos, long len) throws IOException { + //For lazy seek + if (targetPos != this.pos) { + seekInStream(targetPos, len); + } + + //re-open at specific location if needed + if (wrappedStream == null) { + reopen(targetPos, len); + } + } + @Override public synchronized int read() throws IOException { checkNotClosed(); + if (this.contentLength == 0 || (nextReadPos >= contentLength)) { + return -1; + } - openIfNeeded(); + lazySeek(nextReadPos, 1); int byteRead; try { byteRead = wrappedStream.read(); - } catch (SocketTimeoutException e) { - LOG.info("Got timeout while trying to read from stream, trying to recover " + e); - reopen(pos); - byteRead = wrappedStream.read(); - } catch (SocketException e) { - LOG.info("Got socket exception while trying to read from stream, trying to recover " + e); - reopen(pos); + } catch (SocketTimeoutException | SocketException e) { + LOG.info("Got exception while trying to read from stream," + + " trying to recover " + e); + reopen(pos, 1); byteRead = wrappedStream.read(); } catch (EOFException e) { return -1; @@ -143,6 +206,7 @@ public class S3AInputStream extends FSInputStream { if (byteRead >= 0) { pos++; + nextReadPos++; } if (stats != null && byteRead >= 0) { @@ -152,26 +216,34 @@ public class S3AInputStream extends FSInputStream { } @Override - public synchronized int read(byte[] buf, int off, int len) throws IOException { + public synchronized int read(byte[] buf, int off, int len) + throws IOException { checkNotClosed(); - openIfNeeded(); + validatePositionedReadArgs(nextReadPos, buf, off, len); + if (len == 0) { + return 0; + } + + if (this.contentLength == 0 || (nextReadPos >= contentLength)) { + return -1; + } + + lazySeek(nextReadPos, len); int byteRead; try { byteRead = wrappedStream.read(buf, off, len); - } catch (SocketTimeoutException e) { - LOG.info("Got timeout while trying to read from stream, trying to recover " + e); - reopen(pos); - byteRead = wrappedStream.read(buf, off, len); - } catch (SocketException e) { - LOG.info("Got socket exception while trying to read from stream, trying to recover " + e); - reopen(pos); + } catch (SocketTimeoutException | SocketException e) { + LOG.info("Got exception while trying to read from stream," + + " trying to recover " + e); + reopen(pos, len); byteRead = wrappedStream.read(buf, off, len); } if (byteRead > 0) { pos += byteRead; + nextReadPos += byteRead; } if (stats != null && byteRead > 0) { @@ -191,15 +263,43 @@ public class S3AInputStream extends FSInputStream { public synchronized void close() throws IOException { super.close(); closed = true; + closeStream(this.contentLength); + } + + /** + * Close a stream: decide whether to abort or close, based on + * the length of the stream and the current position. + * + * This does not set the {@link #closed} flag. + * @param length length of the stream. + * @throws IOException + */ + private void closeStream(long length) throws IOException { if (wrappedStream != null) { - if (contentLength - pos <= CLOSE_THRESHOLD) { - // Close, rather than abort, so that the http connection can be reused. - wrappedStream.close(); - } else { + String reason = null; + boolean shouldAbort = length - pos > CLOSE_THRESHOLD; + if (!shouldAbort) { + try { + reason = "Closed stream"; + wrappedStream.close(); + } catch (IOException e) { + // exception escalates to an abort + LOG.debug("When closing stream", e); + shouldAbort = true; + } + } + if (shouldAbort) { // Abort, rather than just close, the underlying stream. Otherwise, the // remaining object payload is read from S3 while closing the stream. wrappedStream.abort(); + reason = "Closed stream with abort"; } + if (LOG.isDebugEnabled()) { + LOG.debug(reason + "; streamPos=" + pos + + ", nextReadPos=" + nextReadPos + + ", contentLength=" + length); + } + wrappedStream = null; } } @@ -219,6 +319,18 @@ public class S3AInputStream extends FSInputStream { return false; } + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "S3AInputStream{"); + sb.append(uri); + sb.append(" pos=").append(pos); + sb.append(" nextReadPos=").append(nextReadPos); + sb.append(" contentLength=").append(contentLength); + sb.append('}'); + return sb.toString(); + } + /** * Subclass {@code readFully()} operation which only seeks at the start * of the series of operations; seeking back at the end. @@ -234,6 +346,7 @@ public class S3AInputStream extends FSInputStream { @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + checkNotClosed(); validatePositionedReadArgs(position, buffer, offset, length); if (length == 0) { return;