From 8d74a5804250cb07030a2aca85074e5f8d8bb276 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 12 May 2016 19:23:18 +0100 Subject: [PATCH] HADOOP-13028 add low level counter metrics for S3A; use in read performance tests. contributed by: stevel patch includes HADOOP-12844 Recover when S3A fails on IOException in read() HADOOP-13058 S3A FS fails during init against a read-only FS if multipart purge HADOOP-13047 S3a Forward seek in stream length to be configurable --- .../apache/hadoop/fs/FSDataInputStream.java | 9 + .../hadoop/metrics2/MetricStringBuilder.java | 141 +++++ .../metrics2/lib/MutableCounterLong.java | 2 +- .../src/main/resources/core-default.xml | 10 +- .../dev-support/findbugs-exclude.xml | 357 +------------ .../apache/hadoop/fs/s3/FileSystemStore.java | 4 +- .../apache/hadoop/fs/s3/S3Credentials.java | 2 + .../s3a/AnonymousAWSCredentialsProvider.java | 4 + .../fs/s3a/BasicAWSCredentialsProvider.java | 4 + .../org/apache/hadoop/fs/s3a/Constants.java | 18 +- .../hadoop/fs/s3a/S3AFastOutputStream.java | 11 +- .../apache/hadoop/fs/s3a/S3AFileStatus.java | 12 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 484 +++++++++++------- .../apache/hadoop/fs/s3a/S3AInputStream.java | 342 ++++++++++--- .../hadoop/fs/s3a/S3AInstrumentation.java | 457 +++++++++++++++++ .../apache/hadoop/fs/s3a/S3AOutputStream.java | 40 +- .../site/markdown/tools/hadoop-aws/index.md | 68 ++- .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 191 ++++++- .../fs/s3a/scale/TestS3ADeleteManyFiles.java | 1 - .../scale/TestS3AInputStreamPerformance.java | 285 +++++++++++ .../src/test/resources/log4j.properties | 3 + 21 files changed, 1768 insertions(+), 677 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index da987692af0..640db592fb1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -234,4 +234,13 @@ public class FSDataInputStream extends DataInputStream "support unbuffering."); } } + + /** + * String value. Includes the string value of the inner stream + * @return the stream + */ + @Override + public String toString() { + return super.toString() + ": " + in; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java new file mode 100644 index 00000000000..18a499abc12 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Build a string dump of the metrics. + * + * The {@link #toString()} operator dumps out all values collected. + * + * Every entry is formatted as + * {@code prefix + name + separator + value + suffix} + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MetricStringBuilder extends MetricsRecordBuilder { + + private final StringBuilder builder = new StringBuilder(256); + + private final String prefix; + private final String suffix; + private final String separator; + private final MetricsCollector parent; + + /** + * Build an instance. + * @param parent parent collector. Unused in this instance; only used for + * the {@link #parent()} method + * @param prefix string before each entry + * @param separator separator between name and value + * @param suffix suffix after each entry + */ + public MetricStringBuilder(MetricsCollector parent, + String prefix, + String separator, + String suffix) { + this.parent = parent; + this.prefix = prefix; + this.suffix = suffix; + this.separator = separator; + } + + public MetricStringBuilder add(MetricsInfo info, Object value) { + return tuple(info.name(), value.toString()); + } + + /** + * Add any key,val pair to the string, between the prefix and suffix, + * separated by the separator. 
+ * @param key key + * @param value value + * @return this instance + */ + public MetricStringBuilder tuple(String key, String value) { + builder.append(prefix) + .append(key) + .append(separator) + .append(value) + .append(suffix); + return this; + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo info, String value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder add(MetricsTag tag) { + return tuple(tag.name(), tag.value()); + } + + @Override + public MetricsRecordBuilder add(AbstractMetric metric) { + add(metric.info(), metric.toString()); + return this; + } + + @Override + public MetricsRecordBuilder setContext(String value) { + return tuple("context", value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { + return add(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { + return add(info, value); + } + + @Override + public MetricsCollector parent() { + return parent; + } + + @Override + public String toString() { + return builder.toString(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java index 03a6043375f..d3dec2e4d06 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java @@ -34,7 +34,7 @@ public class MutableCounterLong extends MutableCounter { private AtomicLong value = new AtomicLong(); - MutableCounterLong(MetricsInfo info, long initValue) { + public MutableCounterLong(MetricsInfo info, long initValue) { super(info); this.value.set(initValue); } diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 3869c2c2f3d..2ad68819297 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -911,7 +911,15 @@ uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks) - + + fs.s3a.readahead.range + 65536 + Bytes to read ahead during a seek() before closing and + re-opening the S3 HTTP connection. This option will be overridden if + any call to setReadahead() is made to an open stream. + + + fs.s3a.fast.buffer.size 1048576 Size of initial memory buffer in bytes allocated for an diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index 204e6abeaeb..2b4160a25b9 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -15,361 +15,8 @@ limitations under the License. 
--> diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java index 07e456b59f5..3c7ed60a7e5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java @@ -55,13 +55,13 @@ public interface FileSystemStore { /** * Delete everything. Used for testing. - * @throws IOException + * @throws IOException on any problem */ void purge() throws IOException; /** * Diagnostic method to dump all INodes to the console. - * @throws IOException + * @throws IOException on any problem */ void dump() throws IOException; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java index fdacc3ff751..5ab352a0dcc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java @@ -38,6 +38,8 @@ public class S3Credentials { private String secretAccessKey; /** + * @param uri bucket URI optionally containing username and password. + * @param conf configuration + * @throws IllegalArgumentException if credentials for S3 cannot be * determined.
* @throws IOException if credential providers are misconfigured and we have diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java index 2a242736bfd..e62ec77e1bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java @@ -21,7 +21,11 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AnonymousAWSCredentials; import com.amazonaws.auth.AWSCredentials; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +@InterfaceAudience.Private +@InterfaceStability.Stable public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider { public AWSCredentials getCredentials() { return new AnonymousAWSCredentials(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java index 9a0adda0dd6..2f721e4ed84 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java @@ -23,7 +23,11 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.AWSCredentials; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +@InterfaceAudience.Private +@InterfaceStability.Stable public class BasicAWSCredentialsProvider implements AWSCredentialsProvider { private final String accessKey; private final String secretKey; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index a0707bbcaf4..223b3aabbf8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -18,7 +18,19 @@ package org.apache.hadoop.fs.s3a; -public class Constants { +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * All the constants used with the {@link S3AFileSystem}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class Constants { + + private Constants() { + } + // s3 access key public static final String ACCESS_KEY = "fs.s3a.access.key"; @@ -129,4 +141,8 @@ public class Constants { public static final int S3A_DEFAULT_PORT = -1; public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix"; + + /** read ahead buffer size to prevent connection re-establishments. 
*/ + public static final String READAHEAD_RANGE = "fs.s3a.readahead.range"; + public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 2e06fba2752..33535223a12 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.util.Progressable; @@ -64,6 +65,7 @@ import java.util.concurrent.ThreadPoolExecutor; *

* Unstable: statistics and error handling might evolve */ +@InterfaceAudience.Private @InterfaceStability.Unstable public class S3AFastOutputStream extends OutputStream { @@ -102,7 +104,8 @@ public class S3AFastOutputStream extends OutputStream { * @param partSize size of a single part in a multi-part upload (except * last part) * @param multiPartThreshold files at least this size use multi-part upload - * @throws IOException + * @param threadPoolExecutor thread factory + * @throws IOException on any problem */ public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs, String bucket, String key, Progressable progress, @@ -159,7 +162,7 @@ public class S3AFastOutputStream extends OutputStream { * Writes a byte to the memory buffer. If this causes the buffer to reach * its limit, the actual upload is submitted to the threadpool. * @param b the int of which the lowest byte is written - * @throws IOException + * @throws IOException on any problem */ @Override public synchronized void write(int b) throws IOException { @@ -177,10 +180,10 @@ public class S3AFastOutputStream extends OutputStream { * @param b byte array containing * @param off offset in array where to start * @param len number of bytes to be written - * @throws IOException + * @throws IOException on any problem */ @Override - public synchronized void write(byte b[], int off, int len) + public synchronized void write(byte[] b, int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java index 47caea8390d..9ecca332663 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -17,9 +17,19 @@ */ package org.apache.hadoop.fs.s3a; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +/** + * File status for an S3A "file". + * Modification time is trouble, see {@link #getModificationTime()}. + * + * The subclass is private as it should not be created directly. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving public class S3AFileStatus extends FileStatus { private boolean isEmptyDirectory; @@ -45,7 +55,7 @@ public class S3AFileStatus extends FileStatus { return System.getProperty("user.name"); } - /** Compare if this object is equal to another object + /** Compare if this object is equal to another object. * @param o the object to be compared. * @return true if two file status has the same path name; false if not. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index d47b7b5ce01..d6a46178257 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.net.URI; import java.util.ArrayList; import java.util.Date; @@ -59,8 +60,11 @@ import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressEvent; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -79,9 +83,24 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * The core S3A Filesystem implementation. + * + * This subclass is marked as private as code should not be creating it + * directly; use {@link FileSystem#get(Configuration)} and variants to + * create one. + * + * If cast to {@code S3AFileSystem}, extra methods and features may be accessed. + * Consider those private and unstable. + * + * Because it prints some of the state of the instrumentation, + * the output of {@link #toString()} must also be considered unstable. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving public class S3AFileSystem extends FileSystem { /** - * Default blocksize as used in blocksize and FS status queries + * Default blocksize as used in blocksize and FS status queries. */ public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; private URI uri; @@ -97,11 +116,14 @@ public class S3AFileSystem extends FileSystem { public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); private CannedAccessControlList cannedACL; private String serverSideEncryptionAlgorithm; + private S3AInstrumentation instrumentation; + private long readAhead; // The maximum number of entries that can be deleted in any call to s3 private static final int MAX_ENTRIES_TO_DELETE = 1000; private static final AtomicInteger poolNumber = new AtomicInteger(1); + /** * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, * with a common prefix. @@ -110,17 +132,19 @@ public class S3AFileSystem extends FileSystem { */ public static ThreadFactory getNamedThreadFactory(final String prefix) { SecurityManager s = System.getSecurityManager(); - final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() - .getThreadGroup(); + final ThreadGroup threadGroup = (s != null) + ? 
s.getThreadGroup() + : Thread.currentThread().getThreadGroup(); return new ThreadFactory() { - final AtomicInteger threadNumber = new AtomicInteger(1); + private final AtomicInteger threadNumber = new AtomicInteger(1); private final int poolNum = poolNumber.getAndIncrement(); - final ThreadGroup group = threadGroup; + private final ThreadGroup group = threadGroup; @Override public Thread newThread(Runnable r) { - final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); + final String name = String.format("%s-pool%03d-t%04d", + prefix, poolNum, threadNumber.getAndIncrement()); return new Thread(group, r, name); } }; @@ -157,10 +181,12 @@ public class S3AFileSystem extends FileSystem { */ public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); + setConf(conf); + instrumentation = new S3AInstrumentation(name); uri = URI.create(name.getScheme() + "://" + name.getAuthority()); - workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, - this.getWorkingDirectory()); + workingDir = new Path("/user", System.getProperty("user.name")) + .makeQualified(this.uri, this.getWorkingDirectory()); AWSAccessKeys creds = getAWSAccessKeys(name, conf); @@ -174,19 +200,20 @@ public class S3AFileSystem extends FileSystem { bucket = name.getHost(); ClientConfiguration awsConf = new ClientConfiguration(); - awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS)); + awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS, 1)); boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS); awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES)); - awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT)); - awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT)); + awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES, 0)); + awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT, 0)); + awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT, 0)); String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); - if(!signerOverride.isEmpty()) { + if (!signerOverride.isEmpty()) { + LOG.debug("Signer override = {}", signerOverride); awsConf.setSignerOverride(signerOverride); } @@ -196,34 +223,38 @@ public class S3AFileSystem extends FileSystem { initAmazonS3Client(conf, credentials, awsConf); - maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); + maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, - DEFAULT_MIN_MULTIPART_THRESHOLD); - enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); - if (partSize < 5 * 1024 * 1024) { LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); partSize = 5 * 1024 * 1024; } + multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, + DEFAULT_MIN_MULTIPART_THRESHOLD); if (multiPartThreshold < 5 * 1024 * 1024) { LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); multiPartThreshold = 5 * 1024 * 1024; } + //check but do not store the block size + longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); + enableMultiObjectsDelete = 
conf.getBoolean(ENABLE_MULTI_DELETE, true); - int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); - int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS); + readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0); + + int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0); + int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0); if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors() * 8; } if (coreThreads == 0) { coreThreads = Runtime.getRuntime().availableProcessors() * 8; } - long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); + long keepAliveTime = longOption(conf, KEEPALIVE_TIME, + DEFAULT_KEEPALIVE_TIME, 0); LinkedBlockingQueue workQueue = - new LinkedBlockingQueue<>(maxThreads * - conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS)); + new LinkedBlockingQueue<>(maxThreads * + intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1)); threadPoolExecutor = new ThreadPoolExecutor( coreThreads, maxThreads, @@ -238,19 +269,17 @@ public class S3AFileSystem extends FileSystem { initCannedAcls(conf); if (!s3.doesBucketExist(bucket)) { - throw new IOException("Bucket " + bucket + " does not exist"); + throw new FileNotFoundException("Bucket " + bucket + " does not exist"); } initMultipartUploads(conf); serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); - setConf(conf); } void initProxySupport(Configuration conf, ClientConfiguration awsConf, - boolean secureConnections) throws IllegalArgumentException, - IllegalArgumentException { + boolean secureConnections) throws IllegalArgumentException { String proxyHost = conf.getTrimmed(PROXY_HOST, ""); int proxyPort = conf.getInt(PROXY_PORT, -1); if (!proxyHost.isEmpty()) { @@ -281,7 +310,8 @@ public class S3AFileSystem extends FileSystem { if (LOG.isDebugEnabled()) { LOG.debug("Using proxy server {}:{} as user {} with password {} on " + "domain {} as workstation {}", awsConf.getProxyHost(), - awsConf.getProxyPort(), String.valueOf(awsConf.getProxyUsername()), + awsConf.getProxyPort(), + String.valueOf(awsConf.getProxyUsername()), awsConf.getProxyPassword(), awsConf.getProxyDomain(), awsConf.getProxyWorkstation()); } @@ -316,7 +346,7 @@ public class S3AFileSystem extends FileSystem { AWSCredentialsProviderChain credentials, ClientConfiguration awsConf) throws IllegalArgumentException { s3 = new AmazonS3Client(credentials, awsConf); - String endPoint = conf.getTrimmed(ENDPOINT,""); + String endPoint = conf.getTrimmed(ENDPOINT, ""); if (!endPoint.isEmpty()) { try { s3.setEndpoint(endPoint); @@ -359,14 +389,25 @@ public class S3AFileSystem extends FileSystem { private void initMultipartUploads(Configuration conf) { boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, - DEFAULT_PURGE_EXISTING_MULTIPART); - long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE, - DEFAULT_PURGE_EXISTING_MULTIPART_AGE); + DEFAULT_PURGE_EXISTING_MULTIPART); + long purgeExistingMultipartAge = longOption(conf, + PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0); if (purgeExistingMultipart) { - Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000); + Date purgeBefore = + new Date(new Date().getTime() - purgeExistingMultipartAge * 1000); - transfers.abortMultipartUploads(bucket, purgeBefore); + try { + transfers.abortMultipartUploads(bucket, purgeBefore); + } catch (AmazonServiceException e) { + if (e.getStatusCode() == 403) { + 
instrumentation.errorIgnored(); + LOG.debug("Failed to abort multipart uploads against {}," + + " FS may be read only", bucket, e); + } else { + throw e; + } + } } } @@ -479,16 +520,15 @@ public class S3AFileSystem extends FileSystem { public FSDataInputStream open(Path f, int bufferSize) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Opening '{}' for reading.", f); - } + LOG.debug("Opening '{}' for reading.", f); final FileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { - throw new FileNotFoundException("Can't open " + f + " because it is a directory"); + throw new FileNotFoundException("Can't open " + f + + " because it is a directory"); } return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), - fileStatus.getLen(), s3, statistics)); + fileStatus.getLen(), s3, statistics, instrumentation, readAhead)); } /** @@ -514,16 +554,26 @@ public class S3AFileSystem extends FileSystem { if (!overwrite && exists(f)) { throw new FileAlreadyExistsException(f + " already exists"); } + instrumentation.fileCreated(); if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket, key, progress, statistics, cannedACL, serverSideEncryptionAlgorithm, partSize, multiPartThreshold, threadPoolExecutor), statistics); } - // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file - return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this, - bucket, key, progress, cannedACL, statistics, - serverSideEncryptionAlgorithm), null); + // We pass null to FSDataOutputStream so it won't count writes that + // are being buffered to a file + return new FSDataOutputStream( + new S3AOutputStream(getConf(), + transfers, + this, + bucket, + key, + progress, + cannedACL, + statistics, + serverSideEncryptionAlgorithm), + null); } /** @@ -534,7 +584,7 @@ public class S3AFileSystem extends FileSystem { * @throws IOException indicating that append is not supported. 
*/ public FSDataOutputStream append(Path f, int bufferSize, - Progressable progress) throws IOException { + Progressable progress) throws IOException { throw new IOException("Not supported"); } @@ -559,17 +609,13 @@ public class S3AFileSystem extends FileSystem { * @return true if rename is successful */ public boolean rename(Path src, Path dst) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Rename path {} to {}", src, dst); - } + LOG.debug("Rename path {} to {}", src, dst); String srcKey = pathToKey(src); String dstKey = pathToKey(dst); if (srcKey.isEmpty() || dstKey.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: src or dst are empty"); - } + LOG.debug("rename: source {} or dest {}, is empty", srcKey, dstKey); return false; } @@ -582,9 +628,8 @@ public class S3AFileSystem extends FileSystem { } if (srcKey.equals(dstKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: src and dst refer to the same file or directory"); - } + LOG.debug("rename: src and dst refer to the same file or directory: {}", + dst); return srcStatus.isFile(); } @@ -593,9 +638,8 @@ public class S3AFileSystem extends FileSystem { dstStatus = getFileStatus(dst); if (srcStatus.isDirectory() && dstStatus.isFile()) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: src is a directory and dst is a file"); - } + LOG.debug("rename: src {} is a directory and dst {} is a file", + src, dst); return false; } @@ -603,6 +647,7 @@ public class S3AFileSystem extends FileSystem { return false; } } catch (FileNotFoundException e) { + LOG.debug("rename: destination path {} not found", dst); // Parent must exist Path parent = dst.getParent(); if (!pathToKey(parent).isEmpty()) { @@ -612,6 +657,8 @@ public class S3AFileSystem extends FileSystem { return false; } } catch (FileNotFoundException e2) { + LOG.debug("rename: destination path {} has no parent {}", + dst, parent); return false; } } @@ -619,9 +666,7 @@ public class S3AFileSystem extends FileSystem { // Ok! Time to start if (srcStatus.isFile()) { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: renaming file " + src + " to " + dst); - } + LOG.debug("rename: renaming file {} to {}", src, dst); if (dstStatus != null && dstStatus.isDirectory()) { String newDstKey = dstKey; if (!newDstKey.endsWith("/")) { @@ -630,15 +675,13 @@ public class S3AFileSystem extends FileSystem { String filename = srcKey.substring(pathToKey(src.getParent()).length()+1); newDstKey = newDstKey + filename; - copyFile(srcKey, newDstKey); + copyFile(srcKey, newDstKey, srcStatus.getLen()); } else { - copyFile(srcKey, dstKey); + copyFile(srcKey, dstKey, srcStatus.getLen()); } delete(src, false); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("rename: renaming directory " + src + " to " + dst); - } + LOG.debug("rename: renaming directory {} to {}", src, dst); // This is a directory to directory copy if (!dstKey.endsWith("/")) { @@ -651,14 +694,12 @@ public class S3AFileSystem extends FileSystem { //Verify dest is not a child of the source directory if (dstKey.startsWith(srcKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("cannot rename a directory to a subdirectory of self"); - } + LOG.debug("cannot rename a directory {}" + + " to a subdirectory of self: {}", srcKey, dstKey); return false; } - List keysToDelete = - new ArrayList<>(); + List keysToDelete = new ArrayList<>(); if (dstStatus != null && dstStatus.isEmptyDirectory()) { // delete unnecessary fake directory. 
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey)); @@ -676,7 +717,7 @@ public class S3AFileSystem extends FileSystem { for (S3ObjectSummary summary : objects.getObjectSummaries()) { keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); String newDstKey = dstKey + summary.getKey().substring(srcKey.length()); - copyFile(summary.getKey(), newDstKey); + copyFile(summary.getKey(), newDstKey, summary.getSize()); if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { removeKeys(keysToDelete, true); @@ -715,6 +756,7 @@ public class S3AFileSystem extends FileSystem { DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete); s3.deleteObjects(deleteRequest); + instrumentation.fileDeleted(keysToDelete.size()); statistics.incrementWriteOps(1); } else { int writeops = 0; @@ -724,7 +766,7 @@ public class S3AFileSystem extends FileSystem { new DeleteObjectRequest(bucket, keyVersion.getKey())); writeops++; } - + instrumentation.fileDeleted(keysToDelete.size()); statistics.incrementWriteOps(writeops); } if (clearKeys) { @@ -742,25 +784,20 @@ public class S3AFileSystem extends FileSystem { * @throws IOException due to inability to delete a directory or file. */ public boolean delete(Path f, boolean recursive) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete path " + f + " - recursive " + recursive); - } + LOG.debug("Delete path {} - recursive {}", f , recursive); S3AFileStatus status; try { status = getFileStatus(f); } catch (FileNotFoundException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Couldn't delete " + f + " - does not exist"); - } + LOG.debug("Couldn't delete {} - does not exist", f); + instrumentation.errorIgnored(); return false; } String key = pathToKey(f); if (status.isDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("delete: Path is a directory"); - } + LOG.debug("delete: Path is a directory: {}", f); if (!recursive && !status.isEmptyDirectory()) { throw new IOException("Path is a folder: " + f + @@ -777,15 +814,12 @@ public class S3AFileSystem extends FileSystem { } if (status.isEmptyDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting fake empty directory"); - } + LOG.debug("Deleting fake empty directory {}", key); s3.deleteObject(bucket, key); + instrumentation.directoryDeleted(); statistics.incrementWriteOps(1); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Getting objects for directory prefix " + key + " to delete"); - } + LOG.debug("Getting objects for directory prefix {} to delete", key); ListObjectsRequest request = new ListObjectsRequest(); request.setBucketName(bucket); @@ -794,16 +828,13 @@ public class S3AFileSystem extends FileSystem { //request.setDelimiter("/"); request.setMaxKeys(maxKeys); - List keys = - new ArrayList<>(); + List keys = new ArrayList<>(); ObjectListing objects = s3.listObjects(request); statistics.incrementReadOps(1); while (true) { for (S3ObjectSummary summary : objects.getObjectSummaries()) { keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); - if (LOG.isDebugEnabled()) { - LOG.debug("Got object to delete " + summary.getKey()); - } + LOG.debug("Got object to delete {}", summary.getKey()); if (keys.size() == MAX_ENTRIES_TO_DELETE) { removeKeys(keys, true); @@ -822,10 +853,9 @@ public class S3AFileSystem extends FileSystem { } } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("delete: Path is a file"); - } + LOG.debug("delete: Path is a file"); s3.deleteObject(bucket, key); + instrumentation.fileDeleted(1); 
statistics.incrementWriteOps(1); } @@ -837,9 +867,7 @@ public class S3AFileSystem extends FileSystem { private void createFakeDirectoryIfNecessary(Path f) throws IOException { String key = pathToKey(f); if (!key.isEmpty() && !exists(f)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new fake directory at " + f); - } + LOG.debug("Creating new fake directory at {}", f); createFakeDirectory(bucket, key); } } @@ -856,9 +884,7 @@ public class S3AFileSystem extends FileSystem { public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { String key = pathToKey(f); - if (LOG.isDebugEnabled()) { - LOG.debug("List status for path: " + f); - } + LOG.debug("List status for path: {}", f); final List result = new ArrayList(); final FileStatus fileStatus = getFileStatus(f); @@ -874,9 +900,7 @@ public class S3AFileSystem extends FileSystem { request.setDelimiter("/"); request.setMaxKeys(maxKeys); - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: doing listObjects for directory " + key); - } + LOG.debug("listStatus: doing listObjects for directory {}", key); ObjectListing objects = s3.listObjects(request); statistics.incrementReadOps(1); @@ -889,24 +913,18 @@ public class S3AFileSystem extends FileSystem { // Skip over keys that are ourselves and old S3N _$folder$ files if (keyPath.equals(fQualified) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring: " + keyPath); - } + LOG.debug("Ignoring: {}", keyPath); continue; } if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { result.add(new S3AFileStatus(true, true, keyPath)); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: fd: " + keyPath); - } + LOG.debug("Adding: fd: {}", keyPath); } else { result.add(new S3AFileStatus(summary.getSize(), dateToLong(summary.getLastModified()), keyPath, getDefaultBlockSize(fQualified))); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: fi: " + keyPath); - } + LOG.debug("Adding: fi: {}", keyPath); } } @@ -916,16 +934,11 @@ public class S3AFileSystem extends FileSystem { continue; } result.add(new S3AFileStatus(true, false, keyPath)); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd: " + keyPath); - } + LOG.debug("Adding: rd: {}", keyPath); } if (objects.isTruncated()) { - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: list truncated - getting next batch"); - } - + LOG.debug("listStatus: list truncated - getting next batch"); objects = s3.listNextBatchOfObjects(objects); statistics.incrementReadOps(1); } else { @@ -933,9 +946,7 @@ public class S3AFileSystem extends FileSystem { } } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd (not a dir): " + f); - } + LOG.debug("Adding: rd (not a dir): {}", f); result.add(fileStatus); } @@ -948,14 +959,14 @@ public class S3AFileSystem extends FileSystem { * Set the current working directory for the given file system. All relative * paths will be resolved relative to it. * - * @param new_dir the current working directory. + * @param newDir the current working directory. */ - public void setWorkingDirectory(Path new_dir) { - workingDir = new_dir; + public void setWorkingDirectory(Path newDir) { + workingDir = newDir; } /** - * Get the current working directory for the given file system + * Get the current working directory for the given file system. 
* @return the directory pathname */ public Path getWorkingDirectory() { @@ -972,10 +983,7 @@ public class S3AFileSystem extends FileSystem { // TODO: If we have created an empty file at /foo/bar and we then call // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? public boolean mkdirs(Path f, FsPermission permission) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Making directory: " + f); - } - + LOG.debug("Making directory: {}", f); try { FileStatus fileStatus = getFileStatus(f); @@ -996,6 +1004,7 @@ public class S3AFileSystem extends FileSystem { fPart)); } } catch (FileNotFoundException fnfe) { + instrumentation.errorIgnored(); } fPart = fPart.getParent(); } while (fPart != null); @@ -1015,10 +1024,7 @@ public class S3AFileSystem extends FileSystem { */ public S3AFileStatus getFileStatus(Path f) throws IOException { String key = pathToKey(f); - if (LOG.isDebugEnabled()) { - LOG.debug("Getting path status for " + f + " (" + key + ")"); - } - + LOG.debug("Getting path status for {} ({})", f , key); if (!key.isEmpty()) { try { @@ -1026,15 +1032,11 @@ public class S3AFileSystem extends FileSystem { statistics.incrementReadOps(1); if (objectRepresentsDirectory(key, meta.getContentLength())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Found exact file: fake directory"); - } + LOG.debug("Found exact file: fake directory"); return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Found exact file: normal file"); - } + LOG.debug("Found exact file: normal file"); return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()), f.makeQualified(uri, workingDir), @@ -1042,25 +1044,23 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(e); + printAmazonServiceException(f.toString(), e); throw e; } } catch (AmazonClientException e) { - printAmazonClientException(e); + printAmazonClientException(f.toString(), e); throw e; } // Necessary? if (!key.endsWith("/")) { + String newKey = key + "/"; try { - String newKey = key + "/"; ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey); statistics.incrementReadOps(1); if (objectRepresentsDirectory(newKey, meta.getContentLength())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Found file (with /): fake directory"); - } + LOG.debug("Found file (with /): fake directory"); return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); } else { LOG.warn("Found file (with /): real file? 
should not happen: {}", key); @@ -1072,11 +1072,11 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(e); + printAmazonServiceException(newKey, e); throw e; } } catch (AmazonClientException e) { - printAmazonClientException(e); + printAmazonClientException(newKey, e); throw e; } } @@ -1096,17 +1096,17 @@ public class S3AFileSystem extends FileSystem { statistics.incrementReadOps(1); if (!objects.getCommonPrefixes().isEmpty() - || objects.getObjectSummaries().size() > 0) { + || !objects.getObjectSummaries().isEmpty()) { if (LOG.isDebugEnabled()) { - LOG.debug("Found path as directory (with /): " + - objects.getCommonPrefixes().size() + "/" + + LOG.debug("Found path as directory (with /): {}/{}", + objects.getCommonPrefixes().size() , objects.getObjectSummaries().size()); for (S3ObjectSummary summary : objects.getObjectSummaries()) { - LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize()); + LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize()); } for (String prefix : objects.getCommonPrefixes()) { - LOG.debug("Prefix: " + prefix); + LOG.debug("Prefix: {}", prefix); } } @@ -1118,17 +1118,15 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(e); + printAmazonServiceException(key, e); throw e; } } catch (AmazonClientException e) { - printAmazonClientException(e); + printAmazonClientException(key, e); throw e; } - if (LOG.isDebugEnabled()) { - LOG.debug("Not Found: " + f); - } + LOG.debug("Not Found: {}", f); throw new FileNotFoundException("No such file or directory: " + f); } @@ -1147,15 +1145,13 @@ public class S3AFileSystem extends FileSystem { */ @Override public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, - Path dst) throws IOException { + Path dst) throws IOException { String key = pathToKey(dst); if (!overwrite && exists(dst)) { - throw new IOException(dst + " already exists"); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Copying local file from " + src + " to " + dst); + throw new FileAlreadyExistsException(dst + " already exists"); } + LOG.debug("Copying local file from {} to {}", src, dst); // Since we have a local file, we don't need to stream into a temporary file LocalFileSystem local = getLocal(getConf()); @@ -1181,13 +1177,14 @@ public class S3AFileSystem extends FileSystem { } }; + statistics.incrementWriteOps(1); Upload up = transfers.upload(putObjectRequest); up.addProgressListener(progressListener); try { up.waitForUploadResult(); - statistics.incrementWriteOps(1); } catch (InterruptedException e) { - throw new IOException("Got interrupted, cancelling"); + throw new InterruptedIOException("Interrupted copying " + src + + " to " + dst + ", cancelling"); } // This will delete unnecessary fake parent directories @@ -1211,7 +1208,7 @@ public class S3AFileSystem extends FileSystem { } /** - * Override getCononicalServiceName because we don't support token in S3A + * Override getCanonicalServiceName because we don't support token in S3A. 
*/ @Override public String getCanonicalServiceName() { @@ -1219,17 +1216,17 @@ public class S3AFileSystem extends FileSystem { return null; } - private void copyFile(String srcKey, String dstKey) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("copyFile " + srcKey + " -> " + dstKey); - } + private void copyFile(String srcKey, String dstKey, long size) + throws IOException { + LOG.debug("copyFile {} -> {} ", srcKey, dstKey); ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); ObjectMetadata dstom = cloneObjectMetadata(srcom); if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); } - CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey); + CopyObjectRequest copyObjectRequest = + new CopyObjectRequest(bucket, srcKey, bucket, dstKey); copyObjectRequest.setCannedAccessControlList(cannedACL); copyObjectRequest.setNewObjectMetadata(dstom); @@ -1250,13 +1247,17 @@ public class S3AFileSystem extends FileSystem { try { copy.waitForCopyResult(); statistics.incrementWriteOps(1); + instrumentation.filesCopied(1, size); } catch (InterruptedException e) { - throw new IOException("Got interrupted, cancelling"); + throw new InterruptedIOException("Interrupted copying " + srcKey + + " to " + dstKey + ", cancelling"); } } private boolean objectRepresentsDirectory(final String name, final long size) { - return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L; + return !name.isEmpty() + && name.charAt(name.length() - 1) == '/' + && size == 0L; } // Handles null Dates that can be returned by AWS @@ -1274,8 +1275,9 @@ public class S3AFileSystem extends FileSystem { private void deleteUnnecessaryFakeDirectories(Path f) throws IOException { while (true) { + String key = ""; try { - String key = pathToKey(f); + key = pathToKey(f); if (key.isEmpty()) { break; } @@ -1283,13 +1285,13 @@ public class S3AFileSystem extends FileSystem { S3AFileStatus status = getFileStatus(f); if (status.isDirectory() && status.isEmptyDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting fake directory " + key + "/"); - } + LOG.debug("Deleting fake directory {}/", key); s3.deleteObject(bucket, key + "/"); statistics.incrementWriteOps(1); } } catch (FileNotFoundException | AmazonServiceException e) { + LOG.debug("While deleting key {} ", key, e); + instrumentation.errorIgnored(); } if (f.isRoot()) { @@ -1325,10 +1327,12 @@ public class S3AFileSystem extends FileSystem { if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { om.setSSEAlgorithm(serverSideEncryptionAlgorithm); } - PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om); + PutObjectRequest putObjectRequest = + new PutObjectRequest(bucketName, objectName, im, om); putObjectRequest.setCannedAcl(cannedACL); s3.putObject(putObjectRequest); statistics.incrementWriteOps(1); + instrumentation.directoryCreated(); } /** @@ -1400,31 +1404,115 @@ public class S3AFileSystem extends FileSystem { /** * Return the number of bytes that large input files should be optimally - * be split into to minimize i/o time. + * be split into to minimize I/O time. 
* @deprecated use {@link #getDefaultBlockSize(Path)} instead */ @Deprecated public long getDefaultBlockSize() { - // default to 32MB: large enough to minimize the impact of seeks return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE); } - private void printAmazonServiceException(AmazonServiceException ase) { - LOG.info("Caught an AmazonServiceException, which means your request made it " + - "to Amazon S3, but was rejected with an error response for some reason."); - LOG.info("Error Message: " + ase.getMessage()); - LOG.info("HTTP Status Code: " + ase.getStatusCode()); - LOG.info("AWS Error Code: " + ase.getErrorCode()); - LOG.info("Error Type: " + ase.getErrorType()); - LOG.info("Request ID: " + ase.getRequestId()); - LOG.info("Class Name: " + ase.getClass().getName()); + private void printAmazonServiceException(String target, + AmazonServiceException ase) { + LOG.info("{}: caught an AmazonServiceException {}", target, ase); + LOG.info("This means your request made it to Amazon S3," + + " but was rejected with an error response for some reason."); + LOG.info("Error Message: {}", ase.getMessage()); + LOG.info("HTTP Status Code: {}", ase.getStatusCode()); + LOG.info("AWS Error Code: {}", ase.getErrorCode()); + LOG.info("Error Type: {}", ase.getErrorType()); + LOG.info("Request ID: {}", ase.getRequestId()); + LOG.info("Class Name: {}", ase.getClass().getName()); + LOG.info("Exception", ase); } - private void printAmazonClientException(AmazonClientException ace) { - LOG.info("Caught an AmazonClientException, which means the client encountered " + - "a serious internal problem while trying to communicate with S3, " + - "such as not being able to access the network."); - LOG.info("Error Message: {}" + ace, ace); + private void printAmazonClientException(String target, + AmazonClientException ace) { + LOG.info("{}: caught an AmazonClientException {}", target, ace); + LOG.info("This means the client encountered " + + "a problem while trying to communicate with S3, " + + "such as not being able to access the network.", ace); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "S3AFileSystem{"); + sb.append("uri=").append(uri); + sb.append(", workingDir=").append(workingDir); + sb.append(", partSize=").append(partSize); + sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete); + sb.append(", maxKeys=").append(maxKeys); + sb.append(", cannedACL=").append(cannedACL.toString()); + sb.append(", readAhead=").append(readAhead); + sb.append(", blockSize=").append(getDefaultBlockSize()); + sb.append(", multiPartThreshold=").append(multiPartThreshold); + if (serverSideEncryptionAlgorithm != null) { + sb.append(", serverSideEncryptionAlgorithm='") + .append(serverSideEncryptionAlgorithm) + .append('\''); + } + sb.append(", statistics {") + .append(statistics.toString()) + .append("}"); + sb.append(", metrics {") + .append(instrumentation.dump("{", "=", "} ", true)) + .append("}"); + sb.append('}'); + return sb.toString(); + } + + /** + * Get the partition size for multipart operations. + * @return the value as set during initialization + */ + public long getPartitionSize() { + return partSize; + } + + /** + * Get the threshold for multipart files + * @return the value as set during initialization + */ + public long getMultiPartThreshold() { + return multiPartThreshold; + } + + /** + * Get a integer option >= the minimum allowed value. 
+ * @param conf configuration + * @param key key to look up + * @param defVal default value + * @param min minimum value + * @return the value + * @throws IllegalArgumentException if the value is below the minimum + */ + static int intOption(Configuration conf, String key, int defVal, int min) { + int v = conf.getInt(key, defVal); + Preconditions.checkArgument(v >= min, + String.format("Value of %s: %d is below the minimum value %d", + key, v, min)); + return v; + } + + /** + * Get a long option >= the minimum allowed value. + * @param conf configuration + * @param key key to look up + * @param defVal default value + * @param min minimum value + * @return the value + * @throws IllegalArgumentException if the value is below the minimum + */ + static long longOption(Configuration conf, + String key, + long defVal, + long min) { + long v = conf.getLong(key, defVal); + Preconditions.checkArgument(v >= min, + String.format("Value of %s: %d is below the minimum value %d", + key, v, min)); + return v; } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 42178a49a47..27557f82a83 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -21,20 +21,50 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; - import org.slf4j.Logger; import java.io.EOFException; import java.io.IOException; -import java.net.SocketTimeoutException; -import java.net.SocketException; -public class S3AInputStream extends FSInputStream { +/** + * The input stream for an S3A object. + * + * As this stream seeks withing an object, it may close then re-open the stream. + * When this happens, any updated stream data may be retrieved, and, given + * the consistency model of Amazon S3, outdated data may in fact be picked up. + * + * As a result, the outcome of reading from a stream of an object which is + * actively manipulated during the read process is "undefined". + * + * The class is marked as private as code should not be creating instances + * themselves. Any extra feature (e.g instrumentation) should be considered + * unstable. + * + * Because it prints some of the state of the instrumentation, + * the output of {@link #toString()} must also be considered unstable. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class S3AInputStream extends FSInputStream implements CanSetReadahead { + /** + * This is the public position; the one set in {@link #seek(long)} + * and returned in {@link #getPos()}. + */ private long pos; - private boolean closed; + /** + * Closed bit. Volatile so reads are non-blocking. 
+ * Updates must be in a synchronized block to guarantee an atomic check and + * set + */ + private volatile boolean closed; private S3ObjectInputStream wrappedStream; private final FileSystem.Statistics stats; private final AmazonS3Client client; @@ -44,62 +74,65 @@ public class S3AInputStream extends FSInputStream { private final String uri; public static final Logger LOG = S3AFileSystem.LOG; public static final long CLOSE_THRESHOLD = 4096; + private final S3AInstrumentation.InputStreamStatistics streamStatistics; + private long readahead; - // Used by lazy seek + /** + * This is the actual position within the object, used by + * lazy seek to decide whether to seek on the next read or not. + */ private long nextReadPos; - //Amount of data requested from the request + /* Amount of data desired from the request */ private long requestedStreamLen; - public S3AInputStream(String bucket, String key, long contentLength, - AmazonS3Client client, FileSystem.Statistics stats) { + public S3AInputStream(String bucket, + String key, + long contentLength, + AmazonS3Client client, + FileSystem.Statistics stats, + S3AInstrumentation instrumentation, + long readahead) { + Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket"); + Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key"); + Preconditions.checkArgument(contentLength >= 0 , "Negative content length"); this.bucket = bucket; this.key = key; this.contentLength = contentLength; this.client = client; this.stats = stats; - this.pos = 0; - this.nextReadPos = 0; - this.closed = false; - this.wrappedStream = null; this.uri = "s3a://" + this.bucket + "/" + this.key; + this.streamStatistics = instrumentation.newInputStreamStatistics(); + setReadahead(readahead); } /** * Opens up the stream at specified target position and for given length. * + * @param reason reason for reopen * @param targetPos target position * @param length length requested * @throws IOException */ - private synchronized void reopen(long targetPos, long length) + private synchronized void reopen(String reason, long targetPos, long length) throws IOException { - requestedStreamLen = (length < 0) ? this.contentLength : - Math.max(this.contentLength, (CLOSE_THRESHOLD + (targetPos + length))); + requestedStreamLen = this.contentLength; if (wrappedStream != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing the previous stream"); - } - closeStream(requestedStreamLen); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Requesting for " - + "targetPos=" + targetPos - + ", length=" + length - + ", requestedStreamLen=" + requestedStreamLen - + ", streamPosition=" + pos - + ", nextReadPosition=" + nextReadPos - ); + closeStream("reopen(" + reason + ")", requestedStreamLen); } + LOG.debug("reopen({}) for {} at targetPos={}, length={}," + + " requestedStreamLen={}, streamPosition={}, nextReadPosition={}", + uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos); + streamStatistics.streamOpened(); GetObjectRequest request = new GetObjectRequest(bucket, key) .withRange(targetPos, requestedStreamLen); wrappedStream = client.getObject(request).getObjectContent(); if (wrappedStream == null) { - throw new IOException("Null IO stream"); + throw new IOException("Null IO stream from reopen of (" + reason + ") " + + uri); } this.pos = targetPos; @@ -128,6 +161,20 @@ public class S3AInputStream extends FSInputStream { nextReadPos = targetPos; } + /** + * Seek without raising any exception. 
This is for use in + * {@code finally} clauses + * @param positiveTargetPos a target position which must be positive. + */ + private void seekQuietly(long positiveTargetPos) { + try { + seek(positiveTargetPos); + } catch (IOException ioe) { + LOG.debug("Ignoring IOE on seek of {} to {}", + uri, positiveTargetPos, ioe); + } + } + /** * Adjust the stream to a specific position. * @@ -140,23 +187,50 @@ public class S3AInputStream extends FSInputStream { if (wrappedStream == null) { return; } - // compute how much more to skip long diff = targetPos - pos; - if (targetPos > pos) { - if ((diff + length) <= wrappedStream.available()) { - // already available in buffer - pos += wrappedStream.skip(diff); - if (pos != targetPos) { - throw new IOException("Failed to seek to " + targetPos - + ". Current position " + pos); + if (diff > 0) { + // forward seek -this is where data can be skipped + + int available = wrappedStream.available(); + // always seek at least as far as what is available + long forwardSeekRange = Math.max(readahead, available); + // work out how much is actually left in the stream + // then choose whichever comes first: the range or the EOF + long forwardSeekLimit = Math.min(remaining(), forwardSeekRange); + if (diff <= forwardSeekLimit) { + // the forward seek range is within the limits + LOG.debug("Forward seek on {}, of {} bytes", uri, diff); + streamStatistics.seekForwards(diff); + long skipped = wrappedStream.skip(diff); + if (skipped > 0) { + pos += skipped; + // as these bytes have been read, they are included in the counter + incrementBytesRead(diff); + } + + if (pos == targetPos) { + // all is well + return; + } else { + // log a warning; continue to attempt to re-open + LOG.warn("Failed to seek on {} to {}. Current position {}", + uri, targetPos, pos); } - return; } + } else if (diff < 0) { + // backwards seek + streamStatistics.seekBackwards(diff); + } else { + // targetPos == pos + // this should never happen as the caller filters it out. + // Retained just in case + LOG.debug("Ignoring seek {} to {} as target position == current", + uri, targetPos); } // close the stream; if read the object will be opened at the new pos - closeStream(this.requestedStreamLen); + closeStream("seekInStream()", this.requestedStreamLen); pos = targetPos; } @@ -179,7 +253,19 @@ public class S3AInputStream extends FSInputStream { //re-open at specific location if needed if (wrappedStream == null) { - reopen(targetPos, len); + reopen("read from new offset", targetPos, len); + } + } + + /** + * Increment the bytes read counter if there is a stats instance + * and the number of bytes read is more than zero. 
+ * @param bytesRead number of bytes read + */ + private void incrementBytesRead(long bytesRead) { + streamStatistics.bytesRead(bytesRead); + if (stats != null && bytesRead > 0) { + stats.incrementBytesRead(bytesRead); } } @@ -195,13 +281,11 @@ public class S3AInputStream extends FSInputStream { int byteRead; try { byteRead = wrappedStream.read(); - } catch (SocketTimeoutException | SocketException e) { - LOG.info("Got exception while trying to read from stream," - + " trying to recover " + e); - reopen(pos, 1); - byteRead = wrappedStream.read(); } catch (EOFException e) { return -1; + } catch (IOException e) { + onReadFailure(e, 1); + byteRead = wrappedStream.read(); } if (byteRead >= 0) { @@ -209,12 +293,36 @@ public class S3AInputStream extends FSInputStream { nextReadPos++; } - if (stats != null && byteRead >= 0) { - stats.incrementBytesRead(1); + if (byteRead >= 0) { + incrementBytesRead(1); } return byteRead; } + /** + * Handle an IOE on a read by attempting to re-open the stream. + * The filesystem's readException count will be incremented. + * @param ioe exception caught. + * @param length length of data being attempted to read + * @throws IOException any exception thrown on the re-open attempt. + */ + private void onReadFailure(IOException ioe, int length) throws IOException { + LOG.info("Got exception while trying to read from stream {}" + + " trying to recover: "+ ioe, uri); + LOG.debug("While trying to read from stream {}", uri, ioe); + streamStatistics.readException(); + reopen("failure recovery", pos, length); + } + + /** + * {@inheritDoc} + * + * This updates the statistics on read operations started and whether + * or not the read operation "completed", that is: returned the exact + * number of bytes requested. + * @throws EOFException if there is no more data + * @throws IOException if there are other problems + */ @Override public synchronized int read(byte[] buf, int off, int len) throws IOException { @@ -230,61 +338,85 @@ public class S3AInputStream extends FSInputStream { } lazySeek(nextReadPos, len); + streamStatistics.readOperationStarted(nextReadPos, len); - int byteRead; + int bytesRead; try { - byteRead = wrappedStream.read(buf, off, len); - } catch (SocketTimeoutException | SocketException e) { - LOG.info("Got exception while trying to read from stream," - + " trying to recover " + e); - reopen(pos, len); - byteRead = wrappedStream.read(buf, off, len); + bytesRead = wrappedStream.read(buf, off, len); + } catch (EOFException e) { + throw e; + } catch (IOException e) { + onReadFailure(e, len); + bytesRead = wrappedStream.read(buf, off, len); } - if (byteRead > 0) { - pos += byteRead; - nextReadPos += byteRead; + if (bytesRead > 0) { + pos += bytesRead; + nextReadPos += bytesRead; } - - if (stats != null && byteRead > 0) { - stats.incrementBytesRead(byteRead); - } - - return byteRead; + incrementBytesRead(bytesRead); + streamStatistics.readOperationCompleted(len, bytesRead); + return bytesRead; } + /** + * Verify that the input stream is open. Non blocking; this gives + * the last state of the volatile {@link #closed} field. + * @throws IOException if the connection is closed. + */ private void checkNotClosed() throws IOException { if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED); } } + /** + * Close the stream. + * This triggers publishing of the stream statistics back to the filesystem + * statistics. 
+ * This operation is synchronized, so that only one thread can attempt to + * close the connection; all later/blocked calls are no-ops. + * @throws IOException on any problem + */ @Override public synchronized void close() throws IOException { - super.close(); - closed = true; - closeStream(this.contentLength); + if (!closed) { + closed = true; + try { + // close or abort the stream + closeStream("close() operation", this.contentLength); + // this is actually a no-op + super.close(); + } finally { + // merge the statistics back into the FS statistics. + streamStatistics.close(); + } + } } /** * Close a stream: decide whether to abort or close, based on * the length of the stream and the current position. + * If a close() is attempted and fails, the operation escalates to + * an abort. * * This does not set the {@link #closed} flag. + * + * @param reason reason for stream being closed; used in messages * @param length length of the stream. - * @throws IOException */ - private void closeStream(long length) throws IOException { + private void closeStream(String reason, long length) { if (wrappedStream != null) { - String reason = null; boolean shouldAbort = length - pos > CLOSE_THRESHOLD; if (!shouldAbort) { try { - reason = "Closed stream"; + // clean close. This will read to the end of the stream, + // so, while cleaner, can be pathological on a multi-GB object wrappedStream.close(); + streamStatistics.streamClose(false); } catch (IOException e) { // exception escalates to an abort - LOG.debug("When closing stream", e); + LOG.debug("When closing {} stream for {}", uri, reason, e); shouldAbort = true; } } @@ -292,13 +424,12 @@ public class S3AInputStream extends FSInputStream { // Abort, rather than just close, the underlying stream. Otherwise, the // remaining object payload is read from S3 while closing the stream. wrappedStream.abort(); - reason = "Closed stream with abort"; - } - if (LOG.isDebugEnabled()) { - LOG.debug(reason + "; streamPos=" + pos - + ", nextReadPos=" + nextReadPos - + ", contentLength=" + length); + streamStatistics.streamClose(true); } + LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," + + " length={}", + uri, (shouldAbort ? "aborted":"closed"), reason, pos, nextReadPos, + length); wrappedStream = null; } } @@ -307,19 +438,34 @@ public class S3AInputStream extends FSInputStream { public synchronized int available() throws IOException { checkNotClosed(); - long remaining = this.contentLength - this.pos; + long remaining = remaining(); if (remaining > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } return (int)remaining; } + /** + * Bytes left in stream. + * @return how many bytes are left to read + */ + protected long remaining() { + return this.contentLength - this.pos; + } + @Override public boolean markSupported() { return false; } + /** + * String value includes statistics as well as stream state. + * Important: there are no guarantees as to the stability + * of this value. 
+ * @return a string value for printing in logs/diagnostics + */ @Override + @InterfaceStability.Unstable public String toString() { final StringBuilder sb = new StringBuilder( "S3AInputStream{"); @@ -327,6 +473,7 @@ public class S3AInputStream extends FSInputStream { sb.append(" pos=").append(pos); sb.append(" nextReadPos=").append(nextReadPos); sb.append(" contentLength=").append(contentLength); + sb.append(" ").append(streamStatistics.toString()); sb.append('}'); return sb.toString(); } @@ -348,6 +495,7 @@ public class S3AInputStream extends FSInputStream { throws IOException { checkNotClosed(); validatePositionedReadArgs(position, buffer, offset, length); + streamStatistics.readFullyOperationStarted(position, length); if (length == 0) { return; } @@ -363,10 +511,38 @@ public class S3AInputStream extends FSInputStream { } nread += nbytes; } - } finally { - seek(oldPos); + seekQuietly(oldPos); } } } + + /** + * Access the input stream statistics. + * This is for internal testing and may be removed without warning. + * @return the statistics for this input stream + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public S3AInstrumentation.InputStreamStatistics getS3AStreamStatistics() { + return streamStatistics; + } + + @Override + public void setReadahead(Long readahead) { + if (readahead == null) { + this.readahead = Constants.DEFAULT_READAHEAD_RANGE; + } else { + Preconditions.checkArgument(readahead >= 0, "Negative readahead value"); + this.readahead = readahead; + } + } + + /** + * Get the current readahead value. + * @return a non-negative readahead value + */ + public long getReadahead() { + return readahead; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java new file mode 100644 index 00000000000..285f2284b6c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricStringBuilder; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** + * Instrumentation of S3a. 
+ * Derived from the {@code AzureFileSystemInstrumentation} + */ +@Metrics(about = "Metrics for S3a", context = "S3AFileSystem") +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class S3AInstrumentation { + public static final String CONTEXT = "S3AFileSystem"; + + public static final String STREAM_OPENED = "streamOpened"; + public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations"; + public static final String STREAM_CLOSED = "streamClosed"; + public static final String STREAM_ABORTED = "streamAborted"; + public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions"; + public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations"; + public static final String STREAM_FORWARD_SEEK_OPERATIONS + = "streamForwardSeekOperations"; + public static final String STREAM_BACKWARD_SEEK_OPERATIONS + = "streamBackwardSeekOperations"; + public static final String STREAM_SEEK_BYTES_SKIPPED = + "streamBytesSkippedOnSeek"; + public static final String STREAM_SEEK_BYTES_BACKWARDS = + "streamBytesBackwardsOnSeek"; + public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead"; + public static final String STREAM_READ_OPERATIONS = "streamReadOperations"; + public static final String STREAM_READ_FULLY_OPERATIONS + = "streamReadFullyOperations"; + public static final String STREAM_READ_OPERATIONS_INCOMPLETE + = "streamReadOperationsIncomplete"; + public static final String FILES_CREATED = "files_created"; + public static final String FILES_COPIED = "files_copied"; + public static final String FILES_COPIED_BYTES = "files_copied_bytes"; + public static final String FILES_DELETED = "files_deleted"; + public static final String DIRECTORIES_CREATED = "directories_created"; + public static final String DIRECTORIES_DELETED = "directories_deleted"; + public static final String IGNORED_ERRORS = "ignored_errors"; + private final MetricsRegistry registry = + new MetricsRegistry("S3AFileSystem").setContext(CONTEXT); + private final MutableCounterLong streamOpenOperations; + private final MutableCounterLong streamCloseOperations; + private final MutableCounterLong streamClosed; + private final MutableCounterLong streamAborted; + private final MutableCounterLong streamSeekOperations; + private final MutableCounterLong streamReadExceptions; + private final MutableCounterLong streamForwardSeekOperations; + private final MutableCounterLong streamBackwardSeekOperations; + private final MutableCounterLong streamBytesSkippedOnSeek; + private final MutableCounterLong streamBytesBackwardsOnSeek; + private final MutableCounterLong streamBytesRead; + private final MutableCounterLong streamReadOperations; + private final MutableCounterLong streamReadFullyOperations; + private final MutableCounterLong streamReadsIncomplete; + private final MutableCounterLong ignoredErrors; + + private final MutableCounterLong numberOfFilesCreated; + private final MutableCounterLong numberOfFilesCopied; + private final MutableCounterLong bytesOfFilesCopied; + private final MutableCounterLong numberOfFilesDeleted; + private final MutableCounterLong numberOfDirectoriesCreated; + private final MutableCounterLong numberOfDirectoriesDeleted; + private final Map streamMetrics = new HashMap<>(); + + public S3AInstrumentation(URI name) { + UUID fileSystemInstanceId = UUID.randomUUID(); + registry.tag("FileSystemId", + "A unique identifier for the FS ", + fileSystemInstanceId.toString() + "-" + name.getHost()); + registry.tag("fsURI", + "URI of this filesystem", + name.toString()); + 
streamOpenOperations = streamCounter(STREAM_OPENED, + "Total count of times an input stream to object store was opened"); + streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS, + "Total count of times an attempt to close a data stream was made"); + streamClosed = streamCounter(STREAM_CLOSED, + "Count of times the TCP stream was closed"); + streamAborted = streamCounter(STREAM_ABORTED, + "Count of times the TCP stream was aborted"); + streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS, + "Number of seek operations invoked on input streams"); + streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS, + "Number of read exceptions caught and attempted to recovered from"); + streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS, + "Number of executed seek operations which went forward in a stream"); + streamBackwardSeekOperations = streamCounter( + STREAM_BACKWARD_SEEK_OPERATIONS, + "Number of executed seek operations which went backwards in a stream"); + streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED, + "Count of bytes skipped during forward seek operations"); + streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS, + "Count of bytes moved backwards during seek operations"); + streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ, + "Count of bytes read during seek() in stream operations"); + streamReadOperations = streamCounter(STREAM_READ_OPERATIONS, + "Count of read() operations in streams"); + streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS, + "Count of readFully() operations in streams"); + streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE, + "Count of incomplete read() operations in streams"); + + numberOfFilesCreated = counter(FILES_CREATED, + "Total number of files created through the object store."); + numberOfFilesCopied = counter(FILES_COPIED, + "Total number of files copied within the object store."); + bytesOfFilesCopied = counter(FILES_COPIED_BYTES, + "Total number of bytes copied within the object store."); + numberOfFilesDeleted = counter(FILES_DELETED, + "Total number of files deleted through from the object store."); + numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED, + "Total number of directories created through the object store."); + numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED, + "Total number of directories deleted through the object store."); + ignoredErrors = counter(IGNORED_ERRORS, + "Total number of errors caught and ingored."); + } + + /** + * Create a counter in the registry. + * @param name counter name + * @param desc counter description + * @return a new counter + */ + protected final MutableCounterLong counter(String name, String desc) { + return registry.newCounter(name, desc, 0L); + } + + /** + * Create a counter in the stream map: these are unregistered in the public + * metrics. + * @param name counter name + * @param desc counter description + * @return a new counter + */ + protected final MutableCounterLong streamCounter(String name, String desc) { + MutableCounterLong counter = new MutableCounterLong( + Interns.info(name, desc), 0L); + streamMetrics.put(name, counter); + return counter; + } + + /** + * Create a gauge in the registry. + * @param name name gauge name + * @param desc description + * @return the gauge + */ + protected final MutableGaugeLong gauge(String name, String desc) { + return registry.newGauge(name, desc, 0L); + } + + /** + * Get the metrics registry. 
+ * @return the registry + */ + public MetricsRegistry getRegistry() { + return registry; + } + + /** + * Dump all the metrics to a string. + * @param prefix prefix before every entry + * @param separator separator between name and value + * @param suffix suffix + * @param all get all the metrics even if the values are not changed. + * @return a string dump of the metrics + */ + public String dump(String prefix, + String separator, + String suffix, + boolean all) { + MetricStringBuilder metricBuilder = new MetricStringBuilder(null, + prefix, + separator, suffix); + registry.snapshot(metricBuilder, all); + for (Map.Entry entry: + streamMetrics.entrySet()) { + metricBuilder.tuple(entry.getKey(), + Long.toString(entry.getValue().value())); + } + return metricBuilder.toString(); + } + + /** + * Indicate that S3A created a file. + */ + public void fileCreated() { + numberOfFilesCreated.incr(); + } + + /** + * Indicate that S3A deleted one or more file.s + * @param count number of files. + */ + public void fileDeleted(int count) { + numberOfFilesDeleted.incr(count); + } + + /** + * Indicate that S3A created a directory. + */ + public void directoryCreated() { + numberOfDirectoriesCreated.incr(); + } + + /** + * Indicate that S3A just deleted a directory. + */ + public void directoryDeleted() { + numberOfDirectoriesDeleted.incr(); + } + + /** + * Indicate that S3A copied some files within the store. + * + * @param files number of files + * @param size total size in bytes + */ + public void filesCopied(int files, long size) { + numberOfFilesCopied.incr(files); + bytesOfFilesCopied.incr(size); + } + + /** + * Note that an error was ignored. + */ + public void errorIgnored() { + ignoredErrors.incr(); + } + + /** + * Create a stream input statistics instance. + * @return the new instance + */ + InputStreamStatistics newInputStreamStatistics() { + return new InputStreamStatistics(); + } + + /** + * Merge in the statistics of a single input stream into + * the filesystem-wide statistics. + * @param statistics stream statistics + */ + private void mergeInputStreamStatistics(InputStreamStatistics statistics) { + streamOpenOperations.incr(statistics.openOperations); + streamCloseOperations.incr(statistics.closeOperations); + streamClosed.incr(statistics.closed); + streamAborted.incr(statistics.aborted); + streamSeekOperations.incr(statistics.seekOperations); + streamReadExceptions.incr(statistics.readExceptions); + streamForwardSeekOperations.incr(statistics.forwardSeekOperations); + streamBytesSkippedOnSeek.incr(statistics.bytesSkippedOnSeek); + streamBackwardSeekOperations.incr(statistics.backwardSeekOperations); + streamBytesBackwardsOnSeek.incr(statistics.bytesBackwardsOnSeek); + streamBytesRead.incr(statistics.bytesRead); + streamReadOperations.incr(statistics.readOperations); + streamReadFullyOperations.incr(statistics.readFullyOperations); + streamReadsIncomplete.incr(statistics.readsIncomplete); + } + + /** + * Statistics updated by an input stream during its actual operation. + * These counters not thread-safe and are for use in a single instance + * of a stream. 
+ */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public final class InputStreamStatistics implements AutoCloseable { + public long openOperations; + public long closeOperations; + public long closed; + public long aborted; + public long seekOperations; + public long readExceptions; + public long forwardSeekOperations; + public long backwardSeekOperations; + public long bytesRead; + public long bytesSkippedOnSeek; + public long bytesBackwardsOnSeek; + public long readOperations; + public long readFullyOperations; + public long readsIncomplete; + + private InputStreamStatistics() { + } + + /** + * Seek backwards, incrementing the seek and backward seek counters. + * @param negativeOffset how far was the seek? + * This is expected to be negative. + */ + public void seekBackwards(long negativeOffset) { + seekOperations++; + backwardSeekOperations++; + bytesBackwardsOnSeek -= negativeOffset; + } + + /** + * Record a forward seek, adding a seek operation, a forward + * seek operation, and any bytes skipped. + * @param skipped number of bytes skipped by reading from the stream. + * If the seek was implemented by a close + reopen, set this to zero. + */ + public void seekForwards(long skipped) { + seekOperations++; + forwardSeekOperations++; + if (skipped > 0) { + bytesSkippedOnSeek += skipped; + } + } + + /** + * The inner stream was opened. + */ + public void streamOpened() { + openOperations++; + } + + /** + * The inner stream was closed. + * @param abortedConnection flag to indicate the stream was aborted, + * rather than closed cleanly + */ + public void streamClose(boolean abortedConnection) { + closeOperations++; + if (abortedConnection) { + this.aborted++; + } else { + closed++; + } + } + + /** + * An ignored stream read exception was received. + */ + public void readException() { + readExceptions++; + } + + /** + * Increment the bytes read counter by the number of bytes; + * no-op if the argument is negative. + * @param bytes number of bytes read + */ + public void bytesRead(long bytes) { + if (bytes > 0) { + bytesRead += bytes; + } + } + + /** + * A {@code read(byte[] buf, int off, int len)} operation has started. + * @param pos starting position of the read + * @param len length of bytes to read + */ + public void readOperationStarted(long pos, long len) { + readOperations++; + } + + /** + * A {@code PositionedRead.read(position, buffer, offset, length)} + * operation has just started. + * @param pos starting position of the read + * @param len length of bytes to read + */ + public void readFullyOperationStarted(long pos, long len) { + readFullyOperations++; + } + + /** + * A read operation has completed. + * @param requested number of requested bytes + * @param actual the actual number of bytes + */ + public void readOperationCompleted(int requested, int actual) { + if (requested > actual) { + readsIncomplete++; + } + } + + /** + * Close triggers the merge of statistics into the filesystem's + * instrumentation instance. + */ + @Override + public void close() { + mergeInputStreamStatistics(this); + } + + /** + * String operator describes all the current statistics. + * Important: there are no guarantees as to the stability + * of this value. + * @return the current values of the stream statistics. 
+ */ + @Override + @InterfaceStability.Unstable + public String toString() { + final StringBuilder sb = new StringBuilder( + "StreamStatistics{"); + sb.append("OpenOperations=").append(openOperations); + sb.append(", CloseOperations=").append(closeOperations); + sb.append(", Closed=").append(closed); + sb.append(", Aborted=").append(aborted); + sb.append(", SeekOperations=").append(seekOperations); + sb.append(", ReadExceptions=").append(readExceptions); + sb.append(", ForwardSeekOperations=") + .append(forwardSeekOperations); + sb.append(", BackwardSeekOperations=") + .append(backwardSeekOperations); + sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek); + sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek); + sb.append(", BytesRead=").append(bytesRead); + sb.append(", BytesRead excluding skipped=") + .append(bytesRead - bytesSkippedOnSeek); + sb.append(", ReadOperations=").append(readOperations); + sb.append(", ReadFullyOperations=").append(readFullyOperations); + sb.append(", ReadsIncomplete=").append(readsIncomplete); + sb.append('}'); + return sb.toString(); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index 3e079f23679..f9ff701a9b9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -21,14 +21,14 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEventType; import com.amazonaws.event.ProgressListener; -import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.transfer.TransferManager; -import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; import com.amazonaws.services.s3.transfer.Upload; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; @@ -46,6 +46,11 @@ import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT; import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT; import static org.apache.hadoop.fs.s3a.Constants.*; +/** + * Output stream to save data to S3. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving public class S3AOutputStream extends OutputStream { private OutputStream backupStream; private File backupFile; @@ -65,9 +70,9 @@ public class S3AOutputStream extends OutputStream { public static final Logger LOG = S3AFileSystem.LOG; public S3AOutputStream(Configuration conf, TransferManager transfers, - S3AFileSystem fs, String bucket, String key, Progressable progress, - CannedAccessControlList cannedACL, FileSystem.Statistics statistics, - String serverSideEncryptionAlgorithm) + S3AFileSystem fs, String bucket, String key, Progressable progress, + CannedAccessControlList cannedACL, FileSystem.Statistics statistics, + String serverSideEncryptionAlgorithm) throws IOException { this.bucket = bucket; this.key = key; @@ -78,9 +83,8 @@ public class S3AOutputStream extends OutputStream { this.statistics = statistics; this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; - partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - partSizeThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, - DEFAULT_MIN_MULTIPART_THRESHOLD); + partSize = fs.getPartitionSize(); + partSizeThreshold = fs.getMultiPartThreshold(); if (conf.get(BUFFER_DIR, null) != null) { lDirAlloc = new LocalDirAllocator(BUFFER_DIR); @@ -91,10 +95,8 @@ public class S3AOutputStream extends OutputStream { backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); closed = false; - if (LOG.isDebugEnabled()) { - LOG.debug("OutputStream for key '" + key + "' writing to tempfile: " + - this.backupFile); - } + LOG.debug("OutputStream for key '{}' writing to tempfile: {}", + key, backupFile); this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); } @@ -111,10 +113,9 @@ public class S3AOutputStream extends OutputStream { } backupStream.close(); - if (LOG.isDebugEnabled()) { - LOG.debug("OutputStream for key '" + key + "' closed. Now beginning upload"); - LOG.debug("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold); - } + LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key); + LOG.debug("Minimum upload part size: {} threshold {}" , partSize, + partSizeThreshold); try { @@ -129,7 +130,7 @@ public class S3AOutputStream extends OutputStream { Upload upload = transfers.upload(putObjectRequest); ProgressableProgressListener listener = - new ProgressableProgressListener(upload, progress, statistics); + new ProgressableProgressListener(upload, progress, statistics); upload.addProgressListener(listener); upload.waitForUploadResult(); @@ -168,6 +169,9 @@ public class S3AOutputStream extends OutputStream { backupStream.write(b, off, len); } + /** + * Listener to progress from AWS regarding transfers. 
+ */ public static class ProgressableProgressListener implements ProgressListener { private Progressable progress; private FileSystem.Statistics statistics; @@ -175,7 +179,7 @@ public class S3AOutputStream extends OutputStream { private Upload upload; public ProgressableProgressListener(Upload upload, Progressable progress, - FileSystem.Statistics statistics) { + FileSystem.Statistics statistics) { this.upload = upload; this.progress = progress; this.statistics = statistics; diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 002338d3464..a46e0847762 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -437,6 +437,14 @@ this capability. The implementation class of the S3A AbstractFileSystem. + + fs.s3a.readahead.range + 65536 + Bytes to read ahead during a seek() before closing and + re-opening the S3 HTTP connection. This option will be overridden if + any call to setReadahead() is made to an open stream. + + ### S3AFastOutputStream **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk** @@ -647,7 +655,7 @@ Example: + href="/home/testuser/.ssh/auth-keys.xml"/> fs.contract.test.fs.s3 @@ -667,7 +675,61 @@ Example: -This example pulls in the `auth-keys.xml` file for the credentials. +This example pulls in the `~/.ssh/auth-keys.xml` file for the credentials. This provides one single place to keep the keys up to date —and means that the file `contract-test-options.xml` does not contain any -secret credentials itself. +secret credentials itself. As the auth keys XML file is kept out of the +source code tree, it is not going to get accidentally committed. + +### Running Performance Tests against non-AWS storage infrastructures + + +#### CSV Data source + +The `TestS3AInputStreamPerformance` tests require read access to a multi-MB +text file. The default file for these tests is one published by amazon, +[s3a://landsat-pds.s3.amazonaws.com/scene_list.gz](http://landsat-pds.s3.amazonaws.com/scene_list.gz). +This is a gzipped CSV index of other files which amazon serves for open use. + +The path to this object is set in the option `fs.s3a.scale.test.csvfile`: + + + fs.s3a.scale.test.csvfile + s3a://landsat-pds/scene_list.gz + + +1. If the option is not overridden, the default value is used. This +is hosted in Amazon's US-east datacenter. +1. If the property is empty, tests which require it will be skipped. +1. If the data cannot be read for any reason then the test will fail. +1. If the property is set to a different path, then that data must be readable +and "sufficiently" large. + +To test on different S3 endpoints, or alternate infrastructures supporting +the same APIs, the option `fs.s3a.scale.test.csvfile` must therefore be +set to " ", or an object of at least 10MB is uploaded to the object store, and +the `fs.s3a.scale.test.csvfile` option set to its path. + + + fs.s3a.scale.test.csvfile + + + + +#### Scale test operation count + +Some scale tests perform multiple operations (such as creating many directories). + +The exact number of operations to perform is configurable in the option +`scale.test.operation.count` + + + scale.test.operation.count + 10 + + +Larger values generate more load, and are recommended when testing locally, +or in batch runs. + +Smaller values should result in faster test runs, especially when the object +store is a long way away. 
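The readahead documentation above describes two levels of control: the `fs.s3a.readahead.range` configuration option, and a per-stream override via `setReadahead()`. The following is a minimal sketch (not part of the patch) showing how the two interact; the class name, bucket/object path other than the landsat default, and the byte values are illustrative assumptions only.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadaheadDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // filesystem-wide default, picked up when the stream is opened
    conf.setLong("fs.s3a.readahead.range", 65536);

    Path csv = new Path("s3a://landsat-pds/scene_list.gz");
    try (FileSystem fs = FileSystem.newInstance(csv.toUri(), conf);
         FSDataInputStream in = fs.open(csv)) {
      // per-stream override: forward seeks of up to 1 MB are now handled by
      // skipping bytes in the open HTTP connection rather than reopening it
      in.setReadahead(1024L * 1024);
      in.seek(512 * 1024);
      int b = in.read();
      // toString() now includes the wrapped S3A stream and its statistics
      System.out.println("first byte after seek = " + b + ", stream: " + in);
    }
  }
}
```

Forward seeks shorter than the readahead value are served from the existing connection; anything longer, or any backwards seek, still closes (or aborts) and reopens the stream, which is what the counters added in this patch measure.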
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java index e44a90e902e..42c552a3bbf 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java @@ -19,18 +19,25 @@ package org.apache.hadoop.fs.s3a.scale; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AInputStream; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; +import java.io.InputStream; +import java.util.Locale; import static org.junit.Assume.assumeTrue; @@ -38,30 +45,61 @@ import static org.junit.Assume.assumeTrue; * Base class for scale tests; here is where the common scale configuration * keys are defined. */ -public class S3AScaleTestBase { +public class S3AScaleTestBase extends Assert { public static final String SCALE_TEST = "scale.test."; + public static final String S3A_SCALE_TEST = "fs.s3a.scale.test."; + + @Rule + public TestName methodName = new TestName(); + + @BeforeClass + public static void nameThread() { + Thread.currentThread().setName("JUnit"); + } + /** - * The number of operations to perform: {@value} + * The number of operations to perform: {@value}. */ public static final String KEY_OPERATION_COUNT = SCALE_TEST + "operation.count"; /** - * The default number of operations to perform: {@value} + * The readahead buffer: {@value}. + */ + public static final String KEY_READ_BUFFER_SIZE = + S3A_SCALE_TEST + "read.buffer.size"; + + public static final int DEFAULT_READ_BUFFER_SIZE = 16384; + + /** + * Key for a multi MB test file: {@value}. + */ + public static final String KEY_CSVTEST_FILE = + S3A_SCALE_TEST + "csvfile"; + + /** + * Default path for the multi MB test file: {@value}. + */ + public static final String DEFAULT_CSVTEST_FILE + = "s3a://landsat-pds/scene_list.gz"; + + /** + * The default number of operations to perform: {@value}. */ public static final long DEFAULT_OPERATION_COUNT = 2005; protected S3AFileSystem fs; - private static final Logger LOG = + + protected static final Logger LOG = LoggerFactory.getLogger(S3AScaleTestBase.class); private Configuration conf; /** * Configuration generator. May be overridden to inject - * some custom options + * some custom options. * @return a configuration with which to create FS instances */ protected Configuration createConfiguration() { @@ -69,7 +107,7 @@ public class S3AScaleTestBase { } /** - * Get the configuration used to set up the FS + * Get the configuration used to set up the FS. 
* @return the configuration */ public Configuration getConf() { @@ -79,7 +117,7 @@ public class S3AScaleTestBase { @Before public void setUp() throws Exception { conf = createConfiguration(); - LOG.info("Scale test operation count = {}", getOperationCount()); + LOG.debug("Scale test operation count = {}", getOperationCount()); fs = S3ATestUtils.createTestFileSystem(conf); } @@ -95,4 +133,139 @@ public class S3AScaleTestBase { protected long getOperationCount() { return getConf().getLong(KEY_OPERATION_COUNT, DEFAULT_OPERATION_COUNT); } + + /** + * Describe a test in the logs + * @param text text to print + * @param args arguments to format in the printing + */ + protected void describe(String text, Object... args) { + LOG.info("\n\n{}: {}\n", + methodName.getMethodName(), + String.format(text, args)); + } + + /** + * Get the input stream statistics of an input stream. + * Raises an exception if the inner stream is not an S3A input stream + * @param in wrapper + * @return the statistics for the inner stream + */ + protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics( + FSDataInputStream in) { + InputStream inner = in.getWrappedStream(); + if (inner instanceof S3AInputStream) { + S3AInputStream s3a = (S3AInputStream) inner; + return s3a.getS3AStreamStatistics(); + } else { + Assert.fail("Not an S3AInputStream: " + inner); + // never reached + return null; + } + } + + /** + * Make times more readable, by adding a "," every three digits. + * @param nanos nanos or other large number + * @return a string for logging + */ + protected static String toHuman(long nanos) { + return String.format(Locale.ENGLISH, "%,d", nanos); + } + + /** + * Log the bandwidth of a timer as inferred from the number of + * bytes processed. + * @param timer timer + * @param bytes bytes processed in the time period + */ + protected void bandwidth(NanoTimer timer, long bytes) { + LOG.info("Bandwidth = {} MB/S", + timer.bandwidthDescription(bytes)); + } + + /** + * Work out the bandwidth in MB/s + * @param bytes bytes + * @param durationNS duration in nanos + * @return the number of megabytes/second of the recorded operation + */ + public static double bandwidthMBs(long bytes, long durationNS) { + return (bytes * 1000.0 ) / durationNS; + } + + /** + * A simple class for timing operations in nanoseconds, and for + * printing some useful results in the process. + */ + protected static class NanoTimer { + final long startTime; + long endTime; + + public NanoTimer() { + startTime = now(); + } + + /** + * End the operation + * @return the duration of the operation + */ + public long end() { + endTime = now(); + return duration(); + } + + /** + * End the operation; log the duration + * @param format message + * @param args any arguments + * @return the duration of the operation + */ + public long end(String format, Object... args) { + long d = end(); + LOG.info("Duration of {}: {} nS", + String.format(format, args), toHuman(d)); + return d; + } + + long now() { + return System.nanoTime(); + } + + long duration() { + return endTime - startTime; + } + + double bandwidth(long bytes) { + return S3AScaleTestBase.bandwidthMBs(bytes, duration()); + } + + /** + * Bandwidth as bytes per second + * @param bytes bytes in + * @return the number of bytes per second this operation timed. 
+ */ + double bandwidthBytes(long bytes) { + return (bytes * 1.0 ) / duration(); + } + + /** + * How many nanoseconds per byte + * @param bytes bytes processed in this time period + * @return the nanoseconds it took each byte to be processed + */ + long nanosPerByte(long bytes) { + return duration() / bytes; + } + + /** + * Get a description of the bandwidth, even down to fractions of + * a MB + * @param bytes bytes processed + * @return bandwidth + */ + String bandwidthDescription(long bytes) { + return String.format("%,.6f", bandwidth(bytes)); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java index d521ba8ac99..e3c6fa09a07 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java @@ -40,7 +40,6 @@ public class TestS3ADeleteManyFiles extends S3AScaleTestBase { private static final Logger LOG = LoggerFactory.getLogger(TestS3ADeleteManyFiles.class); - @Rule public Timeout testTimeout = new Timeout(30 * 60 * 1000); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java new file mode 100644 index 00000000000..0c8b2732c56 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Look at the performance of S3a operations + */ +public class TestS3AInputStreamPerformance extends S3AScaleTestBase { + private static final Logger LOG = LoggerFactory.getLogger( + TestS3AInputStreamPerformance.class); + + private S3AFileSystem s3aFS; + private Path testData; + private S3AFileStatus testDataStatus; + private FSDataInputStream in; + private S3AInstrumentation.InputStreamStatistics streamStatistics; + public static final int BLOCK_SIZE = 32 * 1024; + public static final int BIG_BLOCK_SIZE = 256 * 1024; + + /** Tests only run if the there is a named test file that can be read */ + private boolean testDataAvailable = true; + private String assumptionMessage = "test file"; + + /** + * Open the FS and the test data. The input stream is always set up here. + * @throws IOException + */ + @Before + public void openFS() throws IOException { + Configuration conf = getConf(); + String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE); + if (testFile.isEmpty()) { + assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE; + testDataAvailable = false; + } else { + testData = new Path(testFile); + s3aFS = (S3AFileSystem) FileSystem.newInstance(testData.toUri(), conf); + try { + testDataStatus = s3aFS.getFileStatus(testData); + } catch (IOException e) { + LOG.warn("Failed to read file {} specified in {}", + testFile, KEY_CSVTEST_FILE, e); + throw e; + } + } + } + + /** + * Cleanup: close the stream, close the FS. + */ + @After + public void cleanup() { + IOUtils.closeStream(in); + IOUtils.closeStream(s3aFS); + } + + /** + * Declare that the test requires the CSV test dataset + */ + private void requireCSVTestData() { + Assume.assumeTrue(assumptionMessage, testDataAvailable); + } + + /** + * Open the test file with the read buffer specified in the setting + * {@link #KEY_READ_BUFFER_SIZE} + * @return the stream, wrapping an S3a one + * @throws IOException + */ + FSDataInputStream openTestFile() throws IOException { + int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE, + DEFAULT_READ_BUFFER_SIZE); + FSDataInputStream stream = s3aFS.open(testData, bufferSize); + streamStatistics = getInputStreamStatistics(stream); + return stream; + } + + /** + * assert tha the stream was only ever opened once + */ + protected void assertStreamOpenedExactlyOnce() { + assertOpenOperationCount(1); + } + + /** + * Make an assertion count about the number of open operations + * @param expected the expected number + */ + private void assertOpenOperationCount(int expected) { + assertEquals("open operations in " + streamStatistics, + expected, streamStatistics.openOperations); + } + + /** + * Log how long an IOP took, by dividing the total time by the + * count of operations, printing in a human-readable form + * @param timer timing data + * @param count IOP count. 
+   */
+  protected void logTimePerIOP(NanoTimer timer, long count) {
+    LOG.info("Time per IOP: {} nS", toHuman(timer.duration() / count));
+  }
+
+  @Test
+  public void testTimeToOpenAndReadWholeFileByByte() throws Throwable {
+    requireCSVTestData();
+    describe("Open the test file %s and read it byte by byte", testData);
+    long len = testDataStatus.getLen();
+    NanoTimer timeOpen = new NanoTimer();
+    in = openTestFile();
+    timeOpen.end("Open stream");
+    NanoTimer readTimer = new NanoTimer();
+    long count = 0;
+    while (in.read() >= 0) {
+      count++;
+    }
+    readTimer.end("Time to read %d bytes", len);
+    bandwidth(readTimer, count);
+    assertEquals("Not enough bytes were read", len, count);
+    long nanosPerByte = readTimer.nanosPerByte(count);
+    LOG.info("An open() call has the equivalent duration of reading {} bytes",
+        toHuman(timeOpen.duration() / nanosPerByte));
+  }
+
+  @Test
+  public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
+    requireCSVTestData();
+    describe("Open the test file %s and read it in blocks of size %d",
+        testData, BLOCK_SIZE);
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    byte[] block = new byte[BLOCK_SIZE];
+    NanoTimer timer2 = new NanoTimer();
+    long count = 0;
+    // implicitly rounding down here
+    long blockCount = len / BLOCK_SIZE;
+    for (long i = 0; i < blockCount; i++) {
+      int offset = 0;
+      int remaining = BLOCK_SIZE;
+      NanoTimer blockTimer = new NanoTimer();
+      int reads = 0;
+      while (remaining > 0) {
+        int bytesRead = in.read(block, offset, remaining);
+        reads++;
+        if (bytesRead < 0) {
+          // end of file reached before the block was filled
+          break;
+        }
+        remaining -= bytesRead;
+        offset += bytesRead;
+        count += bytesRead;
+      }
+      blockTimer.end("Reading block %d in %d reads", i, reads);
+    }
+    timer2.end("Time to read %d bytes in %d blocks", len, blockCount);
+    bandwidth(timer2, count);
+    LOG.info("{}", streamStatistics);
+  }
+
+  @Test
+  public void testLazySeekEnabled() throws Throwable {
+    requireCSVTestData();
+    describe("Verify that seeks do not trigger any IO");
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    NanoTimer timer = new NanoTimer();
+    long blockCount = len / BLOCK_SIZE;
+    for (long i = 0; i < blockCount; i++) {
+      in.seek(in.getPos() + BLOCK_SIZE - 1);
+    }
+    in.seek(0);
+    blockCount++;
+    timer.end("Time to execute %d seeks", blockCount);
+    logTimePerIOP(timer, blockCount);
+    LOG.info("{}", streamStatistics);
+    assertOpenOperationCount(0);
+    assertEquals("bytes read", 0, streamStatistics.bytesRead);
+  }
+
+  @Test
+  public void testReadAheadDefault() throws Throwable {
+    requireCSVTestData();
+    describe("Verify that a series of forward skips within the readahead" +
+        " range do not close and reopen the stream");
+    executeSeekReadSequence(BLOCK_SIZE, Constants.DEFAULT_READAHEAD_RANGE);
+    assertStreamOpenedExactlyOnce();
+  }
+
+  @Test
+  public void testReadaheadOutOfRange() throws Throwable {
+    requireCSVTestData();
+    try {
+      in = openTestFile();
+      in.setReadahead(-1L);
+      fail("Stream should have rejected the request " + in);
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testReadBigBlocksAvailableReadahead() throws Throwable {
+    requireCSVTestData();
+    describe("set readahead to available bytes only");
+    executeSeekReadSequence(BIG_BLOCK_SIZE, 0);
+    // expect that the stream will have had lots of opens
+    assertTrue("not enough open operations in " + streamStatistics,
+        streamStatistics.openOperations > 1);
+  }
+
+  @Test
+  public void testReadBigBlocksBigReadahead() throws Throwable {
+    requireCSVTestData();
+    describe("Read 
big blocks with a big readahead"); + executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2); + assertStreamOpenedExactlyOnce(); + } + + /** + * Execute a seek+read sequence + * @param blockSize block size for seeks + * @param readahead what the readahead value of the stream should be + * @throws IOException IO problems + */ + protected void executeSeekReadSequence(long blockSize, + long readahead) throws IOException { + requireCSVTestData(); + long len = testDataStatus.getLen(); + in = openTestFile(); + in.setReadahead(readahead); + NanoTimer timer = new NanoTimer(); + long blockCount = len / blockSize; + LOG.info("Reading {} blocks, readahead = {}", + blockCount, readahead); + for (long i = 0; i < blockCount; i++) { + in.seek(in.getPos() + blockSize - 1); + // this is the read + assertTrue(in.read() >= 0); + } + timer.end("Time to execute %d seeks of distance %d with readahead = %d", + blockCount, + blockSize, + readahead); + logTimePerIOP(timer, blockCount); + LOG.info("Effective bandwidth {} MB/S", + timer.bandwidthDescription(streamStatistics.bytesRead - + streamStatistics.bytesSkippedOnSeek)); + LOG.info("{}", streamStatistics); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index ced0687caad..bc85425aed8 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -16,3 +16,6 @@ log4j.threshold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + +# for debugging low level S3a operations, uncomment this line +# log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
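The scale tests above reach the new per-stream counters by unwrapping the `FSDataInputStream` and calling `getS3AStreamStatistics()`, the same pattern as `S3AScaleTestBase.getInputStreamStatistics()`. The sketch below shows that pattern in isolation; the helper class name and the seek/read offsets are illustrative assumptions, not part of the patch.

```java
import java.io.InputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;

/** Hypothetical helper, not part of the patch: print per-stream counters. */
public final class StreamStatsDump {

  private StreamStatsDump() {
  }

  /**
   * Unwrap an FSDataInputStream and return the S3A stream statistics,
   * or null if the wrapped stream is not an S3AInputStream.
   */
  public static S3AInstrumentation.InputStreamStatistics statsOf(
      FSDataInputStream in) {
    InputStream inner = in.getWrappedStream();
    return inner instanceof S3AInputStream
        ? ((S3AInputStream) inner).getS3AStreamStatistics()
        : null;
  }

  /** Seek and read a little, then print the statistics gathered so far. */
  public static void readAndDump(FileSystem fs, Path path) throws Exception {
    try (FSDataInputStream in = fs.open(path)) {
      in.seek(4096);   // lazy seek: no HTTP request issued yet
      in.read();       // the read triggers the actual GET
      // InputStreamStatistics.toString() is declared unstable, but it is
      // what the scale tests log after each run
      System.out.println(statsOf(in));
    }
  }
}
```

On `close()` the stream merges these counters back into the filesystem-wide instrumentation, so the per-stream view is only available while the stream (or a reference to its statistics object) is still held.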