From 9b8044d00b0edb0a597c6fd768e9be6a96da74da Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 13 Mar 2019 15:33:58 +0000
Subject: [PATCH] HADOOP-16109. Parquet reading S3AFileSystem causes EOF (#589)

Nobody gets seek right. No matter how many times they think they have.

Reproducible test from: Dave Christianson
Fixed seek() logic: Steve Loughran

Change-Id: I39b87f3d5daa98f65de2c0a44e348821a4930573
---
 .../fs/contract/AbstractContractSeekTest.java |   4 +-
 .../apache/hadoop/fs/s3a/S3AInputStream.java  |   4 +-
 .../fs/contract/s3a/ITestS3AContractSeek.java | 273 ++++++++++++++++++
 .../hadoop/fs/s3a/S3ATestConstants.java       |   6 +
 .../apache/hadoop/fs/s3a/S3ATestUtils.java    |  71 +++++
 5 files changed, 355 insertions(+), 3 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
index 7af3cb0a525..957f86a14e0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
@@ -272,7 +272,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     describe("Seek round a large file and verify the bytes are what is expected");
     Path testSeekFile = path("bigseekfile.txt");
     byte[] block = dataset(100 * 1024, 0, 255);
-    createFile(getFileSystem(), testSeekFile, false, block);
+    createFile(getFileSystem(), testSeekFile, true, block);
     instream = getFileSystem().open(testSeekFile);
     assertEquals(0, instream.getPos());
     //expect that seek to 0 works
@@ -309,7 +309,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     assumeSupportsPositionedReadable();
     Path testSeekFile = path("bigseekfile.txt");
     byte[] block = dataset(65536, 0, 255);
-    createFile(getFileSystem(), testSeekFile, false, block);
+    createFile(getFileSystem(), testSeekFile, true, block);
     instream = getFileSystem().open(testSeekFile);
     instream.seek(39999);
     assertTrue(-1 != instream.read());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 09dd7476b6c..62c6635f27e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -234,7 +234,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       long forwardSeekLimit = Math.min(remainingInCurrentRequest,
           forwardSeekRange);
       boolean skipForward = remainingInCurrentRequest > 0
-          && diff <= forwardSeekLimit;
+          && diff < forwardSeekLimit;
       if (skipForward) {
         // the forward seek range is within the limits
         LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
@@ -248,6 +248,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
 
         if (pos == targetPos) {
           // all is well
+          LOG.debug("Now at {}: bytes remaining in current request: {}",
+              pos, remainingInCurrentRequest());
           return;
         } else {
           // log a warning; continue to attempt to re-open
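A minimal standalone sketch of the boundary case behind the one-character fix above, under the following assumption: when the seek distance exactly equals the bytes left in the current GET, the old "<=" check satisfied the seek by skipping the wrapped stream to its very end, and the next read could then surface as an EOF to callers such as Parquet; with "<" that case instead falls through to the close-and-reopen path. The class below is not part of the patch, and the numbers are illustrative only; it merely evaluates the two predicates with names mirroring the patch.

    // Standalone sketch of the forward-seek boundary condition; values are made up.
    public final class ForwardSeekBoundarySketch {
      public static void main(String[] args) {
        long remainingInCurrentRequest = 1024;  // bytes left in the currently open GET
        long forwardSeekRange = 65536;          // the configured forward-seek window
        long diff = 1024;                       // seek distance == remaining bytes

        long forwardSeekLimit = Math.min(remainingInCurrentRequest, forwardSeekRange);

        // Old check: true, so the wrapped stream is skipped to its very end.
        boolean oldSkipForward = remainingInCurrentRequest > 0
            && diff <= forwardSeekLimit;
        // New check: false, so the seek falls through to the close-and-reopen path.
        boolean newSkipForward = remainingInCurrentRequest > 0
            && diff < forwardSeekLimit;

        System.out.println("old skipForward=" + oldSkipForward
            + ", new skipForward=" + newSkipForward);
      }
    }
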
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
index 379ace8ffec..252fcd8fe81 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
@@ -18,19 +18,80 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
 
 /**
  * S3A contract tests covering file seek.
  */
+@RunWith(Parameterized.class)
 public class ITestS3AContractSeek extends AbstractContractSeekTest {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AContractSeek.class);
+
+  protected static final int READAHEAD = 1024;
+
+  private final String seekPolicy;
+
+  public static final int DATASET_LEN = READAHEAD * 2;
+
+  public static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+
+  /**
+   * This test suite is parameterized for the different seek policies
+   * which S3A supports.
+   * @return a list of seek policies to test.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {INPUT_FADV_RANDOM},
+        {INPUT_FADV_SEQUENTIAL},
+    });
+  }
+
+  /**
+   * Run the test with a chosen seek policy.
+   * @param seekPolicy fadvise policy to use.
+   */
+  public ITestS3AContractSeek(final String seekPolicy) {
+    this.seekPolicy = seekPolicy;
+  }
+
   /**
    * Create a configuration, possibly patching in S3Guard options.
+   * The FS is set to be uncached and the readahead and seek policies
+   * of the bucket itself are removed, so as to guarantee that the
+   * parameterized and test settings are used.
    * @return a configuration
    */
   @Override
@@ -38,6 +99,19 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
     Configuration conf = super.createConfiguration();
     // patch in S3Guard options
     maybeEnableS3Guard(conf);
+    // purge any per-bucket overrides.
+    try {
+      URI bucketURI = new URI(checkNotNull(conf.get("fs.contract.test.fs.s3a")));
+      S3ATestUtils.removeBucketOverrides(bucketURI.getHost(), conf,
+          READAHEAD_RANGE,
+          INPUT_FADVISE);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    // the FS is uncached, so will need clearing in test teardowns.
+    S3ATestUtils.disableFilesystemCaching(conf);
+    conf.setInt(READAHEAD_RANGE, READAHEAD);
+    conf.set(INPUT_FADVISE, seekPolicy);
     return conf;
   }
 
@@ -45,4 +119,203 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
+
+  @Override
+  public void teardown() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    if (fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE, false)) {
+      fs.close();
+    }
+    super.teardown();
+  }
+
+  /**
+   * This override of the {@code path(path)} operation adds the seek policy
+   * to the end, to guarantee uniqueness across different calls of the same
+   * method.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  protected Path path(final String filepath) throws IOException {
+    return super.path(filepath + "-" + seekPolicy);
+  }
+
+  /**
+   * Go to the end of the file, read, then seek back to the previous position
+   * to force the normal seek policy to switch to random IO.
+   * This will call readByte() to trigger the second GET.
+   * @param in input stream
+   * @return the byte read
+   * @throws IOException failure.
+   */
+  private byte readAtEndAndReturn(final FSDataInputStream in)
+      throws IOException {
+    long pos = in.getPos();
+    in.seek(DATASET_LEN - 1);
+    in.readByte();
+    // go back to start and force a new GET
+    in.seek(pos);
+    return in.readByte();
+  }
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   * @param readOffset the offset in the file where the read began.
+   * @param operation operation name for the assertion.
+   * @param data data read in.
+   * @param length length of data to check.
+   */
+  private void assertDatasetEquals(
+      final int readOffset, final String operation,
+      final byte[] data,
+      int length) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + " with seek policy " + seekPolicy
+          + " and read offset " + readOffset
+          + ": data[" + i + "] != DATASET[" + o + "]",
+          DATASET[o], data[i]);
+    }
+  }
+
+  @Override
+  public S3AFileSystem getFileSystem() {
+    return (S3AFileSystem) super.getFileSystem();
+  }
+
+  @Test
+  public void testReadPolicyInFS() throws Throwable {
+    describe("Verify the read policy is being consistently set");
+    S3AFileSystem fs = getFileSystem();
+    assertEquals(S3AInputPolicy.getPolicy(seekPolicy), fs.getInputPolicy());
+  }
+
+  /**
+   * Test for HADOOP-16109: Parquet reading S3AFileSystem causes EOF.
+   * This sets up a read which will span the active readahead and,
+   * in random IO mode, a subsequent GET.
+   */
+  @Test
+  public void testReadAcrossReadahead() throws Throwable {
+    describe("Sets up a read which will span the active readahead"
+        + " and the rest of the file.");
+    Path path = path("testReadAcrossReadahead");
+    writeTestDataset(path);
+    FileSystem fs = getFileSystem();
+    // forward seek reading across readahead boundary
+    try (FSDataInputStream in = fs.open(path)) {
+      final byte[] temp = new byte[5];
+      in.readByte();
+      int offset = READAHEAD - 1;
+      in.readFully(offset, temp); // <-- works
+      assertDatasetEquals(offset, "read spanning boundary", temp, temp.length);
+    }
+    // Read exactly on the boundary
+    try (FSDataInputStream in = fs.open(path)) {
+      final byte[] temp = new byte[5];
+      readAtEndAndReturn(in);
+      assertEquals("current position", 1, (int)(in.getPos()));
+      in.readFully(READAHEAD, temp);
+      assertDatasetEquals(READAHEAD, "read exactly on boundary",
+          temp, temp.length);
+    }
+  }
+
+  /**
+   * Read across the end of the read buffer using the readByte call,
+   * which will read a single byte only.
+   */
+  @Test
+  public void testReadSingleByteAcrossReadahead() throws Throwable {
+    describe("Read over boundary using read()/readByte() calls.");
+    Path path = path("testReadSingleByteAcrossReadahead");
+    writeTestDataset(path);
+    FileSystem fs = getFileSystem();
+    try (FSDataInputStream in = fs.open(path)) {
+      final byte[] b0 = new byte[1];
+      readAtEndAndReturn(in);
+      in.seek(READAHEAD - 1);
+      b0[0] = in.readByte();
+      assertDatasetEquals(READAHEAD - 1, "read before end of boundary", b0,
+          b0.length);
+      b0[0] = in.readByte();
+      assertDatasetEquals(READAHEAD, "read at end of boundary", b0, b0.length);
+      b0[0] = in.readByte();
+      assertDatasetEquals(READAHEAD + 1, "read after end of boundary", b0,
+          b0.length);
+    }
+  }
+
+  @Test
+  public void testSeekToReadaheadAndRead() throws Throwable {
+    describe("Seek to just before readahead limit and call"
+        + " InputStream.read(byte[])");
+    Path path = path("testSeekToReadaheadAndRead");
+    FileSystem fs = getFileSystem();
+    writeTestDataset(path);
+    try (FSDataInputStream in = fs.open(path)) {
+      readAtEndAndReturn(in);
+      final byte[] temp = new byte[5];
+      int offset = READAHEAD - 1;
+      in.seek(offset);
+      // expect to read at least one byte.
+      int l = in.read(temp);
+      assertTrue("Reading in temp data", l > 0);
+      LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
+      assertDatasetEquals(offset, "read at end of boundary", temp, l);
+    }
+  }
+
+  @Test
+  public void testSeekToReadaheadExactlyAndRead() throws Throwable {
+    describe("Seek to exactly the readahead limit and call"
+        + " InputStream.read(byte[])");
+    Path path = path("testSeekToReadaheadExactlyAndRead");
+    FileSystem fs = getFileSystem();
+    writeTestDataset(path);
+    try (FSDataInputStream in = fs.open(path)) {
+      readAtEndAndReturn(in);
+      final byte[] temp = new byte[5];
+      int offset = READAHEAD;
+      in.seek(offset);
+      // expect to read at least one byte.
+      int l = in.read(temp);
+      LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
+      assertTrue("Reading in temp data", l > 0);
+      assertDatasetEquals(offset, "read at end of boundary", temp, l);
+    }
+  }
+
+  @Test
+  public void testSeekToReadaheadExactlyAndReadByte() throws Throwable {
+    describe("Seek to exactly the readahead limit and call"
+        + " readByte()");
+    Path path = path("testSeekToReadaheadExactlyAndReadByte");
+    FileSystem fs = getFileSystem();
+    writeTestDataset(path);
+    try (FSDataInputStream in = fs.open(path)) {
+      readAtEndAndReturn(in);
+      final byte[] temp = new byte[1];
+      int offset = READAHEAD;
+      in.seek(offset);
+      // expect to read a byte successfully.
+      temp[0] = in.readByte();
+      assertDatasetEquals(READAHEAD, "read at end of boundary", temp, 1);
+      LOG.info("Read of byte at offset {} returned expected value", offset);
+    }
+  }
+
+  /**
+   * Write the standard {@link #DATASET} dataset to the given path.
+   * @param path path to write to.
+   * @throws IOException failure
+   */
+  private void writeTestDataset(final Path path) throws IOException {
+    ContractTestUtils.writeDataset(getFileSystem(), path,
+        DATASET, DATASET_LEN, READAHEAD, true);
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 2c4f0094004..7f7802d24d7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -163,4 +163,10 @@ public interface S3ATestConstants {
    */
   String CONFIGURATION_TEST_ENDPOINT =
       "test.fs.s3a.endpoint";
+
+  /**
+   * Property to set to disable caching.
+   */
+  String FS_S3A_IMPL_DISABLE_CACHE
+      = "fs.s3a.impl.disable.cache";
 }
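For background on why createConfiguration() above purges per-bucket settings: S3A lets any fs.s3a. option be overridden for a single bucket under the fs.s3a.bucket.BUCKET. prefix, and a leftover override in the test configuration would silently defeat the parameterized readahead and fadvise values. The sketch below is an illustration only, not part of the patch; "example-bucket" is a made-up bucket name, and it exercises the removeBucketOverrides() helper added to S3ATestUtils in the next file.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.S3ATestUtils;

    // Illustration of a per-bucket override shadowing a base option.
    public final class BucketOverrideSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        // The value the parameterized test wants to see.
        conf.set("fs.s3a.readahead.range", "1024");
        // A leftover per-bucket override that would shadow the base value
        // once the filesystem resolves bucket-specific options.
        conf.set("fs.s3a.bucket.example-bucket.readahead.range", "262144");
        // The helper strips the bucket-qualified variant of each named option.
        S3ATestUtils.removeBucketOverrides("example-bucket", conf,
            "fs.s3a.readahead.range");
        // Prints null: the override is gone, the base value remains in force.
        System.out.println(conf.get("fs.s3a.bucket.example-bucket.readahead.range"));
      }
    }
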
+ * @param contained string which must be in the {@code toString()} value + * of the exception + * @param eval expression to eval + * @param return type of expression + * @param exception class + * @return the caught exception if it was of the expected type and contents + */ + public static E interceptClosing( + Class clazz, + String contained, + final Callable eval) + throws Exception { + + return intercept(clazz, contained, + new Callable() { + @Override + public String call() throws Exception { + try (final Closeable c = eval.call()) { + return c.toString(); + } + }}); + } + + /** + * Remove any values from a bucket. + * @param bucket bucket whose overrides are to be removed. Can be null/empty + * @param conf config + * @param options list of fs.s3a options to remove + */ + public static void removeBucketOverrides(final String bucket, + final Configuration conf, + final String... options) { + + if (StringUtils.isEmpty(bucket)) { + return; + } + final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket + '.'; + for (String option : options) { + final String stripped = option.substring("fs.s3a.".length()); + String target = bucketPrefix + stripped; + if (conf.get(target) != null) { + LOG.debug("Removing option {}", target); + conf.unset(target); + } + } + } + + /** + * Remove any values from a bucket and the base values too. + * @param bucket bucket whose overrides are to be removed. Can be null/empty. + * @param conf config + * @param options list of fs.s3a options to remove + */ + public static void removeBaseAndBucketOverrides(final String bucket, + final Configuration conf, + final String... options) { + for (String option : options) { + conf.unset(option); + } + removeBucketOverrides(bucket, conf, options); + } + /** * Helper class to do diffs of metrics. */