From ed5d10ee48260bfe3928eac160a8404c0baf5d4a Mon Sep 17 00:00:00 2001 From: Zamil Majdy <76959103+majdyz@users.noreply.github.com> Date: Sat, 26 Jun 2021 02:01:48 +0700 Subject: [PATCH] HADOOP-17764. S3AInputStream read does not re-open the input stream on the second read retry attempt (#3109) Contributed by Zamil Majdy. --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 5 +- .../apache/hadoop/fs/s3a/S3AInputStream.java | 8 +- .../fs/s3a/TestS3AInputStreamRetry.java | 209 ++++++++++++++++++ 3 files changed, 217 insertions(+), 5 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 439d52edc14..1522432c9f5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1439,10 +1439,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, * using FS state as well as the status. * @param fileStatus file status. * @param seekPolicy input policy for this operation + * @param changePolicy change policy for this operation. * @param readAheadRange readahead value. + * @param auditSpan audit span. * @return a context for read and select operations. 
*/ - private S3AReadOpContext createReadContext( + @VisibleForTesting + protected S3AReadOpContext createReadContext( final FileStatus fileStatus, final S3AInputPolicy seekPolicy, final ChangeDetectionPolicy changePolicy, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index d56d4ac433e..b65dcc95293 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -424,10 +424,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, return -1; } catch (SocketTimeoutException e) { onReadFailure(e, 1, true); - b = wrappedStream.read(); + throw e; } catch (IOException e) { onReadFailure(e, 1, false); - b = wrappedStream.read(); + throw e; } return b; }); @@ -513,10 +513,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, return -1; } catch (SocketTimeoutException e) { onReadFailure(e, len, true); - bytes = wrappedStream.read(buf, off, len); + throw e; } catch (IOException e) { onReadFailure(e, len, false); - bytes= wrappedStream.read(buf, off, len); + throw e; } return bytes; }); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java new file mode 100644 index 00000000000..05a07ce444c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.net.SocketException; +import java.nio.charset.Charset; + +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.junit.Test; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan; +import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; +import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; + +import static java.lang.Math.min; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests S3AInputStream retry behavior on read failure. + * These tests are for validating expected behavior of retrying the + * S3AInputStream read() and read(b, off, len), it tests that the read should + * reopen the input stream and retry the read when IOException is thrown + * during the read process. 
+ */
+public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
+
+  /** Content served by every mocked S3 object stream in this test. */
+  private static final String INPUT = "ab";
+
+  /**
+   * Reads the mocked object one byte at a time and expects the content of
+   * {@link #INPUT} in order, although the first two streams handed out by
+   * the mocked S3 object throw on read.
+   *
+   * @throws IOException if the read fails despite the retries.
+   */
+  @Test
+  public void testInputStreamReadRetryForException() throws IOException {
+    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+
+    assertEquals("'a' from the test input stream 'ab' should be the first " +
+        "character being read", INPUT.charAt(0), s3AInputStream.read());
+    assertEquals("'b' from the test input stream 'ab' should be the second " +
+        "character being read", INPUT.charAt(1), s3AInputStream.read());
+  }
+
+  /**
+   * Reads the mocked object through {@code read(buf, off, len)} and expects
+   * both the returned byte count and the buffer content to match
+   * {@link #INPUT}.
+   *
+   * @throws IOException if the read fails despite the retries.
+   */
+  @Test
+  public void testInputStreamReadLengthRetryForException() throws IOException {
+    byte[] result = new byte[INPUT.length()];
+    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+    // Keep the byte count: InputStream.read may legally return fewer bytes
+    // than requested, and silently dropping the result hides that.
+    int bytesRead = s3AInputStream.read(result, 0, INPUT.length());
+
+    assertEquals("The read should return the full length of the test input",
+        INPUT.length(), bytesRead);
+    assertArrayEquals(
+        "The read result should equals to the test input stream content",
+        INPUT.getBytes(), result);
+  }
+
+  /**
+   * Reads the mocked object through {@code readFully(position, buffer)} and
+   * expects the buffer content to match {@link #INPUT}.
+   *
+   * @throws IOException if the read fails despite the retries.
+   */
+  @Test
+  public void testInputStreamReadFullyRetryForException() throws IOException {
+    byte[] result = new byte[INPUT.length()];
+    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+    s3AInputStream.readFully(0, result);
+
+    assertArrayEquals(
+        "The read result should equals to the test input stream content",
+        INPUT.getBytes(), result);
+  }
+
+  /**
+   * Build an S3AInputStream over a fixed test path whose object content is
+   * supplied by {@link #getMockedInputStreamCallback()}.
+   * The file status and object attributes both report a length of
+   * {@code INPUT.length()} and share the same etag and version id.
+   *
+   * @return an input stream backed by the mocked callbacks.
+   */
+  private S3AInputStream getMockedS3AInputStream() {
+    Path path = new Path("test-path");
+    String eTag = "test-etag";
+    String versionId = "test-version-id";
+    String owner = "test-owner";
+
+    S3AFileStatus s3AFileStatus = new S3AFileStatus(
+        INPUT.length(), 0, path, INPUT.length(), owner, eTag, versionId);
+
+    S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
+        fs.getBucket(),
+        path,
+        fs.pathToKey(path),
+        fs.getServerSideEncryptionAlgorithm(),
+        new EncryptionSecrets().getEncryptionKey(),
+        eTag,
+        versionId,
+        INPUT.length());
+
+    // NOTE(review): 100 is presumably the readahead range argument of
+    // createReadContext; confirm against the S3AFileSystem signature.
+    S3AReadOpContext s3AReadOpContext = fs.createReadContext(
+        s3AFileStatus, S3AInputPolicy.Normal,
+        ChangeDetectionPolicy.getPolicy(fs.getConf()), 100,
+        NoopSpan.INSTANCE);
+
+    return new S3AInputStream(
+        s3AReadOpContext,
+        s3ObjectAttributes,
+        getMockedInputStreamCallback());
+  }
+
+  /**
+   * Get mocked InputStreamCallbacks where we return mocked S3Object.
+   * The same S3Object instance is returned for every GET request, so the
+   * sequence of bad and good streams is shared across reopen attempts.
+   *
+   * @return mocked object.
+   */
+  private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
+    return new S3AInputStream.InputStreamCallbacks() {
+
+      private final S3Object mockedS3Object = getMockedS3Object();
+
+      @Override
+      public S3Object getObject(GetObjectRequest request) {
+        // Set s3 client to return mocked s3object with defined read behavior.
+        return mockedS3Object;
+      }
+
+      @Override
+      public GetObjectRequest newGetRequest(String key) {
+        return new GetObjectRequest(fs.getBucket(), key);
+      }
+
+      @Override
+      public void close() {
+        // Nothing to release: the mock holds no external resources.
+      }
+    };
+  }
+
+  /**
+   * Get mocked S3Object that returns bad input streams on the initial
+   * getObjectContent calls.
+   *
+   * @return mocked object.
+   */
+  private S3Object getMockedS3Object() {
+    S3ObjectInputStream objectInputStreamBad1 = getMockedInputStream(true);
+    S3ObjectInputStream objectInputStreamBad2 = getMockedInputStream(true);
+    S3ObjectInputStream objectInputStreamGood = getMockedInputStream(false);
+
+    return new S3Object() {
+      private final S3ObjectInputStream[] inputStreams =
+          {objectInputStreamBad1, objectInputStreamBad2, objectInputStreamGood};
+
+      // Number of getObjectContent calls so far; a primitive avoids
+      // unboxing and reboxing on every increment.
+      private int inputStreamIndex = 0;
+
+      @Override
+      public S3ObjectInputStream getObjectContent() {
+        // Set getObjectContent behavior:
+        // Returns bad stream twice, and good stream afterwards.
+        inputStreamIndex++;
+        // Clamp to the last entry so every later call gets the good stream.
+        return inputStreams[min(inputStreamIndex, inputStreams.length) - 1];
+      }
+
+      @Override
+      public ObjectMetadata getObjectMetadata() {
+        // Set getObjectMetadata behavior: returns dummy metadata
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setHeader("ETag", "test-etag");
+        return metadata;
+      }
+    };
+  }
+
+  /**
+   * Get mocked S3ObjectInputStream where we can trigger IOException to
+   * simulate the read failure.
+   * A failing stream still delegates to the underlying stream first, so
+   * it consumes input before throwing.
+   *
+   * @param triggerFailure true when a failure injection is enabled.
+   * @return mocked object.
+   */
+  private S3ObjectInputStream getMockedInputStream(boolean triggerFailure) {
+    return new S3ObjectInputStream(
+        IOUtils.toInputStream(INPUT, Charset.defaultCharset()), null) {
+
+      // One exception instance is reused for every injected failure.
+      private final IOException exception =
+          new SSLException(new SocketException("Connection reset"));
+
+      @Override
+      public int read() throws IOException {
+        int result = super.read();
+        if (triggerFailure) {
+          throw exception;
+        }
+        return result;
+      }
+
+      @Override
+      public int read(byte[] b, int off, int len) throws IOException {
+        int result = super.read(b, off, len);
+        if (triggerFailure) {
+          throw exception;
+        }
+        return result;
+      }
+    };
+  }
+}