HADOOP-13203 S3A: Support fadvise "random" mode for high performance readPositioned() reads. Contributed by Rajesh Balamohan and stevel.

This commit is contained in:
Steve Loughran 2016-06-22 15:41:26 +01:00
parent 67089875f0
commit 4ee3543625
11 changed files with 823 additions and 146 deletions

View File

@ -105,7 +105,10 @@ public abstract class FSInputStream extends InputStream
Preconditions.checkArgument(buffer != null, "Null buffer"); Preconditions.checkArgument(buffer != null, "Null buffer");
if (buffer.length - offset < length) { if (buffer.length - offset < length) {
throw new IndexOutOfBoundsException( throw new IndexOutOfBoundsException(
FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER); FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+ ": request length=" + length
+ ", with offset ="+ offset
+ "; buffer capacity =" + (buffer.length - offset));
} }
} }

View File

@ -271,7 +271,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
public void testSeekBigFile() throws Throwable { public void testSeekBigFile() throws Throwable {
describe("Seek round a large file and verify the bytes are what is expected"); describe("Seek round a large file and verify the bytes are what is expected");
Path testSeekFile = path("bigseekfile.txt"); Path testSeekFile = path("bigseekfile.txt");
byte[] block = dataset(65536, 0, 255); byte[] block = dataset(100 * 1024, 0, 255);
createFile(getFileSystem(), testSeekFile, false, block); createFile(getFileSystem(), testSeekFile, false, block);
instream = getFileSystem().open(testSeekFile); instream = getFileSystem().open(testSeekFile);
assertEquals(0, instream.getPos()); assertEquals(0, instream.getPos());
@ -291,6 +291,15 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
assertEquals("@8191", block[8191], (byte) instream.read()); assertEquals("@8191", block[8191], (byte) instream.read());
instream.seek(0); instream.seek(0);
assertEquals("@0", 0, (byte) instream.read()); assertEquals("@0", 0, (byte) instream.read());
// try read & readFully
instream.seek(0);
assertEquals(0, instream.getPos());
instream.read();
assertEquals(1, instream.getPos());
byte[] buf = new byte[80 * 1024];
instream.readFully(1, buf, 0, buf.length);
assertEquals(1, instream.getPos());
} }
@Test @Test

View File

@ -23,6 +23,10 @@ import org.apache.hadoop.classification.InterfaceStability;
/** /**
* All the constants used with the {@link S3AFileSystem}. * All the constants used with the {@link S3AFileSystem}.
*
* Some of the strings are marked as {@code Unstable}. This means
* that they may be unsupported in future; at which point they will be marked
* as deprecated and simply ignored.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
@ -154,4 +158,36 @@ public final class Constants {
/** read ahead buffer size to prevent connection re-establishments. */ /** read ahead buffer size to prevent connection re-establishments. */
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range"; public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024; public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
/**
* Which input strategy to use for buffering, seeking and similar when
* reading data.
* Value: {@value}
*/
@InterfaceStability.Unstable
public static final String INPUT_FADVISE =
"fs.s3a.experimental.input.fadvise";
/**
* General input. Some seeks, some reads.
* Value: {@value}
*/
@InterfaceStability.Unstable
public static final String INPUT_FADV_NORMAL = "normal";
/**
* Optimized for sequential access.
* Value: {@value}
*/
@InterfaceStability.Unstable
public static final String INPUT_FADV_SEQUENTIAL = "sequential";
/**
* Optimized purely for random seek+read/positionedRead operations;
* The performance of sequential IO may be reduced in exchange for
* more efficient {@code seek()} operations.
* Value: {@value}
*/
@InterfaceStability.Unstable
public static final String INPUT_FADV_RANDOM = "random";
} }

View File

@ -29,6 +29,7 @@ import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
@ -82,7 +83,6 @@ import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import static org.apache.commons.lang.StringUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.Statistic.*;
@ -126,6 +126,7 @@ public class S3AFileSystem extends FileSystem {
private S3AInstrumentation instrumentation; private S3AInstrumentation instrumentation;
private S3AStorageStatistics storageStatistics; private S3AStorageStatistics storageStatistics;
private long readAhead; private long readAhead;
private S3AInputPolicy inputPolicy;
// The maximum number of entries that can be deleted in any call to s3 // The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000; private static final int MAX_ENTRIES_TO_DELETE = 1000;
@ -227,6 +228,8 @@ public class S3AFileSystem extends FileSystem {
serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM); conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
} catch (AmazonClientException e) { } catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e); throw translateException("initializing ", new Path(name), e);
} }
@ -482,6 +485,26 @@ public class S3AFileSystem extends FileSystem {
return s3; return s3;
} }
/**
* Get the input policy for this FS instance.
* @return the input policy
*/
@InterfaceStability.Unstable
public S3AInputPolicy getInputPolicy() {
return inputPolicy;
}
/**
* Change the input policy for this FS.
* @param inputPolicy new policy
*/
@InterfaceStability.Unstable
public void setInputPolicy(S3AInputPolicy inputPolicy) {
Objects.requireNonNull(inputPolicy, "Null inputStrategy");
LOG.debug("Setting input strategy: {}", inputPolicy);
this.inputPolicy = inputPolicy;
}
public S3AFileSystem() { public S3AFileSystem() {
super(); super();
} }
@ -537,7 +560,8 @@ public class S3AFileSystem extends FileSystem {
} }
return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
fileStatus.getLen(), s3, statistics, instrumentation, readAhead)); fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
inputPolicy));
} }
/** /**
@ -1745,6 +1769,7 @@ public class S3AFileSystem extends FileSystem {
"S3AFileSystem{"); "S3AFileSystem{");
sb.append("uri=").append(uri); sb.append("uri=").append(uri);
sb.append(", workingDir=").append(workingDir); sb.append(", workingDir=").append(workingDir);
sb.append(", inputPolicy=").append(inputPolicy);
sb.append(", partSize=").append(partSize); sb.append(", partSize=").append(partSize);
sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete); sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
sb.append(", maxKeys=").append(maxKeys); sb.append(", maxKeys=").append(maxKeys);

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Locale;
import static org.apache.hadoop.fs.s3a.Constants.*;
/**
* Filesystem input policy.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public enum S3AInputPolicy {
Normal(INPUT_FADV_NORMAL),
Sequential(INPUT_FADV_SEQUENTIAL),
Random(INPUT_FADV_RANDOM);
private static final Logger LOG =
LoggerFactory.getLogger(S3AInputPolicy.class);
private final String policy;
S3AInputPolicy(String policy) {
this.policy = policy;
}
@Override
public String toString() {
return policy;
}
/**
* Choose an FS access policy.
* Always returns something,
* primarily by downgrading to "normal" if there is no other match.
* @param name strategy name from a configuration option, etc.
* @return the chosen strategy
*/
public static S3AInputPolicy getPolicy(String name) {
String trimmed = name.trim().toLowerCase(Locale.ENGLISH);
switch (trimmed) {
case INPUT_FADV_NORMAL:
return Normal;
case INPUT_FADV_RANDOM:
return Random;
case INPUT_FADV_SEQUENTIAL:
return Sequential;
default:
LOG.warn("Unrecognized " + INPUT_FADVISE + " value: \"{}\"", trimmed);
return Normal;
}
}
}

View File

@ -77,9 +77,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
private final long contentLength; private final long contentLength;
private final String uri; private final String uri;
public static final Logger LOG = S3AFileSystem.LOG; public static final Logger LOG = S3AFileSystem.LOG;
public static final long CLOSE_THRESHOLD = 4096;
private final S3AInstrumentation.InputStreamStatistics streamStatistics; private final S3AInstrumentation.InputStreamStatistics streamStatistics;
private long readahead; private final S3AInputPolicy inputPolicy;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
/** /**
* This is the actual position within the object, used by * This is the actual position within the object, used by
@ -87,8 +87,16 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/ */
private long nextReadPos; private long nextReadPos;
/* Amount of data desired from the request */ /**
private long requestedStreamLen; * The end of the content range of the last request.
* This is an absolute value of the range, not a length field.
*/
private long contentRangeFinish;
/**
* The start of the content range of the last request.
*/
private long contentRangeStart;
public S3AInputStream(String bucket, public S3AInputStream(String bucket,
String key, String key,
@ -96,7 +104,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
AmazonS3Client client, AmazonS3Client client,
FileSystem.Statistics stats, FileSystem.Statistics stats,
S3AInstrumentation instrumentation, S3AInstrumentation instrumentation,
long readahead) { long readahead,
S3AInputPolicy inputPolicy) {
Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket"); Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key"); Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
Preconditions.checkArgument(contentLength >= 0 , "Negative content length"); Preconditions.checkArgument(contentLength >= 0 , "Negative content length");
@ -107,6 +116,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
this.stats = stats; this.stats = stats;
this.uri = "s3a://" + this.bucket + "/" + this.key; this.uri = "s3a://" + this.bucket + "/" + this.key;
this.streamStatistics = instrumentation.newInputStreamStatistics(); this.streamStatistics = instrumentation.newInputStreamStatistics();
this.inputPolicy = inputPolicy;
setReadahead(readahead); setReadahead(readahead);
} }
@ -120,21 +130,23 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/ */
private synchronized void reopen(String reason, long targetPos, long length) private synchronized void reopen(String reason, long targetPos, long length)
throws IOException { throws IOException {
requestedStreamLen = this.contentLength;
if (wrappedStream != null) { if (wrappedStream != null) {
closeStream("reopen(" + reason + ")", requestedStreamLen); closeStream("reopen(" + reason + ")", contentRangeFinish);
} }
LOG.debug("reopen({}) for {} at targetPos={}, length={}," +
" requestedStreamLen={}, streamPosition={}, nextReadPosition={}", contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos); length, contentLength, readahead);
LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
" streamPosition={}, nextReadPosition={}",
uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos);
streamStatistics.streamOpened(); streamStatistics.streamOpened();
try { try {
GetObjectRequest request = new GetObjectRequest(bucket, key) GetObjectRequest request = new GetObjectRequest(bucket, key)
.withRange(targetPos, requestedStreamLen); .withRange(targetPos, contentRangeFinish);
wrappedStream = client.getObject(request).getObjectContent(); wrappedStream = client.getObject(request).getObjectContent();
contentRangeStart = targetPos;
if (wrappedStream == null) { if (wrappedStream == null) {
throw new IOException("Null IO stream from reopen of (" + reason + ") " throw new IOException("Null IO stream from reopen of (" + reason + ") "
+ uri); + uri);
@ -205,8 +217,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
long forwardSeekRange = Math.max(readahead, available); long forwardSeekRange = Math.max(readahead, available);
// work out how much is actually left in the stream // work out how much is actually left in the stream
// then choose whichever comes first: the range or the EOF // then choose whichever comes first: the range or the EOF
long forwardSeekLimit = Math.min(remaining(), forwardSeekRange); long remainingInCurrentRequest = remainingInCurrentRequest();
if (diff <= forwardSeekLimit) {
long forwardSeekLimit = Math.min(remainingInCurrentRequest,
forwardSeekRange);
boolean skipForward = remainingInCurrentRequest > 0
&& diff <= forwardSeekLimit;
if (skipForward) {
// the forward seek range is within the limits // the forward seek range is within the limits
LOG.debug("Forward seek on {}, of {} bytes", uri, diff); LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
streamStatistics.seekForwards(diff); streamStatistics.seekForwards(diff);
@ -231,14 +248,16 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
streamStatistics.seekBackwards(diff); streamStatistics.seekBackwards(diff);
} else { } else {
// targetPos == pos // targetPos == pos
// this should never happen as the caller filters it out. if (remainingInCurrentRequest() > 0) {
// Retained just in case // if there is data left in the stream, keep going
LOG.debug("Ignoring seek {} to {} as target position == current", return;
uri, targetPos);
} }
}
// if the code reaches here, the stream needs to be reopened.
// close the stream; if read the object will be opened at the new pos // close the stream; if read the object will be opened at the new pos
closeStream("seekInStream()", this.requestedStreamLen); closeStream("seekInStream()", this.contentRangeFinish);
pos = targetPos; pos = targetPos;
} }
@ -255,9 +274,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/ */
private void lazySeek(long targetPos, long len) throws IOException { private void lazySeek(long targetPos, long len) throws IOException {
//For lazy seek //For lazy seek
if (targetPos != this.pos) {
seekInStream(targetPos, len); seekInStream(targetPos, len);
}
//re-open at specific location if needed //re-open at specific location if needed
if (wrappedStream == null) { if (wrappedStream == null) {
@ -284,7 +301,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
return -1; return -1;
} }
int byteRead; int byteRead;
try { try {
lazySeek(nextReadPos, 1); lazySeek(nextReadPos, 1);
@ -328,7 +344,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* This updates the statistics on read operations started and whether * This updates the statistics on read operations started and whether
* or not the read operation "completed", that is: returned the exact * or not the read operation "completed", that is: returned the exact
* number of bytes requested. * number of bytes requested.
* @throws EOFException if there is no more data
* @throws IOException if there are other problems * @throws IOException if there are other problems
*/ */
@Override @Override
@ -357,7 +372,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
streamStatistics.readOperationStarted(nextReadPos, len); streamStatistics.readOperationStarted(nextReadPos, len);
bytesRead = wrappedStream.read(buf, off, len); bytesRead = wrappedStream.read(buf, off, len);
} catch (EOFException e) { } catch (EOFException e) {
throw e; onReadFailure(e, len);
// the base implementation swallows EOFs.
return -1;
} catch (IOException e) { } catch (IOException e) {
onReadFailure(e, len); onReadFailure(e, len);
bytesRead = wrappedStream.read(buf, off, len); bytesRead = wrappedStream.read(buf, off, len);
@ -397,7 +414,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
closed = true; closed = true;
try { try {
// close or abort the stream // close or abort the stream
closeStream("close() operation", this.contentLength); closeStream("close() operation", this.contentRangeFinish);
// this is actually a no-op // this is actually a no-op
super.close(); super.close();
} finally { } finally {
@ -420,13 +437,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/ */
private void closeStream(String reason, long length) { private void closeStream(String reason, long length) {
if (wrappedStream != null) { if (wrappedStream != null) {
boolean shouldAbort = length - pos > CLOSE_THRESHOLD;
// if the amount of data remaining in the current request is greater
// than the readahead value: abort.
long remaining = remainingInCurrentRequest();
boolean shouldAbort = remaining > readahead;
if (!shouldAbort) { if (!shouldAbort) {
try { try {
// clean close. This will read to the end of the stream, // clean close. This will read to the end of the stream,
// so, while cleaner, can be pathological on a multi-GB object // so, while cleaner, can be pathological on a multi-GB object
wrappedStream.close(); wrappedStream.close();
streamStatistics.streamClose(false); streamStatistics.streamClose(false, remaining);
} catch (IOException e) { } catch (IOException e) {
// exception escalates to an abort // exception escalates to an abort
LOG.debug("When closing {} stream for {}", uri, reason, e); LOG.debug("When closing {} stream for {}", uri, reason, e);
@ -437,11 +458,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
// Abort, rather than just close, the underlying stream. Otherwise, the // Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream. // remaining object payload is read from S3 while closing the stream.
wrappedStream.abort(); wrappedStream.abort();
streamStatistics.streamClose(true); streamStatistics.streamClose(true, remaining);
} }
LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," + LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," +
" length={}", " request range {}-{} length={}",
uri, (shouldAbort ? "aborted":"closed"), reason, pos, nextReadPos, uri, (shouldAbort ? "aborted" : "closed"), reason,
pos, nextReadPos,
contentRangeStart, contentRangeFinish,
length); length);
wrappedStream = null; wrappedStream = null;
} }
@ -451,7 +474,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
public synchronized int available() throws IOException { public synchronized int available() throws IOException {
checkNotClosed(); checkNotClosed();
long remaining = remaining(); long remaining = remainingInFile();
if (remaining > Integer.MAX_VALUE) { if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE; return Integer.MAX_VALUE;
} }
@ -462,10 +485,35 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* Bytes left in stream. * Bytes left in stream.
* @return how many bytes are left to read * @return how many bytes are left to read
*/ */
protected long remaining() { @InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInFile() {
return this.contentLength - this.pos; return this.contentLength - this.pos;
} }
/**
* Bytes left in the current request.
* Only valid if there is an active request.
* @return how many bytes are left to read in the current GET.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInCurrentRequest() {
return this.contentRangeFinish - this.pos;
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long getContentRangeFinish() {
return contentRangeFinish;
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long getContentRangeStart() {
return contentRangeStart;
}
@Override @Override
public boolean markSupported() { public boolean markSupported() {
return false; return false;
@ -480,16 +528,26 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
@Override @Override
@InterfaceStability.Unstable @InterfaceStability.Unstable
public String toString() { public String toString() {
String s = streamStatistics.toString();
synchronized (this) {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(
"S3AInputStream{"); "S3AInputStream{");
sb.append(uri); sb.append(uri);
sb.append(" wrappedStream=")
.append(wrappedStream != null ? "open" : "closed");
sb.append(" read policy=").append(inputPolicy);
sb.append(" pos=").append(pos); sb.append(" pos=").append(pos);
sb.append(" nextReadPos=").append(nextReadPos); sb.append(" nextReadPos=").append(nextReadPos);
sb.append(" contentLength=").append(contentLength); sb.append(" contentLength=").append(contentLength);
sb.append(" ").append(streamStatistics.toString()); sb.append(" contentRangeStart=").append(contentRangeStart);
sb.append(" contentRangeFinish=").append(contentRangeFinish);
sb.append(" remainingInCurrentRequest=")
.append(remainingInCurrentRequest());
sb.append('\n').append(s);
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();
} }
}
/** /**
* Subclass {@code readFully()} operation which only seeks at the start * Subclass {@code readFully()} operation which only seeks at the start
@ -542,7 +600,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
} }
@Override @Override
public void setReadahead(Long readahead) { public synchronized void setReadahead(Long readahead) {
if (readahead == null) { if (readahead == null) {
this.readahead = Constants.DEFAULT_READAHEAD_RANGE; this.readahead = Constants.DEFAULT_READAHEAD_RANGE;
} else { } else {
@ -555,7 +613,48 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* Get the current readahead value. * Get the current readahead value.
* @return a non-negative readahead value * @return a non-negative readahead value
*/ */
public long getReadahead() { public synchronized long getReadahead() {
return readahead; return readahead;
} }
/**
* Calculate the limit for a get request, based on input policy
* and state of object.
* @param inputPolicy input policy
* @param targetPos position of the read
* @param length length of bytes requested; if less than zero "unknown"
* @param contentLength total length of file
* @param readahead current readahead value
* @return the absolute value of the limit of the request.
*/
static long calculateRequestLimit(
S3AInputPolicy inputPolicy,
long targetPos,
long length,
long contentLength,
long readahead) {
long rangeLimit;
switch (inputPolicy) {
case Random:
// positioned.
// read either this block, or the here + readahead value.
rangeLimit = (length < 0) ? contentLength
: targetPos + Math.max(readahead, length);
break;
case Sequential:
// sequential: plan for reading the entire object.
rangeLimit = contentLength;
break;
case Normal:
default:
rangeLimit = contentLength;
}
// cannot read past the end of the object
rangeLimit = Math.min(contentLength, rangeLimit);
return rangeLimit;
}
} }

View File

@ -67,6 +67,8 @@ public class S3AInstrumentation {
private final MutableCounterLong streamReadOperations; private final MutableCounterLong streamReadOperations;
private final MutableCounterLong streamReadFullyOperations; private final MutableCounterLong streamReadFullyOperations;
private final MutableCounterLong streamReadsIncomplete; private final MutableCounterLong streamReadsIncomplete;
private final MutableCounterLong streamBytesReadInClose;
private final MutableCounterLong streamBytesDiscardedInAbort;
private final MutableCounterLong ignoredErrors; private final MutableCounterLong ignoredErrors;
private final MutableCounterLong numberOfFilesCreated; private final MutableCounterLong numberOfFilesCreated;
@ -75,7 +77,8 @@ public class S3AInstrumentation {
private final MutableCounterLong numberOfFilesDeleted; private final MutableCounterLong numberOfFilesDeleted;
private final MutableCounterLong numberOfDirectoriesCreated; private final MutableCounterLong numberOfDirectoriesCreated;
private final MutableCounterLong numberOfDirectoriesDeleted; private final MutableCounterLong numberOfDirectoriesDeleted;
private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>(); private final Map<String, MutableCounterLong> streamMetrics =
new HashMap<>(30);
private static final Statistic[] COUNTERS_TO_CREATE = { private static final Statistic[] COUNTERS_TO_CREATE = {
INVOCATION_COPY_FROM_LOCAL_FILE, INVOCATION_COPY_FROM_LOCAL_FILE,
@ -125,6 +128,8 @@ public class S3AInstrumentation {
streamCounter(STREAM_READ_FULLY_OPERATIONS); streamCounter(STREAM_READ_FULLY_OPERATIONS);
streamReadsIncomplete = streamReadsIncomplete =
streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE); streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
streamBytesReadInClose = streamCounter(STREAM_CLOSE_BYTES_READ);
streamBytesDiscardedInAbort = streamCounter(STREAM_ABORT_BYTES_DISCARDED);
numberOfFilesCreated = counter(FILES_CREATED); numberOfFilesCreated = counter(FILES_CREATED);
numberOfFilesCopied = counter(FILES_COPIED); numberOfFilesCopied = counter(FILES_COPIED);
bytesOfFilesCopied = counter(FILES_COPIED_BYTES); bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
@ -362,6 +367,8 @@ public class S3AInstrumentation {
streamReadOperations.incr(statistics.readOperations); streamReadOperations.incr(statistics.readOperations);
streamReadFullyOperations.incr(statistics.readFullyOperations); streamReadFullyOperations.incr(statistics.readFullyOperations);
streamReadsIncomplete.incr(statistics.readsIncomplete); streamReadsIncomplete.incr(statistics.readsIncomplete);
streamBytesReadInClose.incr(statistics.bytesReadInClose);
streamBytesDiscardedInAbort.incr(statistics.bytesDiscardedInAbort);
} }
/** /**
@ -386,6 +393,8 @@ public class S3AInstrumentation {
public long readOperations; public long readOperations;
public long readFullyOperations; public long readFullyOperations;
public long readsIncomplete; public long readsIncomplete;
public long bytesReadInClose;
public long bytesDiscardedInAbort;
private InputStreamStatistics() { private InputStreamStatistics() {
} }
@ -426,13 +435,18 @@ public class S3AInstrumentation {
* The inner stream was closed. * The inner stream was closed.
* @param abortedConnection flag to indicate the stream was aborted, * @param abortedConnection flag to indicate the stream was aborted,
* rather than closed cleanly * rather than closed cleanly
* @param remainingInCurrentRequest the number of bytes remaining in
* the current request.
*/ */
public void streamClose(boolean abortedConnection) { public void streamClose(boolean abortedConnection,
long remainingInCurrentRequest) {
closeOperations++; closeOperations++;
if (abortedConnection) { if (abortedConnection) {
this.aborted++; this.aborted++;
bytesDiscardedInAbort += remainingInCurrentRequest;
} else { } else {
closed++; closed++;
bytesReadInClose += remainingInCurrentRequest;
} }
} }
@ -522,6 +536,8 @@ public class S3AInstrumentation {
sb.append(", ReadOperations=").append(readOperations); sb.append(", ReadOperations=").append(readOperations);
sb.append(", ReadFullyOperations=").append(readFullyOperations); sb.append(", ReadFullyOperations=").append(readFullyOperations);
sb.append(", ReadsIncomplete=").append(readsIncomplete); sb.append(", ReadsIncomplete=").append(readsIncomplete);
sb.append(", BytesReadInClose=").append(bytesReadInClose);
sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();
} }

View File

@ -71,33 +71,37 @@ public enum Statistic {
OBJECT_PUT_REQUESTS("object_put_requests", OBJECT_PUT_REQUESTS("object_put_requests",
"Object put/multipart upload count"), "Object put/multipart upload count"),
OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"), OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"),
STREAM_ABORTED("streamAborted", STREAM_ABORTED("stream_aborted",
"Count of times the TCP stream was aborted"), "Count of times the TCP stream was aborted"),
STREAM_BACKWARD_SEEK_OPERATIONS("streamBackwardSeekOperations", STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_pperations",
"Number of executed seek operations which went backwards in a stream"), "Number of executed seek operations which went backwards in a stream"),
STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"), STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"),
STREAM_CLOSE_OPERATIONS("streamCloseOperations", STREAM_CLOSE_OPERATIONS("stream_close_operations",
"Total count of times an attempt to close a data stream was made"), "Total count of times an attempt to close a data stream was made"),
STREAM_FORWARD_SEEK_OPERATIONS("streamForwardSeekOperations", STREAM_FORWARD_SEEK_OPERATIONS("stream_forward_seek_operations",
"Number of executed seek operations which went forward in a stream"), "Number of executed seek operations which went forward in a stream"),
STREAM_OPENED("streamOpened", STREAM_OPENED("streamOpened",
"Total count of times an input stream to object store was opened"), "Total count of times an input stream to object store was opened"),
STREAM_READ_EXCEPTIONS("streamReadExceptions", STREAM_READ_EXCEPTIONS("stream_read_exceptions",
"Number of seek operations invoked on input streams"), "Number of seek operations invoked on input streams"),
STREAM_READ_FULLY_OPERATIONS("streamReadFullyOperations", STREAM_READ_FULLY_OPERATIONS("stream_read_fully_operations",
"count of readFully() operations in streams"), "Count of readFully() operations in streams"),
STREAM_READ_OPERATIONS("streamReadOperations", STREAM_READ_OPERATIONS("stream_read_operations",
"Count of read() operations in streams"), "Count of read() operations in streams"),
STREAM_READ_OPERATIONS_INCOMPLETE("streamReadOperationsIncomplete", STREAM_READ_OPERATIONS_INCOMPLETE("stream_read_operations_incomplete",
"Count of incomplete read() operations in streams"), "Count of incomplete read() operations in streams"),
STREAM_SEEK_BYTES_BACKWARDS("streamBytesBackwardsOnSeek", STREAM_SEEK_BYTES_BACKWARDS("stream_bytes_backwards_on_seek",
"Count of bytes moved backwards during seek operations"), "Count of bytes moved backwards during seek operations"),
STREAM_SEEK_BYTES_READ("streamBytesRead", STREAM_SEEK_BYTES_READ("stream_bytes_read",
"Count of bytes read during seek() in stream operations"), "Count of bytes read during seek() in stream operations"),
STREAM_SEEK_BYTES_SKIPPED("streamBytesSkippedOnSeek", STREAM_SEEK_BYTES_SKIPPED("stream_bytes_skipped_on_seek",
"Count of bytes skipped during forward seek operation"), "Count of bytes skipped during forward seek operation"),
STREAM_SEEK_OPERATIONS("streamSeekOperations", STREAM_SEEK_OPERATIONS("stream_seek_operations",
"Number of read exceptions caught and attempted to recovered from"); "Number of seek operations during stream IO."),
STREAM_CLOSE_BYTES_READ("stream_bytes_read_in_close",
"Count of bytes read when closing streams during seek operations."),
STREAM_ABORT_BYTES_DISCARDED("stream_bytes_discarded_in_abort",
"Count of bytes discarded by aborting the stream");
Statistic(String symbol, String description) { Statistic(String symbol, String description) {
this.symbol = symbol; this.symbol = symbol;

View File

@ -657,6 +657,78 @@ the available memory. These settings should be tuned to the envisioned
workflow (some large files, many small ones, ...) and the physical workflow (some large files, many small ones, ...) and the physical
limitations of the machine and cluster (memory, network bandwidth). limitations of the machine and cluster (memory, network bandwidth).
### S3A Experimental "fadvise" input policy support
**Warning: EXPERIMENTAL: behavior may change in future**
The S3A Filesystem client supports the notion of input policies, similar
to that of the Posix `fadvise()` API call. This tunes the behavior of the S3A
client to optimise HTTP GET requests for the different use cases.
#### "sequential" (default)
Read through the file, possibly with some short forward seeks.
The whole document is requested in a single HTTP request; forward seeks
within the readahead range are supported by skipping over the intermediate
data.
This is leads to maximum read throughput —but with very expensive
backward seeks.
#### "normal"
This is currently the same as "sequential".
#### "random"
Optimised for random IO, specifically the Hadoop `PositionedReadable`
operations —though `seek(offset); read(byte_buffer)` also benefits.
Rather than ask for the whole file, the range of the HTTP request is
set to that that of the length of data desired in the `read` operation
(Rounded up to the readahead value set in `setReadahead()` if necessary).
By reducing the cost of closing existing HTTP requests, this is
highly efficient for file IO accessing a binary file
through a series of `PositionedReadable.read()` and `PositionedReadable.readFully()`
calls. Sequential reading of a file is expensive, as now many HTTP requests must
be made to read through the file.
For operations simply reading through a file: copying, distCp, reading
Gzipped or other compressed formats, parsing .csv files, etc, the `sequential`
policy is appropriate. This is the default: S3A does not need to be configured.
For the specific case of high-performance random access IO, the `random` policy
may be considered. The requirements are:
* Data is read using the `PositionedReadable` API.
* Long distance (many MB) forward seeks
* Backward seeks as likely as forward seeks.
* Little or no use of single character `read()` calls or small `read(buffer)`
calls.
* Applications running close to the S3 data store. That is: in EC2 VMs in
the same datacenter as the S3 instance.
The desired fadvise policy must be set in the configuration option
`fs.s3a.experimental.input.fadvise` when the filesystem instance is created.
That is: it can only be set on a per-filesystem basis, not on a per-file-read
basis.
<property>
<name>fs.s3a.experimental.input.fadvise</name>
<value>random</value>
<description>Policy for reading files.
Values: 'random', 'sequential' or 'normal'
</description>
</property>
[HDFS-2744](https://issues.apache.org/jira/browse/HDFS-2744),
*Extend FSDataInputStream to allow fadvise* proposes adding a public API
to set fadvise policies on input streams. Once implemented,
this will become the supported mechanism used for configuring the input IO policy.
## Troubleshooting S3A ## Troubleshooting S3A
Common problems working with S3A are Common problems working with S3A are
@ -832,6 +904,10 @@ a failure should not lose data —it may result in duplicate datasets.
* Because the write only begins on a `close()` operation, it may be in the final * Because the write only begins on a `close()` operation, it may be in the final
phase of a process where the write starts —this can take so long that some things phase of a process where the write starts —this can take so long that some things
can actually time out. can actually time out.
* File IO performing many seek calls/positioned read calls will encounter
performance problems due to the size of the HTTP requests made. On S3a,
the (experimental) fadvise policy "random" can be set to alleviate this at the
expense of sequential read performance and bandwidth.
The slow performance of `rename()` surfaces during the commit phase of work, The slow performance of `rename()` surfaces during the commit phase of work,
including including

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
/**
* Unit test of the input policy logic, without making any S3 calls.
*/
@RunWith(Parameterized.class)
public class TestS3AInputPolicies {
private S3AInputPolicy policy;
private long targetPos;
private long length;
private long contentLength;
private long readahead;
private long expectedLimit;
public static final long _64K = 64 * 1024;
public static final long _128K = 128 * 1024;
public static final long _256K = 256 * 1024;
public static final long _1MB = 1024L * 1024;
public static final long _10MB = _1MB * 10;
public TestS3AInputPolicies(S3AInputPolicy policy,
long targetPos,
long length,
long contentLength,
long readahead,
long expectedLimit) {
this.policy = policy;
this.targetPos = targetPos;
this.length = length;
this.contentLength = contentLength;
this.readahead = readahead;
this.expectedLimit = expectedLimit;
}
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{S3AInputPolicy.Normal, 0, -1, 0, _64K, 0},
{S3AInputPolicy.Normal, 0, -1, _10MB, _64K, _10MB},
{S3AInputPolicy.Normal, _64K, _64K, _10MB, _64K, _10MB},
{S3AInputPolicy.Sequential, 0, -1, 0, _64K, 0},
{S3AInputPolicy.Sequential, 0, -1, _10MB, _64K, _10MB},
{S3AInputPolicy.Random, 0, -1, 0, _64K, 0},
{S3AInputPolicy.Random, 0, -1, _10MB, _64K, _10MB},
{S3AInputPolicy.Random, 0, _128K, _10MB, _64K, _128K},
{S3AInputPolicy.Random, 0, _128K, _10MB, _256K, _256K},
{S3AInputPolicy.Random, 0, 0, _10MB, _256K, _256K},
{S3AInputPolicy.Random, 0, 1, _10MB, _256K, _256K},
{S3AInputPolicy.Random, 0, _1MB, _10MB, _256K, _1MB},
{S3AInputPolicy.Random, 0, _1MB, _10MB, 0, _1MB},
{S3AInputPolicy.Random, _10MB + _64K, _1MB, _10MB, _256K, _10MB},
});
}
@Test
public void testInputPolicies() throws Throwable {
Assert.assertEquals(
String.format("calculateRequestLimit(%s, %d, %d, %d, %d)",
policy, targetPos, length, contentLength, readahead),
expectedLimit,
S3AInputStream.calculateRequestLimit(policy, targetPos,
length, contentLength, readahead));
}
}

View File

@ -22,11 +22,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.junit.After; import org.junit.After;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
@ -34,6 +40,7 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
@ -53,13 +60,13 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
public static final int BLOCK_SIZE = 32 * 1024; public static final int BLOCK_SIZE = 32 * 1024;
public static final int BIG_BLOCK_SIZE = 256 * 1024; public static final int BIG_BLOCK_SIZE = 256 * 1024;
/** Tests only run if the there is a named test file that can be read */ /** Tests only run if the there is a named test file that can be read. */
private boolean testDataAvailable = true; private boolean testDataAvailable = true;
private String assumptionMessage = "test file"; private String assumptionMessage = "test file";
/** /**
* Open the FS and the test data. The input stream is always set up here. * Open the FS and the test data. The input stream is always set up here.
* @throws IOException * @throws IOException IO Problems.
*/ */
@Before @Before
public void openFS() throws IOException { public void openFS() throws IOException {
@ -70,9 +77,10 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
testDataAvailable = false; testDataAvailable = false;
} else { } else {
testData = new Path(testFile); testData = new Path(testFile);
s3aFS = (S3AFileSystem) FileSystem.newInstance(testData.toUri(), conf); Path path = this.testData;
bindS3aFS(path);
try { try {
testDataStatus = s3aFS.getFileStatus(testData); testDataStatus = s3aFS.getFileStatus(this.testData);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to read file {} specified in {}", LOG.warn("Failed to read file {} specified in {}",
testFile, KEY_CSVTEST_FILE, e); testFile, KEY_CSVTEST_FILE, e);
@ -81,98 +89,131 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
} }
} }
private void bindS3aFS(Path path) throws IOException {
s3aFS = (S3AFileSystem) FileSystem.newInstance(path.toUri(), getConf());
}
/** /**
* Cleanup: close the stream, close the FS. * Cleanup: close the stream, close the FS.
*/ */
@After @After
public void cleanup() { public void cleanup() {
describe("cleanup");
IOUtils.closeStream(in); IOUtils.closeStream(in);
IOUtils.closeStream(s3aFS); IOUtils.closeStream(s3aFS);
} }
/** /**
* Declare that the test requires the CSV test dataset * Declare that the test requires the CSV test dataset.
*/ */
private void requireCSVTestData() { private void requireCSVTestData() {
Assume.assumeTrue(assumptionMessage, testDataAvailable); Assume.assumeTrue(assumptionMessage, testDataAvailable);
} }
/** /**
* Open the test file with the read buffer specified in the setting * Open the test file with the read buffer specified in the setting.
* {@link #KEY_READ_BUFFER_SIZE} * {@link #KEY_READ_BUFFER_SIZE}; use the {@code Normal} policy
* @return the stream, wrapping an S3a one * @return the stream, wrapping an S3a one
* @throws IOException * @throws IOException IO problems
*/ */
FSDataInputStream openTestFile() throws IOException { FSDataInputStream openTestFile() throws IOException {
int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE, return openTestFile(S3AInputPolicy.Normal, 0);
DEFAULT_READ_BUFFER_SIZE);
FSDataInputStream stream = s3aFS.open(testData, bufferSize);
streamStatistics = getInputStreamStatistics(stream);
return stream;
} }
/** /**
* assert tha the stream was only ever opened once * Open the test file with the read buffer specified in the setting
* {@link #KEY_READ_BUFFER_SIZE}.
* This includes the {@link #requireCSVTestData()} assumption; so
* if called before any FS op, will automatically skip the test
* if the CSV file is absent.
*
* @param inputPolicy input policy to use
* @param readahead readahead/buffer size
* @return the stream, wrapping an S3a one
* @throws IOException IO problems
*/
FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead)
throws IOException {
requireCSVTestData();
return openDataFile(s3aFS, this.testData, inputPolicy, readahead);
}
/**
* Open a test file with the read buffer specified in the setting
* {@link #KEY_READ_BUFFER_SIZE}.
*
* @param path path to open
* @param inputPolicy input policy to use
* @param readahead readahead/buffer size
* @return the stream, wrapping an S3a one
* @throws IOException IO problems
*/
private FSDataInputStream openDataFile(S3AFileSystem fs,
Path path,
S3AInputPolicy inputPolicy,
long readahead) throws IOException {
int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
DEFAULT_READ_BUFFER_SIZE);
S3AInputPolicy policy = fs.getInputPolicy();
fs.setInputPolicy(inputPolicy);
try {
FSDataInputStream stream = fs.open(path, bufferSize);
if (readahead >= 0) {
stream.setReadahead(readahead);
}
streamStatistics = getInputStreamStatistics(stream);
return stream;
} finally {
fs.setInputPolicy(policy);
}
}
/**
* Assert that the stream was only ever opened once.
*/ */
protected void assertStreamOpenedExactlyOnce() { protected void assertStreamOpenedExactlyOnce() {
assertOpenOperationCount(1); assertOpenOperationCount(1);
} }
/** /**
* Make an assertion count about the number of open operations * Make an assertion count about the number of open operations.
* @param expected the expected number * @param expected the expected number
*/ */
private void assertOpenOperationCount(int expected) { private void assertOpenOperationCount(long expected) {
assertEquals("open operations in " + streamStatistics, assertEquals("open operations in\n" + in,
expected, streamStatistics.openOperations); expected, streamStatistics.openOperations);
} }
/** /**
* Log how long an IOP took, by dividing the total time by the * Log how long an IOP took, by dividing the total time by the
* count of operations, printing in a human-readable form * count of operations, printing in a human-readable form.
* @param operation operation being measured
* @param timer timing data * @param timer timing data
* @param count IOP count. * @param count IOP count.
*/ */
protected void logTimePerIOP(NanoTimer timer, long count) { protected void logTimePerIOP(String operation,
LOG.info("Time per IOP: {} nS", toHuman(timer.duration() / count)); NanoTimer timer,
} long count) {
LOG.info("Time per {}: {} nS",
@Test operation, toHuman(timer.duration() / count));
public void testTimeToOpenAndReadWholeFileByByte() throws Throwable {
requireCSVTestData();
describe("Open the test file %s and read it byte by byte", testData);
long len = testDataStatus.getLen();
NanoTimer timeOpen = new NanoTimer();
in = openTestFile();
timeOpen.end("Open stream");
NanoTimer readTimer = new NanoTimer();
long count = 0;
while (in.read() >= 0) {
count ++;
}
readTimer.end("Time to read %d bytes", len);
bandwidth(readTimer, count);
assertEquals("Not enough bytes were read)", len, count);
long nanosPerByte = readTimer.nanosPerOperation(count);
LOG.info("An open() call has the equivalent duration of reading {} bytes",
toHuman( timeOpen.duration() / nanosPerByte));
} }
@Test @Test
public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable { public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
requireCSVTestData(); requireCSVTestData();
int blockSize = _1MB;
describe("Open the test file %s and read it in blocks of size %d", describe("Open the test file %s and read it in blocks of size %d",
testData, BLOCK_SIZE); testData, blockSize);
long len = testDataStatus.getLen(); long len = testDataStatus.getLen();
in = openTestFile(); in = openTestFile();
byte[] block = new byte[BLOCK_SIZE]; byte[] block = new byte[blockSize];
NanoTimer timer2 = new NanoTimer(); NanoTimer timer2 = new NanoTimer();
long count = 0; long count = 0;
// implicitly rounding down here // implicitly rounding down here
long blockCount = len / BLOCK_SIZE; long blockCount = len / blockSize;
for (long i = 0; i < blockCount; i++) { for (long i = 0; i < blockCount; i++) {
int offset = 0; int offset = 0;
int remaining = BLOCK_SIZE; int remaining = blockSize;
NanoTimer blockTimer = new NanoTimer(); NanoTimer blockTimer = new NanoTimer();
int reads = 0; int reads = 0;
while (remaining > 0) { while (remaining > 0) {
@ -189,15 +230,14 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
} }
timer2.end("Time to read %d bytes in %d blocks", len, blockCount ); timer2.end("Time to read %d bytes in %d blocks", len, blockCount );
bandwidth(timer2, count); bandwidth(timer2, count);
LOG.info("{}", streamStatistics); logStreamStatistics();
} }
@Test @Test
public void testLazySeekEnabled() throws Throwable { public void testLazySeekEnabled() throws Throwable {
requireCSVTestData();
describe("Verify that seeks do not trigger any IO"); describe("Verify that seeks do not trigger any IO");
long len = testDataStatus.getLen();
in = openTestFile(); in = openTestFile();
long len = testDataStatus.getLen();
NanoTimer timer = new NanoTimer(); NanoTimer timer = new NanoTimer();
long blockCount = len / BLOCK_SIZE; long blockCount = len / BLOCK_SIZE;
for (long i = 0; i < blockCount; i++) { for (long i = 0; i < blockCount; i++) {
@ -206,24 +246,14 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
in.seek(0); in.seek(0);
blockCount++; blockCount++;
timer.end("Time to execute %d seeks", blockCount); timer.end("Time to execute %d seeks", blockCount);
logTimePerIOP(timer, blockCount); logTimePerIOP("seek()", timer, blockCount);
LOG.info("{}", streamStatistics); logStreamStatistics();
assertOpenOperationCount(0); assertOpenOperationCount(0);
assertEquals("bytes read", 0, streamStatistics.bytesRead); assertEquals("bytes read", 0, streamStatistics.bytesRead);
} }
@Test
public void testReadAheadDefault() throws Throwable {
requireCSVTestData();
describe("Verify that a series of forward skips within the readahead" +
" range do not close and reopen the stream");
executeSeekReadSequence(BLOCK_SIZE, Constants.DEFAULT_READAHEAD_RANGE);
assertStreamOpenedExactlyOnce();
}
@Test @Test
public void testReadaheadOutOfRange() throws Throwable { public void testReadaheadOutOfRange() throws Throwable {
requireCSVTestData();
try { try {
in = openTestFile(); in = openTestFile();
in.setReadahead(-1L); in.setReadahead(-1L);
@ -231,39 +261,75 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// expected // expected
} }
} }
@Test @Test
public void testReadBigBlocksAvailableReadahead() throws Throwable { public void testReadWithNormalPolicy() throws Throwable {
requireCSVTestData();
describe("set readahead to available bytes only");
executeSeekReadSequence(BIG_BLOCK_SIZE, 0);
// expect that the stream will have had lots of opens
assertTrue("not enough open operations in " + streamStatistics,
streamStatistics.openOperations > 1);
}
@Test
public void testReadBigBlocksBigReadahead() throws Throwable {
requireCSVTestData();
describe("Read big blocks with a big readahead"); describe("Read big blocks with a big readahead");
executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2); executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2,
S3AInputPolicy.Normal);
assertStreamOpenedExactlyOnce();
}
@Test
public void testDecompressionSequential128K() throws Throwable {
describe("Decompress with a 128K readahead");
executeDecompression(128 * 1024, S3AInputPolicy.Sequential);
assertStreamOpenedExactlyOnce(); assertStreamOpenedExactlyOnce();
} }
/** /**
* Execute a seek+read sequence * Execute a decompression + line read with the given input policy.
* @param readahead byte readahead
* @param inputPolicy read policy
* @throws IOException IO Problems
*/
private void executeDecompression(long readahead,
S3AInputPolicy inputPolicy) throws IOException {
CompressionCodecFactory factory
= new CompressionCodecFactory(getConf());
CompressionCodec codec = factory.getCodec(testData);
long bytesRead = 0;
int lines = 0;
FSDataInputStream objectIn = openTestFile(inputPolicy, readahead);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
try (LineReader lineReader = new LineReader(
codec.createInputStream(objectIn), getConf())) {
Text line = new Text();
int read;
while ((read = lineReader.readLine(line)) > 0) {
bytesRead += read;
lines++;
}
} catch (EOFException eof) {
// done
}
timer.end("Time to read %d lines [%d bytes expanded, %d raw]" +
" with readahead = %d",
lines,
bytesRead,
testDataStatus.getLen(),
readahead);
logTimePerIOP("line read", timer, lines);
logStreamStatistics();
}
private void logStreamStatistics() {
LOG.info(String.format("Stream Statistics%n{}"), streamStatistics);
}
/**
* Execute a seek+read sequence.
* @param blockSize block size for seeks * @param blockSize block size for seeks
* @param readahead what the readahead value of the stream should be * @param readahead what the readahead value of the stream should be
* @throws IOException IO problems * @throws IOException IO problems
*/ */
protected void executeSeekReadSequence(long blockSize, protected void executeSeekReadSequence(long blockSize,
long readahead) throws IOException { long readahead,
requireCSVTestData(); S3AInputPolicy policy) throws IOException {
in = openTestFile(policy, readahead);
long len = testDataStatus.getLen(); long len = testDataStatus.getLen();
in = openTestFile();
in.setReadahead(readahead);
NanoTimer timer = new NanoTimer(); NanoTimer timer = new NanoTimer();
long blockCount = len / blockSize; long blockCount = len / blockSize;
LOG.info("Reading {} blocks, readahead = {}", LOG.info("Reading {} blocks, readahead = {}",
@ -277,11 +343,187 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
blockCount, blockCount,
blockSize, blockSize,
readahead); readahead);
logTimePerIOP(timer, blockCount); logTimePerIOP("seek(pos + " + blockCount+"); read()", timer, blockCount);
LOG.info("Effective bandwidth {} MB/S", LOG.info("Effective bandwidth {} MB/S",
timer.bandwidthDescription(streamStatistics.bytesRead - timer.bandwidthDescription(streamStatistics.bytesRead -
streamStatistics.bytesSkippedOnSeek)); streamStatistics.bytesSkippedOnSeek));
LOG.info("{}", streamStatistics); logStreamStatistics();
} }
public static final int _4K = 4 * 1024;
public static final int _8K = 8 * 1024;
public static final int _16K = 16 * 1024;
public static final int _32K = 32 * 1024;
public static final int _64K = 64 * 1024;
public static final int _128K = 128 * 1024;
public static final int _256K = 256 * 1024;
public static final int _1MB = 1024 * 1024;
public static final int _2MB = 2 * _1MB;
public static final int _10MB = _1MB * 10;
public static final int _5MB = _1MB * 5;
private static final int[][] RANDOM_IO_SEQUENCE = {
{_2MB, _128K},
{_128K, _128K},
{_5MB, _64K},
{_1MB, _1MB},
};
@Test
public void testRandomIORandomPolicy() throws Throwable {
executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length);
assertEquals("streams aborted in " + streamStatistics,
0, streamStatistics.aborted);
}
@Test
public void testRandomIONormalPolicy() throws Throwable {
long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
assertEquals("streams aborted in " + streamStatistics,
4, streamStatistics.aborted);
}
/**
* Execute the random IO {@code readFully(pos, bytes[])} sequence defined by
* {@link #RANDOM_IO_SEQUENCE}. The stream is closed afterwards; that's used
* in the timing too
* @param policy read policy
* @param expectedOpenCount expected number of stream openings
* @throws IOException IO problems
* @return the timer
*/
private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy policy,
long expectedOpenCount)
throws IOException {
describe("Random IO with policy \"%s\"", policy);
byte[] buffer = new byte[_1MB];
long totalBytesRead = 0;
in = openTestFile(policy, 0);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
for (int[] action : RANDOM_IO_SEQUENCE) {
int position = action[0];
int range = action[1];
in.readFully(position, buffer, 0, range);
totalBytesRead += range;
}
int reads = RANDOM_IO_SEQUENCE.length;
timer.end("Time to execute %d reads of total size %d bytes",
reads,
totalBytesRead);
in.close();
assertOpenOperationCount(expectedOpenCount);
logTimePerIOP("byte read", timer, totalBytesRead);
LOG.info("Effective bandwidth {} MB/S",
timer.bandwidthDescription(streamStatistics.bytesRead -
streamStatistics.bytesSkippedOnSeek));
logStreamStatistics();
return timer;
}
S3AInputStream getS3aStream() {
return (S3AInputStream) in.getWrappedStream();
}
@Test
public void testRandomReadOverBuffer() throws Throwable {
describe("read over a buffer, making sure that the requests" +
" spans readahead ranges");
int datasetLen = _32K;
Path dataFile = new Path(getTestPath(), "testReadOverBuffer.bin");
byte[] sourceData = dataset(datasetLen, 0, 64);
// relies on the field 'fs' referring to the R/W FS
writeDataset(fs, dataFile, sourceData, datasetLen, _16K, true);
byte[] buffer = new byte[datasetLen];
int readahead = _8K;
int halfReadahead = _4K;
in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead);
LOG.info("Starting initial reads");
S3AInputStream s3aStream = getS3aStream();
assertEquals(readahead, s3aStream.getReadahead());
byte[] oneByte = new byte[1];
assertEquals(1, in.read(0, oneByte, 0, 1));
// make some assertions about the current state
assertEquals("remaining in\n" + in,
readahead - 1, s3aStream.remainingInCurrentRequest());
assertEquals("range start in\n" + in,
0, s3aStream.getContentRangeStart());
assertEquals("range finish in\n" + in,
readahead, s3aStream.getContentRangeFinish());
assertStreamOpenedExactlyOnce();
describe("Starting sequence of positioned read calls over\n%s", in);
NanoTimer readTimer = new NanoTimer();
int currentPos = halfReadahead;
int offset = currentPos;
int bytesRead = 0;
int readOps = 0;
// make multiple read() calls
while (bytesRead < halfReadahead) {
int length = buffer.length - offset;
int read = in.read(currentPos, buffer, offset, length);
bytesRead += read;
offset += read;
readOps++;
assertEquals("open operations on request #" + readOps
+ " after reading " + bytesRead
+ " current position in stream " + currentPos
+ " in\n" + fs
+ "\n " + in,
1, streamStatistics.openOperations);
for (int i = currentPos; i < currentPos + read; i++) {
assertEquals("Wrong value from byte " + i,
sourceData[i], buffer[i]);
}
currentPos += read;
}
assertStreamOpenedExactlyOnce();
// assert at the end of the original block
assertEquals(readahead, currentPos);
readTimer.end("read %d in %d operations", bytesRead, readOps);
bandwidth(readTimer, bytesRead);
LOG.info("Time per byte(): {} nS",
toHuman(readTimer.nanosPerOperation(bytesRead)));
LOG.info("Time per read(): {} nS",
toHuman(readTimer.nanosPerOperation(readOps)));
describe("read last byte");
// read one more
int read = in.read(currentPos, buffer, bytesRead, 1);
assertTrue("-1 from last read", read >= 0);
assertOpenOperationCount(2);
assertEquals("Wrong value from read ", sourceData[currentPos],
(int) buffer[currentPos]);
currentPos++;
// now scan all the way to the end of the file, using single byte read()
// calls
describe("read() to EOF over \n%s", in);
long readCount = 0;
NanoTimer timer = new NanoTimer();
LOG.info("seeking");
in.seek(currentPos);
LOG.info("reading");
while(currentPos < datasetLen) {
int r = in.read();
assertTrue("Negative read() at position " + currentPos + " in\n" + in,
r >= 0);
buffer[currentPos] = (byte)r;
assertEquals("Wrong value from read from\n" + in,
sourceData[currentPos], r);
currentPos++;
readCount++;
}
timer.end("read %d bytes", readCount);
bandwidth(timer, readCount);
LOG.info("Time per read(): {} nS",
toHuman(timer.nanosPerOperation(readCount)));
assertEquals("last read in " + in, -1, in.read());
}
} }