HADOOP-11570. S3AInputStream.close() downloads the remaining bytes of the object from S3. (Dan Hecht via stevel).
This commit is contained in:
parent
2cbac36fd3
commit
701b96ca8e
|
@ -542,6 +542,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned
|
HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned
|
||||||
to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb)
|
to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb)
|
||||||
|
|
||||||
|
HADOOP-11570. S3AInputStream.close() downloads the remaining bytes of
|
||||||
|
the object from S3. (Dan Hecht via stevel).
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -37,14 +37,13 @@ public class S3AInputStream extends FSInputStream {
|
||||||
private long pos;
|
private long pos;
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
private S3ObjectInputStream wrappedStream;
|
private S3ObjectInputStream wrappedStream;
|
||||||
private S3Object wrappedObject;
|
|
||||||
private FileSystem.Statistics stats;
|
private FileSystem.Statistics stats;
|
||||||
private AmazonS3Client client;
|
private AmazonS3Client client;
|
||||||
private String bucket;
|
private String bucket;
|
||||||
private String key;
|
private String key;
|
||||||
private long contentLength;
|
private long contentLength;
|
||||||
public static final Logger LOG = S3AFileSystem.LOG;
|
public static final Logger LOG = S3AFileSystem.LOG;
|
||||||
|
public static final long CLOSE_THRESHOLD = 4096;
|
||||||
|
|
||||||
public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client,
|
public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client,
|
||||||
FileSystem.Statistics stats) {
|
FileSystem.Statistics stats) {
|
||||||
|
@ -55,12 +54,11 @@ public class S3AInputStream extends FSInputStream {
|
||||||
this.stats = stats;
|
this.stats = stats;
|
||||||
this.pos = 0;
|
this.pos = 0;
|
||||||
this.closed = false;
|
this.closed = false;
|
||||||
this.wrappedObject = null;
|
|
||||||
this.wrappedStream = null;
|
this.wrappedStream = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void openIfNeeded() throws IOException {
|
private void openIfNeeded() throws IOException {
|
||||||
if (wrappedObject == null) {
|
if (wrappedStream == null) {
|
||||||
reopen(0);
|
reopen(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -90,8 +88,7 @@ public class S3AInputStream extends FSInputStream {
|
||||||
GetObjectRequest request = new GetObjectRequest(bucket, key);
|
GetObjectRequest request = new GetObjectRequest(bucket, key);
|
||||||
request.setRange(pos, contentLength-1);
|
request.setRange(pos, contentLength-1);
|
||||||
|
|
||||||
wrappedObject = client.getObject(request);
|
wrappedStream = client.getObject(request).getObjectContent();
|
||||||
wrappedStream = wrappedObject.getObjectContent();
|
|
||||||
|
|
||||||
if (wrappedStream == null) {
|
if (wrappedStream == null) {
|
||||||
throw new IOException("Null IO stream");
|
throw new IOException("Null IO stream");
|
||||||
|
@ -192,8 +189,15 @@ public class S3AInputStream extends FSInputStream {
|
||||||
public synchronized void close() throws IOException {
|
public synchronized void close() throws IOException {
|
||||||
super.close();
|
super.close();
|
||||||
closed = true;
|
closed = true;
|
||||||
if (wrappedObject != null) {
|
if (wrappedStream != null) {
|
||||||
wrappedObject.close();
|
if (contentLength - pos <= CLOSE_THRESHOLD) {
|
||||||
|
// Close, rather than abort, so that the http connection can be reused.
|
||||||
|
wrappedStream.close();
|
||||||
|
} else {
|
||||||
|
// Abort, rather than just close, the underlying stream. Otherwise, the
|
||||||
|
// remaining object payload is read from S3 while closing the stream.
|
||||||
|
wrappedStream.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue