HADOOP-16109. Parquet reading S3AFileSystem causes EOF (#589)
Nobody gets seek right. No matter how many times they think they have. Reproducible test from: Dave Christianson Fixed seek() logic: Steve Loughran Change-Id: I39b87f3d5daa98f65de2c0a44e348821a4930573
This commit is contained in:
parent
396fcee0b0
commit
9b8044d00b
|
@ -272,7 +272,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
describe("Seek round a large file and verify the bytes are what is expected");
|
||||
Path testSeekFile = path("bigseekfile.txt");
|
||||
byte[] block = dataset(100 * 1024, 0, 255);
|
||||
createFile(getFileSystem(), testSeekFile, false, block);
|
||||
createFile(getFileSystem(), testSeekFile, true, block);
|
||||
instream = getFileSystem().open(testSeekFile);
|
||||
assertEquals(0, instream.getPos());
|
||||
//expect that seek to 0 works
|
||||
|
@ -309,7 +309,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
assumeSupportsPositionedReadable();
|
||||
Path testSeekFile = path("bigseekfile.txt");
|
||||
byte[] block = dataset(65536, 0, 255);
|
||||
createFile(getFileSystem(), testSeekFile, false, block);
|
||||
createFile(getFileSystem(), testSeekFile, true, block);
|
||||
instream = getFileSystem().open(testSeekFile);
|
||||
instream.seek(39999);
|
||||
assertTrue(-1 != instream.read());
|
||||
|
|
|
@ -234,7 +234,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||
long forwardSeekLimit = Math.min(remainingInCurrentRequest,
|
||||
forwardSeekRange);
|
||||
boolean skipForward = remainingInCurrentRequest > 0
|
||||
&& diff <= forwardSeekLimit;
|
||||
&& diff < forwardSeekLimit;
|
||||
if (skipForward) {
|
||||
// the forward seek range is within the limits
|
||||
LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
|
||||
|
@ -248,6 +248,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||
|
||||
if (pos == targetPos) {
|
||||
// all is well
|
||||
LOG.debug("Now at {}: bytes remaining in current request: {}",
|
||||
pos, remainingInCurrentRequest());
|
||||
return;
|
||||
} else {
|
||||
// log a warning; continue to attempt to re-open
|
||||
|
|
|
@ -18,19 +18,80 @@
|
|||
|
||||
package org.apache.hadoop.fs.contract.s3a;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
|
||||
|
||||
/**
|
||||
* S3A contract tests covering file seek.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class ITestS3AContractSeek extends AbstractContractSeekTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestS3AContractSeek.class);
|
||||
|
||||
protected static final int READAHEAD = 1024;
|
||||
|
||||
private final String seekPolicy;
|
||||
|
||||
public static final int DATASET_LEN = READAHEAD * 2;
|
||||
|
||||
public static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
|
||||
|
||||
/**
|
||||
* This test suite is parameterized for the different seek policies
|
||||
* which S3A Supports.
|
||||
* @return a list of seek policies to test.
|
||||
*/
|
||||
@Parameterized.Parameters
|
||||
public static Collection<Object[]> params() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{INPUT_FADV_RANDOM},
|
||||
{INPUT_FADV_SEQUENTIAL},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Run the test with a chosen seek policy.
|
||||
* @param seekPolicy fadvise policy to use.
|
||||
*/
|
||||
public ITestS3AContractSeek(final String seekPolicy) {
|
||||
this.seekPolicy = seekPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a configuration, possibly patching in S3Guard options.
|
||||
* The FS is set to be uncached and the readahead and seek policies
|
||||
* of the bucket itself are removed, so as to guarantee that the
|
||||
* parameterized and test settings are
|
||||
* @return a configuration
|
||||
*/
|
||||
@Override
|
||||
|
@ -38,6 +99,19 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
|
|||
Configuration conf = super.createConfiguration();
|
||||
// patch in S3Guard options
|
||||
maybeEnableS3Guard(conf);
|
||||
// purge any per-bucket overrides.
|
||||
try {
|
||||
URI bucketURI = new URI(checkNotNull(conf.get("fs.contract.test.fs.s3a")));
|
||||
S3ATestUtils.removeBucketOverrides(bucketURI.getHost(), conf,
|
||||
READAHEAD_RANGE,
|
||||
INPUT_FADVISE);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
// the FS is uncached, so will need clearing in test teardowns.
|
||||
S3ATestUtils.disableFilesystemCaching(conf);
|
||||
conf.setInt(READAHEAD_RANGE, READAHEAD);
|
||||
conf.set(INPUT_FADVISE, seekPolicy);
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
@ -45,4 +119,203 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
|
|||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new S3AContract(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void teardown() throws Exception {
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
if (fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE, false)) {
|
||||
fs.close();
|
||||
}
|
||||
super.teardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* This subclass of the {@code path(path)} operation adds the seek policy
|
||||
* to the end to guarantee uniqueness across different calls of the same
|
||||
* method.
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected Path path(final String filepath) throws IOException {
|
||||
return super.path(filepath + "-" + seekPolicy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Go to end, read then seek back to the previous position to force normal
|
||||
* seek policy to switch to random IO.
|
||||
* This will call readByte to trigger the second GET
|
||||
* @param in input stream
|
||||
* @return the byte read
|
||||
* @throws IOException failure.
|
||||
*/
|
||||
private byte readAtEndAndReturn(final FSDataInputStream in)
|
||||
throws IOException {
|
||||
long pos = in.getPos();
|
||||
in.seek(DATASET_LEN -1);
|
||||
in.readByte();
|
||||
// go back to start and force a new GET
|
||||
in.seek(pos);
|
||||
return in.readByte();
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the data read matches the dataset at the given offset.
|
||||
* This helps verify that the seek process is moving the read pointer
|
||||
* to the correct location in the file.
|
||||
* @param readOffset the offset in the file where the read began.
|
||||
* @param operation operation name for the assertion.
|
||||
* @param data data read in.
|
||||
* @param length length of data to check.
|
||||
*/
|
||||
private void assertDatasetEquals(
|
||||
final int readOffset, final String operation,
|
||||
final byte[] data,
|
||||
int length) {
|
||||
for (int i = 0; i < length; i++) {
|
||||
int o = readOffset + i;
|
||||
assertEquals(operation + " with seek policy " + seekPolicy
|
||||
+ "and read offset " + readOffset
|
||||
+ ": data[" + i + "] != DATASET[" + o + "]",
|
||||
DATASET[o], data[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3AFileSystem getFileSystem() {
|
||||
return (S3AFileSystem) super.getFileSystem();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadPolicyInFS() throws Throwable {
|
||||
describe("Verify the read policy is being consistently set");
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
assertEquals(S3AInputPolicy.getPolicy(seekPolicy), fs.getInputPolicy());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for HADOOP-16109: Parquet reading S3AFileSystem causes EOF.
|
||||
* This sets up a read which will span the active readahead and,
|
||||
* in random IO mode, a subsequent GET.
|
||||
*/
|
||||
@Test
|
||||
public void testReadAcrossReadahead() throws Throwable {
|
||||
describe("Sets up a read which will span the active readahead"
|
||||
+ " and the rest of the file.");
|
||||
Path path = path("testReadAcrossReadahead");
|
||||
writeTestDataset(path);
|
||||
FileSystem fs = getFileSystem();
|
||||
// forward seek reading across readahead boundary
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
final byte[] temp = new byte[5];
|
||||
in.readByte();
|
||||
int offset = READAHEAD - 1;
|
||||
in.readFully(offset, temp); // <-- works
|
||||
assertDatasetEquals(offset, "read spanning boundary", temp, temp.length);
|
||||
}
|
||||
// Read exactly on the the boundary
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
final byte[] temp = new byte[5];
|
||||
readAtEndAndReturn(in);
|
||||
assertEquals("current position", 1, (int)(in.getPos()));
|
||||
in.readFully(READAHEAD, temp);
|
||||
assertDatasetEquals(READAHEAD, "read exactly on boundary",
|
||||
temp, temp.length);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read across the end of the read buffer using the readByte call,
|
||||
* which will read a single byte only.
|
||||
*/
|
||||
@Test
|
||||
public void testReadSingleByteAcrossReadahead() throws Throwable {
|
||||
describe("Read over boundary using read()/readByte() calls.");
|
||||
Path path = path("testReadSingleByteAcrossReadahead");
|
||||
writeTestDataset(path);
|
||||
FileSystem fs = getFileSystem();
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
final byte[] b0 = new byte[1];
|
||||
readAtEndAndReturn(in);
|
||||
in.seek(READAHEAD - 1);
|
||||
b0[0] = in.readByte();
|
||||
assertDatasetEquals(READAHEAD - 1, "read before end of boundary", b0,
|
||||
b0.length);
|
||||
b0[0] = in.readByte();
|
||||
assertDatasetEquals(READAHEAD, "read at end of boundary", b0, b0.length);
|
||||
b0[0] = in.readByte();
|
||||
assertDatasetEquals(READAHEAD + 1, "read after end of boundary", b0,
|
||||
b0.length);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToReadaheadAndRead() throws Throwable {
|
||||
describe("Seek to just before readahead limit and call"
|
||||
+ " InputStream.read(byte[])");
|
||||
Path path = path("testSeekToReadaheadAndRead");
|
||||
FileSystem fs = getFileSystem();
|
||||
writeTestDataset(path);
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
readAtEndAndReturn(in);
|
||||
final byte[] temp = new byte[5];
|
||||
int offset = READAHEAD - 1;
|
||||
in.seek(offset);
|
||||
// expect to read at least one byte.
|
||||
int l = in.read(temp);
|
||||
assertTrue("Reading in temp data", l > 0);
|
||||
LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
|
||||
assertDatasetEquals(offset, "read at end of boundary", temp, l);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToReadaheadExactlyAndRead() throws Throwable {
|
||||
describe("Seek to exactly the readahead limit and call"
|
||||
+ " InputStream.read(byte[])");
|
||||
Path path = path("testSeekToReadaheadExactlyAndRead");
|
||||
FileSystem fs = getFileSystem();
|
||||
writeTestDataset(path);
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
readAtEndAndReturn(in);
|
||||
final byte[] temp = new byte[5];
|
||||
int offset = READAHEAD;
|
||||
in.seek(offset);
|
||||
// expect to read at least one byte.
|
||||
int l = in.read(temp);
|
||||
LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
|
||||
assertTrue("Reading in temp data", l > 0);
|
||||
assertDatasetEquals(offset, "read at end of boundary", temp, l);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToReadaheadExactlyAndReadByte() throws Throwable {
|
||||
describe("Seek to exactly the readahead limit and call"
|
||||
+ " readByte()");
|
||||
Path path = path("testSeekToReadaheadExactlyAndReadByte");
|
||||
FileSystem fs = getFileSystem();
|
||||
writeTestDataset(path);
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
readAtEndAndReturn(in);
|
||||
final byte[] temp = new byte[1];
|
||||
int offset = READAHEAD;
|
||||
in.seek(offset);
|
||||
// expect to read a byte successfully.
|
||||
temp[0] = in.readByte();
|
||||
assertDatasetEquals(READAHEAD, "read at end of boundary", temp, 1);
|
||||
LOG.info("Read of byte at offset {} returned expected value", offset);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the standard {@link #DATASET} dataset to the given path.
|
||||
* @param path path to write to.
|
||||
* @throws IOException failure
|
||||
*/
|
||||
private void writeTestDataset(final Path path) throws IOException {
|
||||
ContractTestUtils.writeDataset(getFileSystem(), path,
|
||||
DATASET, DATASET_LEN, READAHEAD, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -163,4 +163,10 @@ public interface S3ATestConstants {
|
|||
*/
|
||||
String CONFIGURATION_TEST_ENDPOINT =
|
||||
"test.fs.s3a.endpoint";
|
||||
|
||||
/**
|
||||
* Property to set to disable caching.
|
||||
*/
|
||||
String FS_S3A_IMPL_DISABLE_CACHE
|
||||
= "fs.s3a.impl.disable.cache";
|
||||
}
|
||||
|
|
|
@ -36,15 +36,18 @@ import org.junit.internal.AssumptionViolatedException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
|
@ -444,6 +447,74 @@ public final class S3ATestUtils {
|
|||
reset(metrics);
|
||||
}
|
||||
|
||||
/**
|
||||
* Variant of {@code LambdaTestUtils#intercept() which closes the Closeable
|
||||
* returned by the invoked operation, and using its toString() value
|
||||
* for exception messages.
|
||||
* @param clazz class of exception; the raised exception must be this class
|
||||
* <i>or a subclass</i>.
|
||||
* @param contained string which must be in the {@code toString()} value
|
||||
* of the exception
|
||||
* @param eval expression to eval
|
||||
* @param <T> return type of expression
|
||||
* @param <E> exception class
|
||||
* @return the caught exception if it was of the expected type and contents
|
||||
*/
|
||||
public static <E extends Throwable, T extends Closeable> E interceptClosing(
|
||||
Class<E> clazz,
|
||||
String contained,
|
||||
final Callable<T> eval)
|
||||
throws Exception {
|
||||
|
||||
return intercept(clazz, contained,
|
||||
new Callable<String>() {
|
||||
@Override
|
||||
public String call() throws Exception {
|
||||
try (final Closeable c = eval.call()) {
|
||||
return c.toString();
|
||||
}
|
||||
}});
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove any values from a bucket.
|
||||
* @param bucket bucket whose overrides are to be removed. Can be null/empty
|
||||
* @param conf config
|
||||
* @param options list of fs.s3a options to remove
|
||||
*/
|
||||
public static void removeBucketOverrides(final String bucket,
|
||||
final Configuration conf,
|
||||
final String... options) {
|
||||
|
||||
if (StringUtils.isEmpty(bucket)) {
|
||||
return;
|
||||
}
|
||||
final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket + '.';
|
||||
for (String option : options) {
|
||||
final String stripped = option.substring("fs.s3a.".length());
|
||||
String target = bucketPrefix + stripped;
|
||||
if (conf.get(target) != null) {
|
||||
LOG.debug("Removing option {}", target);
|
||||
conf.unset(target);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove any values from a bucket and the base values too.
|
||||
* @param bucket bucket whose overrides are to be removed. Can be null/empty.
|
||||
* @param conf config
|
||||
* @param options list of fs.s3a options to remove
|
||||
*/
|
||||
public static void removeBaseAndBucketOverrides(final String bucket,
|
||||
final Configuration conf,
|
||||
final String... options) {
|
||||
for (String option : options) {
|
||||
conf.unset(option);
|
||||
}
|
||||
removeBucketOverrides(bucket, conf, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class to do diffs of metrics.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue