HADOOP-13028 add low level counter metrics for S3A; use in read performance tests. contributed by: stevel

patch includes
HADOOP-12844 Recover when S3A fails on IOException in read()
HADOOP-13058 S3A FS fails during init against a read-only FS if multipart purge
HADOOP-13047 S3a Forward seek in stream length to be configurable
This commit is contained in:
Steve Loughran 2016-05-12 19:23:18 +01:00
parent eab6e633ee
commit 1370dfc775
21 changed files with 1768 additions and 677 deletions

View File

@ -234,4 +234,13 @@ public class FSDataInputStream extends DataInputStream
"support unbuffering.");
}
}
/**
* String value. Includes the string value of the inner stream
* @return the stream
*/
@Override
public String toString() {
return super.toString() + ": " + in;
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Build a string dump of the metrics.
*
* The {@link #toString()} operator dumps out all values collected.
*
* Every entry is formatted as
* {@code prefix + name + separator + value + suffix}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MetricStringBuilder extends MetricsRecordBuilder {
private final StringBuilder builder = new StringBuilder(256);
private final String prefix;
private final String suffix;
private final String separator;
private final MetricsCollector parent;
/**
* Build an instance.
* @param parent parent collector. Unused in this instance; only used for
* the {@link #parent()} method
* @param prefix string before each entry
* @param separator separator between name and value
* @param suffix suffix after each entry
*/
public MetricStringBuilder(MetricsCollector parent,
String prefix,
String separator,
String suffix) {
this.parent = parent;
this.prefix = prefix;
this.suffix = suffix;
this.separator = separator;
}
public MetricStringBuilder add(MetricsInfo info, Object value) {
return tuple(info.name(), value.toString());
}
/**
* Add any key,val pair to the string, between the prefix and suffix,
* separated by the separator.
* @param key key
* @param value value
* @return this instance
*/
public MetricStringBuilder tuple(String key, String value) {
builder.append(prefix)
.append(key)
.append(separator)
.append(value)
.append(suffix);
return this;
}
@Override
public MetricsRecordBuilder tag(MetricsInfo info, String value) {
return add(info, value);
}
@Override
public MetricsRecordBuilder add(MetricsTag tag) {
return tuple(tag.name(), tag.value());
}
@Override
public MetricsRecordBuilder add(AbstractMetric metric) {
add(metric.info(), metric.toString());
return this;
}
@Override
public MetricsRecordBuilder setContext(String value) {
return tuple("context", value);
}
@Override
public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
return add(info, value);
}
@Override
public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
return add(info, value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
return add(info, value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
return add(info, value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
return add(info, value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
return add(info, value);
}
@Override
public MetricsCollector parent() {
return parent;
}
@Override
public String toString() {
return builder.toString();
}
}

View File

@ -34,7 +34,7 @@ public class MutableCounterLong extends MutableCounter {
private AtomicLong value = new AtomicLong();
MutableCounterLong(MetricsInfo info, long initValue) {
public MutableCounterLong(MetricsInfo info, long initValue) {
super(info);
this.value.set(initValue);
}

View File

@ -937,7 +937,15 @@
uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
</property>
<property>
<property>
<name>fs.s3a.readahead.range</name>
<value>65536</value>
<description>Bytes to read ahead during a seek() before closing and
re-opening the S3 HTTP connection. This option will be overridden if
any call to setReadahead() is made to an open stream.</description>
</property>
<property>
<name>fs.s3a.fast.buffer.size</name>
<value>1048576</value>
<description>Size of initial memory buffer in bytes allocated for an

View File

@ -15,361 +15,8 @@
limitations under the License.
-->
<FindBugsFilter>
<Match>
<Package name="org.apache.hadoop.security.proto" />
</Match>
<Match>
<Package name="org.apache.hadoop.tools.proto" />
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
<Match>
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="~.*_jsp" />
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
</Match>
<Match>
<Field name="_jspx_dependants" />
<Bug pattern="UWF_UNWRITTEN_FIELD" />
</Match>
<!--
Inconsistent synchronization for Client.Connection.out is
is intentional to make a connection to be closed instantly.
-->
<Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="out" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
Further SaslException should be ignored during cleanup and
original exception should be re-thrown.
-->
<Match>
<Class name="org.apache.hadoop.security.SaslRpcClient" />
<Bug pattern="DE_MIGHT_IGNORE" />
</Match>
<!--
Ignore Cross Scripting Vulnerabilities
-->
<Match>
<Package name="~org.apache.hadoop.mapred.*" />
<Bug code="XSS" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.taskdetails_jsp" />
<Bug code="HRS" />
</Match>
<!--
Ignore warnings where child class has the same name as
super class. Classes based on Old API shadow names from
new API. Should go off after HADOOP-1.0
-->
<Match>
<Class name="~org.apache.hadoop.mapred.*" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
</Match>
<!--
Ignore warnings for usage of System.exit. This is
required and have been well thought out
-->
<Match>
<Class name="org.apache.hadoop.mapred.Child$2" />
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.JobTracker" />
<Method name="addHostToNodeMapping" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.Task" />
<Or>
<Method name="done" />
<Method name="commit" />
<Method name="statusUpdate" />
</Or>
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.Task$TaskReporter" />
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.util.ProgramDriver" />
<Method name="driver" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.util.RunJar" />
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
<!--
We need to cast objects between old and new api objects
-->
<Match>
<Class name="org.apache.hadoop.mapred.OutputCommitter" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!--
We intentionally do the get name from the inner class
-->
<Match>
<Class name="org.apache.hadoop.mapred.TaskTracker$MapEventsFetcherThread" />
<Method name="run" />
<Bug pattern="IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
</Match>
<!--
Ignoring this warning as resolving this would need a non-trivial change in code
-->
<Match>
<Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor" />
<Method name="configure" />
<Field name="maxNumItems" />
<Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
</Match>
<!--
Comes from org.apache.jasper.runtime.ResourceInjector. Cannot do much.
-->
<Match>
<Class name="org.apache.hadoop.mapred.jobqueue_005fdetails_jsp" />
<Field name="_jspx_resourceInjector" />
<Bug pattern="SE_BAD_FIELD" />
</Match>
<!--
Storing textInputFormat and then passing it as a parameter. Safe to ignore.
-->
<Match>
<Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob" />
<Method name="createValueAggregatorJob" />
<Bug pattern="DLS_DEAD_STORE_OF_CLASS_LITERAL" />
</Match>
<!--
Can remove this after the upgrade to findbugs1.3.8
-->
<Match>
<Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat" />
<Method name="getSplits" />
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
</Match>
<!--
This is a spurious warning. Just ignore
-->
<Match>
<Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer" />
<Field name="kvindex" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
core changes
-->
<Match>
<Class name="~org.apache.hadoop.*" />
<Bug code="MS" />
</Match>
<Match>
<Class name="org.apache.hadoop.fs.FileSystem" />
<Method name="checkPath" />
<Bug pattern="ES_COMPARING_STRINGS_WITH_EQ" />
</Match>
<Match>
<Class name="org.apache.hadoop.io.Closeable" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
</Match>
<Match>
<Class name="org.apache.hadoop.security.AccessControlException" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
</Match>
<Match>
<Class name="org.apache.hadoop.util.ProcfsBasedProcessTree" />
<Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME" />
</Match>
<!--
Streaming, Examples
-->
<Match>
<Class name="org.apache.hadoop.streaming.StreamUtil$TaskId" />
<Bug pattern="URF_UNREAD_FIELD" />
</Match>
<Match>
<Class name="org.apache.hadoop.examples.DBCountPageView" />
<Method name="verify" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<Match>
<Class name="org.apache.hadoop.examples.ContextFactory" />
<Method name="setAttributes" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!--
TFile
-->
<Match>
<Class name="org.apache.hadoop.io.file.tfile.Chunk$ChunkDecoder" />
<Method name="close" />
<Bug pattern="SR_NOT_CHECKED" />
</Match>
<!--
The purpose of skip() is to drain remaining bytes of the chunk-encoded
stream (one chunk at a time). The termination condition is checked by
checkEOF().
-->
<Match>
<Class name="org.apache.hadoop.io.file.tfile.Utils" />
<Method name="writeVLong" />
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
<!--
The switch condition fall through is intentional and for performance
purposes.
-->
<Match>
<Class name="org.apache.hadoop.log.EventCounter"/>
<!-- backward compatibility -->
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
</Match>
<Match>
<Class name="org.apache.hadoop.metrics.jvm.EventCounter"/>
<!-- backward compatibility -->
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngineProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcHeaderProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ha\.proto\.ZKFCProtocolProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.security\.proto\.SecurityProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.TestProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/>
</Match>
<!--
Manually checked, misses child thread manually syncing on parent's intrinsic lock.
-->
<Match>
<Class name="org.apache.hadoop.metrics2.lib.MutableQuantiles" />
<Field name="previousSnapshot" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
The method uses a generic type T that extends two other types
T1 and T2. Findbugs complains of a cast from T1 to T2.
-->
<Match>
<Class name="org.apache.hadoop.fs.DelegationTokenRenewer" />
<Method name="removeRenewAction" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!-- Inconsistent synchronization flagged by findbugs is not valid. -->
<Match>
<Class name="org.apache.hadoop.ipc.Client$Connection" />
<Field name="in" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
The switch condition for INITIATE is expected to fallthru to RESPONSE
to process initial sasl response token included in the INITIATE
-->
<Match>
<Class name="org.apache.hadoop.ipc.Server$Connection" />
<Method name="processSaslMessage" />
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
<!-- Synchronization performed on util.concurrent instance. -->
<Match>
<Class name="org.apache.hadoop.service.AbstractService" />
<Method name="stop" />
<Bug code="JLM" />
</Match>
<Match>
<Class name="org.apache.hadoop.service.AbstractService" />
<Method name="waitForServiceToStop" />
<Bug code="JLM" />
</Match>
<!--
OpenStack Swift FS module -closes streams in a different method
from where they are opened.
-->
<Match>
<Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/>
<Method name="uploadFileAttempt"/>
<Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
</Match>
<Match>
<Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/>
<Method name="uploadFilePartAttempt"/>
<Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
</Match>
<!-- code from maven source, null value is checked at callee side. -->
<Match>
<Class name="org.apache.hadoop.util.ComparableVersion$ListItem" />
<Method name="compareTo" />
<Bug code="NP" />
</Match>
<!-- S3n warnings about malicious code aren't that relevant given its limited future. -->
<Match>
<Class name="org.apache.hadoop.util.HttpExceptionUtils"/>
<Method name="validateResponse"/>
<Bug pattern="REC_CATCH_EXCEPTION"/>
<Class name="org.apache.hadoop.fs.s3.INode" />
</Match>
</FindBugsFilter>

View File

@ -55,13 +55,13 @@ public interface FileSystemStore {
/**
* Delete everything. Used for testing.
* @throws IOException
* @throws IOException on any problem
*/
void purge() throws IOException;
/**
* Diagnostic method to dump all INodes to the console.
* @throws IOException
* @throws IOException on any problem
*/
void dump() throws IOException;
}

View File

@ -38,6 +38,8 @@ public class S3Credentials {
private String secretAccessKey;
/**
* @param uri bucket URI optionally containing username and password.
* @param conf configuration
* @throws IllegalArgumentException if credentials for S3 cannot be
* determined.
* @throws IOException if credential providers are misconfigured and we have

View File

@ -21,7 +21,11 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.auth.AWSCredentials;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Stable
public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
public AWSCredentials getCredentials() {
return new AnonymousAWSCredentials();

View File

@ -23,7 +23,11 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.AWSCredentials;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Stable
public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
private final String accessKey;
private final String secretKey;

View File

@ -18,7 +18,19 @@
package org.apache.hadoop.fs.s3a;
public class Constants {
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* All the constants used with the {@link S3AFileSystem}.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class Constants {
private Constants() {
}
// s3 access key
public static final String ACCESS_KEY = "fs.s3a.access.key";
@ -129,4 +141,8 @@ public class Constants {
public static final int S3A_DEFAULT_PORT = -1;
public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";
/** read ahead buffer size to prevent connection re-establishments. */
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
}

View File

@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Progressable;
@ -64,6 +65,7 @@ import java.util.concurrent.ThreadPoolExecutor;
* <p>
* Unstable: statistics and error handling might evolve
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class S3AFastOutputStream extends OutputStream {
@ -102,7 +104,8 @@ public class S3AFastOutputStream extends OutputStream {
* @param partSize size of a single part in a multi-part upload (except
* last part)
* @param multiPartThreshold files at least this size use multi-part upload
* @throws IOException
* @param threadPoolExecutor thread factory
* @throws IOException on any problem
*/
public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
String bucket, String key, Progressable progress,
@ -159,7 +162,7 @@ public class S3AFastOutputStream extends OutputStream {
* Writes a byte to the memory buffer. If this causes the buffer to reach
* its limit, the actual upload is submitted to the threadpool.
* @param b the int of which the lowest byte is written
* @throws IOException
* @throws IOException on any problem
*/
@Override
public synchronized void write(int b) throws IOException {
@ -177,10 +180,10 @@ public class S3AFastOutputStream extends OutputStream {
* @param b byte array containing
* @param off offset in array where to start
* @param len number of bytes to be written
* @throws IOException
* @throws IOException on any problem
*/
@Override
public synchronized void write(byte b[], int off, int len)
public synchronized void write(byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();

View File

@ -17,9 +17,19 @@
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
/**
* File status for an S3A "file".
* Modification time is trouble, see {@link #getModificationTime()}.
*
* The subclass is private as it should not be created directly.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AFileStatus extends FileStatus {
private boolean isEmptyDirectory;
@ -45,7 +55,7 @@ public class S3AFileStatus extends FileStatus {
return System.getProperty("user.name");
}
/** Compare if this object is equal to another object
/** Compare if this object is equal to another object.
* @param o the object to be compared.
* @return true if two file status has the same path name; false if not.
*/

View File

@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
@ -59,8 +60,11 @@ import com.amazonaws.event.ProgressListener;
import com.amazonaws.event.ProgressEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -79,9 +83,24 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The core S3A Filesystem implementation.
*
* This subclass is marked as private as code should not be creating it
* directly; use {@link FileSystem#get(Configuration)} and variants to
* create one.
*
* If cast to {@code S3AFileSystem}, extra methods and features may be accessed.
* Consider those private and unstable.
*
* Because it prints some of the state of the instrumentation,
* the output of {@link #toString()} must also be considered unstable.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AFileSystem extends FileSystem {
/**
* Default blocksize as used in blocksize and FS status queries
* Default blocksize as used in blocksize and FS status queries.
*/
public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
private URI uri;
@ -97,11 +116,14 @@ public class S3AFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
private S3AInstrumentation instrumentation;
private long readAhead;
// The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000;
private static final AtomicInteger poolNumber = new AtomicInteger(1);
/**
* Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
* with a common prefix.
@ -110,17 +132,19 @@ public class S3AFileSystem extends FileSystem {
*/
public static ThreadFactory getNamedThreadFactory(final String prefix) {
SecurityManager s = System.getSecurityManager();
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
.getThreadGroup();
final ThreadGroup threadGroup = (s != null)
? s.getThreadGroup()
: Thread.currentThread().getThreadGroup();
return new ThreadFactory() {
final AtomicInteger threadNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final int poolNum = poolNumber.getAndIncrement();
final ThreadGroup group = threadGroup;
private final ThreadGroup group = threadGroup;
@Override
public Thread newThread(Runnable r) {
final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
final String name = String.format("%s-pool%03d-t%04d",
prefix, poolNum, threadNumber.getAndIncrement());
return new Thread(group, r, name);
}
};
@ -157,10 +181,12 @@ public class S3AFileSystem extends FileSystem {
*/
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
setConf(conf);
instrumentation = new S3AInstrumentation(name);
uri = URI.create(name.getScheme() + "://" + name.getAuthority());
workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
this.getWorkingDirectory());
workingDir = new Path("/user", System.getProperty("user.name"))
.makeQualified(this.uri, this.getWorkingDirectory());
AWSAccessKeys creds = getAWSAccessKeys(name, conf);
@ -174,19 +200,20 @@ public class S3AFileSystem extends FileSystem {
bucket = name.getHost();
ClientConfiguration awsConf = new ClientConfiguration();
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS));
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS);
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES));
awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT));
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT));
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT, 0));
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT, 0));
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
if(!signerOverride.isEmpty()) {
if (!signerOverride.isEmpty()) {
LOG.debug("Signer override = {}", signerOverride);
awsConf.setSignerOverride(signerOverride);
}
@ -196,34 +223,38 @@ public class S3AFileSystem extends FileSystem {
initAmazonS3Client(conf, credentials, awsConf);
maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
if (partSize < 5 * 1024 * 1024) {
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
partSize = 5 * 1024 * 1024;
}
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
if (multiPartThreshold < 5 * 1024 * 1024) {
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
multiPartThreshold = 5 * 1024 * 1024;
}
//check but do not store the block size
longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0);
int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(maxThreads *
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
new LinkedBlockingQueue<>(maxThreads *
intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1));
threadPoolExecutor = new ThreadPoolExecutor(
coreThreads,
maxThreads,
@ -238,19 +269,17 @@ public class S3AFileSystem extends FileSystem {
initCannedAcls(conf);
if (!s3.doesBucketExist(bucket)) {
throw new IOException("Bucket " + bucket + " does not exist");
throw new FileNotFoundException("Bucket " + bucket + " does not exist");
}
initMultipartUploads(conf);
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
setConf(conf);
}
void initProxySupport(Configuration conf, ClientConfiguration awsConf,
boolean secureConnections) throws IllegalArgumentException,
IllegalArgumentException {
boolean secureConnections) throws IllegalArgumentException {
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
int proxyPort = conf.getInt(PROXY_PORT, -1);
if (!proxyHost.isEmpty()) {
@ -281,7 +310,8 @@ public class S3AFileSystem extends FileSystem {
if (LOG.isDebugEnabled()) {
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
"domain {} as workstation {}", awsConf.getProxyHost(),
awsConf.getProxyPort(), String.valueOf(awsConf.getProxyUsername()),
awsConf.getProxyPort(),
String.valueOf(awsConf.getProxyUsername()),
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
awsConf.getProxyWorkstation());
}
@ -316,7 +346,7 @@ public class S3AFileSystem extends FileSystem {
AWSCredentialsProviderChain credentials, ClientConfiguration awsConf)
throws IllegalArgumentException {
s3 = new AmazonS3Client(credentials, awsConf);
String endPoint = conf.getTrimmed(ENDPOINT,"");
String endPoint = conf.getTrimmed(ENDPOINT, "");
if (!endPoint.isEmpty()) {
try {
s3.setEndpoint(endPoint);
@ -359,14 +389,25 @@ public class S3AFileSystem extends FileSystem {
private void initMultipartUploads(Configuration conf) {
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
DEFAULT_PURGE_EXISTING_MULTIPART);
long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
DEFAULT_PURGE_EXISTING_MULTIPART);
long purgeExistingMultipartAge = longOption(conf,
PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0);
if (purgeExistingMultipart) {
Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
Date purgeBefore =
new Date(new Date().getTime() - purgeExistingMultipartAge * 1000);
transfers.abortMultipartUploads(bucket, purgeBefore);
try {
transfers.abortMultipartUploads(bucket, purgeBefore);
} catch (AmazonServiceException e) {
if (e.getStatusCode() == 403) {
instrumentation.errorIgnored();
LOG.debug("Failed to abort multipart uploads against {}," +
" FS may be read only", bucket, e);
} else {
throw e;
}
}
}
}
@ -479,16 +520,15 @@ public class S3AFileSystem extends FileSystem {
public FSDataInputStream open(Path f, int bufferSize)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Opening '{}' for reading.", f);
}
LOG.debug("Opening '{}' for reading.", f);
final FileStatus fileStatus = getFileStatus(f);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open " + f + " because it is a directory");
throw new FileNotFoundException("Can't open " + f
+ " because it is a directory");
}
return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
fileStatus.getLen(), s3, statistics));
fileStatus.getLen(), s3, statistics, instrumentation, readAhead));
}
/**
@ -514,16 +554,26 @@ public class S3AFileSystem extends FileSystem {
if (!overwrite && exists(f)) {
throw new FileAlreadyExistsException(f + " already exists");
}
instrumentation.fileCreated();
if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
key, progress, statistics, cannedACL,
serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
threadPoolExecutor), statistics);
}
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
bucket, key, progress, cannedACL, statistics,
serverSideEncryptionAlgorithm), null);
// We pass null to FSDataOutputStream so it won't count writes that
// are being buffered to a file
return new FSDataOutputStream(
new S3AOutputStream(getConf(),
transfers,
this,
bucket,
key,
progress,
cannedACL,
statistics,
serverSideEncryptionAlgorithm),
null);
}
/**
@ -534,7 +584,7 @@ public class S3AFileSystem extends FileSystem {
* @throws IOException indicating that append is not supported.
*/
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
Progressable progress) throws IOException {
throw new IOException("Not supported");
}
@ -559,17 +609,13 @@ public class S3AFileSystem extends FileSystem {
* @return true if rename is successful
*/
public boolean rename(Path src, Path dst) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Rename path {} to {}", src, dst);
}
LOG.debug("Rename path {} to {}", src, dst);
String srcKey = pathToKey(src);
String dstKey = pathToKey(dst);
if (srcKey.isEmpty() || dstKey.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("rename: src or dst are empty");
}
LOG.debug("rename: source {} or dest {}, is empty", srcKey, dstKey);
return false;
}
@ -582,9 +628,8 @@ public class S3AFileSystem extends FileSystem {
}
if (srcKey.equals(dstKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("rename: src and dst refer to the same file or directory");
}
LOG.debug("rename: src and dst refer to the same file or directory: {}",
dst);
return srcStatus.isFile();
}
@ -593,9 +638,8 @@ public class S3AFileSystem extends FileSystem {
dstStatus = getFileStatus(dst);
if (srcStatus.isDirectory() && dstStatus.isFile()) {
if (LOG.isDebugEnabled()) {
LOG.debug("rename: src is a directory and dst is a file");
}
LOG.debug("rename: src {} is a directory and dst {} is a file",
src, dst);
return false;
}
@ -603,6 +647,7 @@ public class S3AFileSystem extends FileSystem {
return false;
}
} catch (FileNotFoundException e) {
LOG.debug("rename: destination path {} not found", dst);
// Parent must exist
Path parent = dst.getParent();
if (!pathToKey(parent).isEmpty()) {
@ -612,6 +657,8 @@ public class S3AFileSystem extends FileSystem {
return false;
}
} catch (FileNotFoundException e2) {
LOG.debug("rename: destination path {} has no parent {}",
dst, parent);
return false;
}
}
@ -619,9 +666,7 @@ public class S3AFileSystem extends FileSystem {
// Ok! Time to start
if (srcStatus.isFile()) {
if (LOG.isDebugEnabled()) {
LOG.debug("rename: renaming file " + src + " to " + dst);
}
LOG.debug("rename: renaming file {} to {}", src, dst);
if (dstStatus != null && dstStatus.isDirectory()) {
String newDstKey = dstKey;
if (!newDstKey.endsWith("/")) {
@ -630,15 +675,13 @@ public class S3AFileSystem extends FileSystem {
String filename =
srcKey.substring(pathToKey(src.getParent()).length()+1);
newDstKey = newDstKey + filename;
copyFile(srcKey, newDstKey);
copyFile(srcKey, newDstKey, srcStatus.getLen());
} else {
copyFile(srcKey, dstKey);
copyFile(srcKey, dstKey, srcStatus.getLen());
}
delete(src, false);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("rename: renaming directory " + src + " to " + dst);
}
LOG.debug("rename: renaming directory {} to {}", src, dst);
// This is a directory to directory copy
if (!dstKey.endsWith("/")) {
@ -651,14 +694,12 @@ public class S3AFileSystem extends FileSystem {
//Verify dest is not a child of the source directory
if (dstKey.startsWith(srcKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot rename a directory to a subdirectory of self");
}
LOG.debug("cannot rename a directory {}" +
" to a subdirectory of self: {}", srcKey, dstKey);
return false;
}
List<DeleteObjectsRequest.KeyVersion> keysToDelete =
new ArrayList<>();
List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
// delete unnecessary fake directory.
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
@ -676,7 +717,7 @@ public class S3AFileSystem extends FileSystem {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
copyFile(summary.getKey(), newDstKey);
copyFile(summary.getKey(), newDstKey, summary.getSize());
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
removeKeys(keysToDelete, true);
@ -715,6 +756,7 @@ public class S3AFileSystem extends FileSystem {
DeleteObjectsRequest deleteRequest
= new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
s3.deleteObjects(deleteRequest);
instrumentation.fileDeleted(keysToDelete.size());
statistics.incrementWriteOps(1);
} else {
int writeops = 0;
@ -724,7 +766,7 @@ public class S3AFileSystem extends FileSystem {
new DeleteObjectRequest(bucket, keyVersion.getKey()));
writeops++;
}
instrumentation.fileDeleted(keysToDelete.size());
statistics.incrementWriteOps(writeops);
}
if (clearKeys) {
@ -742,25 +784,20 @@ public class S3AFileSystem extends FileSystem {
* @throws IOException due to inability to delete a directory or file.
*/
public boolean delete(Path f, boolean recursive) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Delete path " + f + " - recursive " + recursive);
}
LOG.debug("Delete path {} - recursive {}", f , recursive);
S3AFileStatus status;
try {
status = getFileStatus(f);
} catch (FileNotFoundException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Couldn't delete " + f + " - does not exist");
}
LOG.debug("Couldn't delete {} - does not exist", f);
instrumentation.errorIgnored();
return false;
}
String key = pathToKey(f);
if (status.isDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("delete: Path is a directory");
}
LOG.debug("delete: Path is a directory: {}", f);
if (!recursive && !status.isEmptyDirectory()) {
throw new IOException("Path is a folder: " + f +
@ -777,15 +814,12 @@ public class S3AFileSystem extends FileSystem {
}
if (status.isEmptyDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting fake empty directory");
}
LOG.debug("Deleting fake empty directory {}", key);
s3.deleteObject(bucket, key);
instrumentation.directoryDeleted();
statistics.incrementWriteOps(1);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting objects for directory prefix " + key + " to delete");
}
LOG.debug("Getting objects for directory prefix {} to delete", key);
ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(bucket);
@ -794,16 +828,13 @@ public class S3AFileSystem extends FileSystem {
//request.setDelimiter("/");
request.setMaxKeys(maxKeys);
List<DeleteObjectsRequest.KeyVersion> keys =
new ArrayList<>();
List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
if (LOG.isDebugEnabled()) {
LOG.debug("Got object to delete " + summary.getKey());
}
LOG.debug("Got object to delete {}", summary.getKey());
if (keys.size() == MAX_ENTRIES_TO_DELETE) {
removeKeys(keys, true);
@ -822,10 +853,9 @@ public class S3AFileSystem extends FileSystem {
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("delete: Path is a file");
}
LOG.debug("delete: Path is a file");
s3.deleteObject(bucket, key);
instrumentation.fileDeleted(1);
statistics.incrementWriteOps(1);
}
@ -837,9 +867,7 @@ public class S3AFileSystem extends FileSystem {
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
String key = pathToKey(f);
if (!key.isEmpty() && !exists(f)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new fake directory at " + f);
}
LOG.debug("Creating new fake directory at {}", f);
createFakeDirectory(bucket, key);
}
}
@ -856,9 +884,7 @@ public class S3AFileSystem extends FileSystem {
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
String key = pathToKey(f);
if (LOG.isDebugEnabled()) {
LOG.debug("List status for path: " + f);
}
LOG.debug("List status for path: {}", f);
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(f);
@ -874,9 +900,7 @@ public class S3AFileSystem extends FileSystem {
request.setDelimiter("/");
request.setMaxKeys(maxKeys);
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: doing listObjects for directory " + key);
}
LOG.debug("listStatus: doing listObjects for directory {}", key);
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
@ -889,24 +913,18 @@ public class S3AFileSystem extends FileSystem {
// Skip over keys that are ourselves and old S3N _$folder$ files
if (keyPath.equals(fQualified) ||
summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + keyPath);
}
LOG.debug("Ignoring: {}", keyPath);
continue;
}
if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
result.add(new S3AFileStatus(true, true, keyPath));
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: fd: " + keyPath);
}
LOG.debug("Adding: fd: {}", keyPath);
} else {
result.add(new S3AFileStatus(summary.getSize(),
dateToLong(summary.getLastModified()), keyPath,
getDefaultBlockSize(fQualified)));
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: fi: " + keyPath);
}
LOG.debug("Adding: fi: {}", keyPath);
}
}
@ -916,16 +934,11 @@ public class S3AFileSystem extends FileSystem {
continue;
}
result.add(new S3AFileStatus(true, false, keyPath));
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd: " + keyPath);
}
LOG.debug("Adding: rd: {}", keyPath);
}
if (objects.isTruncated()) {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch");
}
LOG.debug("listStatus: list truncated - getting next batch");
objects = s3.listNextBatchOfObjects(objects);
statistics.incrementReadOps(1);
} else {
@ -933,9 +946,7 @@ public class S3AFileSystem extends FileSystem {
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd (not a dir): " + f);
}
LOG.debug("Adding: rd (not a dir): {}", f);
result.add(fileStatus);
}
@ -948,14 +959,14 @@ public class S3AFileSystem extends FileSystem {
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it.
*
* @param new_dir the current working directory.
* @param newDir the current working directory.
*/
public void setWorkingDirectory(Path new_dir) {
workingDir = new_dir;
public void setWorkingDirectory(Path newDir) {
workingDir = newDir;
}
/**
* Get the current working directory for the given file system
* Get the current working directory for the given file system.
* @return the directory pathname
*/
public Path getWorkingDirectory() {
@ -972,10 +983,7 @@ public class S3AFileSystem extends FileSystem {
// TODO: If we have created an empty file at /foo/bar and we then call
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Making directory: " + f);
}
LOG.debug("Making directory: {}", f);
try {
FileStatus fileStatus = getFileStatus(f);
@ -996,6 +1004,7 @@ public class S3AFileSystem extends FileSystem {
fPart));
}
} catch (FileNotFoundException fnfe) {
instrumentation.errorIgnored();
}
fPart = fPart.getParent();
} while (fPart != null);
@ -1015,10 +1024,7 @@ public class S3AFileSystem extends FileSystem {
*/
public S3AFileStatus getFileStatus(Path f) throws IOException {
String key = pathToKey(f);
if (LOG.isDebugEnabled()) {
LOG.debug("Getting path status for " + f + " (" + key + ")");
}
LOG.debug("Getting path status for {} ({})", f , key);
if (!key.isEmpty()) {
try {
@ -1026,15 +1032,11 @@ public class S3AFileSystem extends FileSystem {
statistics.incrementReadOps(1);
if (objectRepresentsDirectory(key, meta.getContentLength())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found exact file: fake directory");
}
LOG.debug("Found exact file: fake directory");
return new S3AFileStatus(true, true,
f.makeQualified(uri, workingDir));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Found exact file: normal file");
}
LOG.debug("Found exact file: normal file");
return new S3AFileStatus(meta.getContentLength(),
dateToLong(meta.getLastModified()),
f.makeQualified(uri, workingDir),
@ -1042,25 +1044,23 @@ public class S3AFileSystem extends FileSystem {
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
printAmazonServiceException(e);
printAmazonServiceException(f.toString(), e);
throw e;
}
} catch (AmazonClientException e) {
printAmazonClientException(e);
printAmazonClientException(f.toString(), e);
throw e;
}
// Necessary?
if (!key.endsWith("/")) {
String newKey = key + "/";
try {
String newKey = key + "/";
ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
statistics.incrementReadOps(1);
if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found file (with /): fake directory");
}
LOG.debug("Found file (with /): fake directory");
return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
} else {
LOG.warn("Found file (with /): real file? should not happen: {}", key);
@ -1072,11 +1072,11 @@ public class S3AFileSystem extends FileSystem {
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
printAmazonServiceException(e);
printAmazonServiceException(newKey, e);
throw e;
}
} catch (AmazonClientException e) {
printAmazonClientException(e);
printAmazonClientException(newKey, e);
throw e;
}
}
@ -1096,17 +1096,17 @@ public class S3AFileSystem extends FileSystem {
statistics.incrementReadOps(1);
if (!objects.getCommonPrefixes().isEmpty()
|| objects.getObjectSummaries().size() > 0) {
|| !objects.getObjectSummaries().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found path as directory (with /): " +
objects.getCommonPrefixes().size() + "/" +
LOG.debug("Found path as directory (with /): {}/{}",
objects.getCommonPrefixes().size() ,
objects.getObjectSummaries().size());
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
}
for (String prefix : objects.getCommonPrefixes()) {
LOG.debug("Prefix: " + prefix);
LOG.debug("Prefix: {}", prefix);
}
}
@ -1118,17 +1118,15 @@ public class S3AFileSystem extends FileSystem {
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
printAmazonServiceException(e);
printAmazonServiceException(key, e);
throw e;
}
} catch (AmazonClientException e) {
printAmazonClientException(e);
printAmazonClientException(key, e);
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Not Found: " + f);
}
LOG.debug("Not Found: {}", f);
throw new FileNotFoundException("No such file or directory: " + f);
}
@ -1147,15 +1145,13 @@ public class S3AFileSystem extends FileSystem {
*/
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException {
Path dst) throws IOException {
String key = pathToKey(dst);
if (!overwrite && exists(dst)) {
throw new IOException(dst + " already exists");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Copying local file from " + src + " to " + dst);
throw new FileAlreadyExistsException(dst + " already exists");
}
LOG.debug("Copying local file from {} to {}", src, dst);
// Since we have a local file, we don't need to stream into a temporary file
LocalFileSystem local = getLocal(getConf());
@ -1181,13 +1177,14 @@ public class S3AFileSystem extends FileSystem {
}
};
statistics.incrementWriteOps(1);
Upload up = transfers.upload(putObjectRequest);
up.addProgressListener(progressListener);
try {
up.waitForUploadResult();
statistics.incrementWriteOps(1);
} catch (InterruptedException e) {
throw new IOException("Got interrupted, cancelling");
throw new InterruptedIOException("Interrupted copying " + src
+ " to " + dst + ", cancelling");
}
// This will delete unnecessary fake parent directories
@ -1211,7 +1208,7 @@ public class S3AFileSystem extends FileSystem {
}
/**
* Override getCononicalServiceName because we don't support token in S3A
* Override getCanonicalServiceName because we don't support token in S3A.
*/
@Override
public String getCanonicalServiceName() {
@ -1219,17 +1216,17 @@ public class S3AFileSystem extends FileSystem {
return null;
}
private void copyFile(String srcKey, String dstKey) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("copyFile " + srcKey + " -> " + dstKey);
}
private void copyFile(String srcKey, String dstKey, long size)
throws IOException {
LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
@ -1250,13 +1247,17 @@ public class S3AFileSystem extends FileSystem {
try {
copy.waitForCopyResult();
statistics.incrementWriteOps(1);
instrumentation.filesCopied(1, size);
} catch (InterruptedException e) {
throw new IOException("Got interrupted, cancelling");
throw new InterruptedIOException("Interrupted copying " + srcKey
+ " to " + dstKey + ", cancelling");
}
}
private boolean objectRepresentsDirectory(final String name, final long size) {
return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
return !name.isEmpty()
&& name.charAt(name.length() - 1) == '/'
&& size == 0L;
}
// Handles null Dates that can be returned by AWS
@ -1274,8 +1275,9 @@ public class S3AFileSystem extends FileSystem {
private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
while (true) {
String key = "";
try {
String key = pathToKey(f);
key = pathToKey(f);
if (key.isEmpty()) {
break;
}
@ -1283,13 +1285,13 @@ public class S3AFileSystem extends FileSystem {
S3AFileStatus status = getFileStatus(f);
if (status.isDirectory() && status.isEmptyDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting fake directory " + key + "/");
}
LOG.debug("Deleting fake directory {}/", key);
s3.deleteObject(bucket, key + "/");
statistics.incrementWriteOps(1);
}
} catch (FileNotFoundException | AmazonServiceException e) {
LOG.debug("While deleting key {} ", key, e);
instrumentation.errorIgnored();
}
if (f.isRoot()) {
@ -1325,10 +1327,12 @@ public class S3AFileSystem extends FileSystem {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
PutObjectRequest putObjectRequest =
new PutObjectRequest(bucketName, objectName, im, om);
putObjectRequest.setCannedAcl(cannedACL);
s3.putObject(putObjectRequest);
statistics.incrementWriteOps(1);
instrumentation.directoryCreated();
}
/**
@ -1400,31 +1404,115 @@ public class S3AFileSystem extends FileSystem {
/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize i/o time.
* be split into to minimize I/O time.
* @deprecated use {@link #getDefaultBlockSize(Path)} instead
*/
@Deprecated
public long getDefaultBlockSize() {
// default to 32MB: large enough to minimize the impact of seeks
return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
}
private void printAmazonServiceException(AmazonServiceException ase) {
LOG.info("Caught an AmazonServiceException, which means your request made it " +
"to Amazon S3, but was rejected with an error response for some reason.");
LOG.info("Error Message: " + ase.getMessage());
LOG.info("HTTP Status Code: " + ase.getStatusCode());
LOG.info("AWS Error Code: " + ase.getErrorCode());
LOG.info("Error Type: " + ase.getErrorType());
LOG.info("Request ID: " + ase.getRequestId());
LOG.info("Class Name: " + ase.getClass().getName());
private void printAmazonServiceException(String target,
AmazonServiceException ase) {
LOG.info("{}: caught an AmazonServiceException {}", target, ase);
LOG.info("This means your request made it to Amazon S3," +
" but was rejected with an error response for some reason.");
LOG.info("Error Message: {}", ase.getMessage());
LOG.info("HTTP Status Code: {}", ase.getStatusCode());
LOG.info("AWS Error Code: {}", ase.getErrorCode());
LOG.info("Error Type: {}", ase.getErrorType());
LOG.info("Request ID: {}", ase.getRequestId());
LOG.info("Class Name: {}", ase.getClass().getName());
LOG.info("Exception", ase);
}
private void printAmazonClientException(AmazonClientException ace) {
LOG.info("Caught an AmazonClientException, which means the client encountered " +
"a serious internal problem while trying to communicate with S3, " +
"such as not being able to access the network.");
LOG.info("Error Message: {}" + ace, ace);
private void printAmazonClientException(String target,
AmazonClientException ace) {
LOG.info("{}: caught an AmazonClientException {}", target, ace);
LOG.info("This means the client encountered " +
"a problem while trying to communicate with S3, " +
"such as not being able to access the network.", ace);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"S3AFileSystem{");
sb.append("uri=").append(uri);
sb.append(", workingDir=").append(workingDir);
sb.append(", partSize=").append(partSize);
sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
sb.append(", maxKeys=").append(maxKeys);
sb.append(", cannedACL=").append(cannedACL.toString());
sb.append(", readAhead=").append(readAhead);
sb.append(", blockSize=").append(getDefaultBlockSize());
sb.append(", multiPartThreshold=").append(multiPartThreshold);
if (serverSideEncryptionAlgorithm != null) {
sb.append(", serverSideEncryptionAlgorithm='")
.append(serverSideEncryptionAlgorithm)
.append('\'');
}
sb.append(", statistics {")
.append(statistics.toString())
.append("}");
sb.append(", metrics {")
.append(instrumentation.dump("{", "=", "} ", true))
.append("}");
sb.append('}');
return sb.toString();
}
/**
* Get the partition size for multipart operations.
* @return the value as set during initialization
*/
public long getPartitionSize() {
return partSize;
}
/**
* Get the threshold for multipart files
* @return the value as set during initialization
*/
public long getMultiPartThreshold() {
return multiPartThreshold;
}
/**
* Get a integer option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
* @param defVal default value
* @param min minimum value
* @return the value
* @throws IllegalArgumentException if the value is below the minimum
*/
static int intOption(Configuration conf, String key, int defVal, int min) {
int v = conf.getInt(key, defVal);
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
return v;
}
/**
* Get a long option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
* @param defVal default value
* @param min minimum value
* @return the value
* @throws IllegalArgumentException if the value is below the minimum
*/
static long longOption(Configuration conf,
String key,
long defVal,
long min) {
long v = conf.getLong(key, defVal);
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
return v;
}
/**

View File

@ -21,20 +21,50 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.SocketException;
public class S3AInputStream extends FSInputStream {
/**
* The input stream for an S3A object.
*
* As this stream seeks withing an object, it may close then re-open the stream.
* When this happens, any updated stream data may be retrieved, and, given
* the consistency model of Amazon S3, outdated data may in fact be picked up.
*
* As a result, the outcome of reading from a stream of an object which is
* actively manipulated during the read process is "undefined".
*
* The class is marked as private as code should not be creating instances
* themselves. Any extra feature (e.g instrumentation) should be considered
* unstable.
*
* Because it prints some of the state of the instrumentation,
* the output of {@link #toString()} must also be considered unstable.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInputStream extends FSInputStream implements CanSetReadahead {
/**
* This is the public position; the one set in {@link #seek(long)}
* and returned in {@link #getPos()}.
*/
private long pos;
private boolean closed;
/**
* Closed bit. Volatile so reads are non-blocking.
* Updates must be in a synchronized block to guarantee an atomic check and
* set
*/
private volatile boolean closed;
private S3ObjectInputStream wrappedStream;
private final FileSystem.Statistics stats;
private final AmazonS3Client client;
@ -44,62 +74,65 @@ public class S3AInputStream extends FSInputStream {
private final String uri;
public static final Logger LOG = S3AFileSystem.LOG;
public static final long CLOSE_THRESHOLD = 4096;
private final S3AInstrumentation.InputStreamStatistics streamStatistics;
private long readahead;
// Used by lazy seek
/**
* This is the actual position within the object, used by
* lazy seek to decide whether to seek on the next read or not.
*/
private long nextReadPos;
//Amount of data requested from the request
/* Amount of data desired from the request */
private long requestedStreamLen;
public S3AInputStream(String bucket, String key, long contentLength,
AmazonS3Client client, FileSystem.Statistics stats) {
public S3AInputStream(String bucket,
String key,
long contentLength,
AmazonS3Client client,
FileSystem.Statistics stats,
S3AInstrumentation instrumentation,
long readahead) {
Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
Preconditions.checkArgument(contentLength >= 0 , "Negative content length");
this.bucket = bucket;
this.key = key;
this.contentLength = contentLength;
this.client = client;
this.stats = stats;
this.pos = 0;
this.nextReadPos = 0;
this.closed = false;
this.wrappedStream = null;
this.uri = "s3a://" + this.bucket + "/" + this.key;
this.streamStatistics = instrumentation.newInputStreamStatistics();
setReadahead(readahead);
}
/**
* Opens up the stream at specified target position and for given length.
*
* @param reason reason for reopen
* @param targetPos target position
* @param length length requested
* @throws IOException
*/
private synchronized void reopen(long targetPos, long length)
private synchronized void reopen(String reason, long targetPos, long length)
throws IOException {
requestedStreamLen = (length < 0) ? this.contentLength :
Math.max(this.contentLength, (CLOSE_THRESHOLD + (targetPos + length)));
requestedStreamLen = this.contentLength;
if (wrappedStream != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing the previous stream");
}
closeStream(requestedStreamLen);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Requesting for "
+ "targetPos=" + targetPos
+ ", length=" + length
+ ", requestedStreamLen=" + requestedStreamLen
+ ", streamPosition=" + pos
+ ", nextReadPosition=" + nextReadPos
);
closeStream("reopen(" + reason + ")", requestedStreamLen);
}
LOG.debug("reopen({}) for {} at targetPos={}, length={}," +
" requestedStreamLen={}, streamPosition={}, nextReadPosition={}",
uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos);
streamStatistics.streamOpened();
GetObjectRequest request = new GetObjectRequest(bucket, key)
.withRange(targetPos, requestedStreamLen);
wrappedStream = client.getObject(request).getObjectContent();
if (wrappedStream == null) {
throw new IOException("Null IO stream");
throw new IOException("Null IO stream from reopen of (" + reason + ") "
+ uri);
}
this.pos = targetPos;
@ -128,6 +161,20 @@ public class S3AInputStream extends FSInputStream {
nextReadPos = targetPos;
}
/**
* Seek without raising any exception. This is for use in
* {@code finally} clauses
* @param positiveTargetPos a target position which must be positive.
*/
private void seekQuietly(long positiveTargetPos) {
try {
seek(positiveTargetPos);
} catch (IOException ioe) {
LOG.debug("Ignoring IOE on seek of {} to {}",
uri, positiveTargetPos, ioe);
}
}
/**
* Adjust the stream to a specific position.
*
@ -140,23 +187,50 @@ public class S3AInputStream extends FSInputStream {
if (wrappedStream == null) {
return;
}
// compute how much more to skip
long diff = targetPos - pos;
if (targetPos > pos) {
if ((diff + length) <= wrappedStream.available()) {
// already available in buffer
pos += wrappedStream.skip(diff);
if (pos != targetPos) {
throw new IOException("Failed to seek to " + targetPos
+ ". Current position " + pos);
if (diff > 0) {
// forward seek -this is where data can be skipped
int available = wrappedStream.available();
// always seek at least as far as what is available
long forwardSeekRange = Math.max(readahead, available);
// work out how much is actually left in the stream
// then choose whichever comes first: the range or the EOF
long forwardSeekLimit = Math.min(remaining(), forwardSeekRange);
if (diff <= forwardSeekLimit) {
// the forward seek range is within the limits
LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
streamStatistics.seekForwards(diff);
long skipped = wrappedStream.skip(diff);
if (skipped > 0) {
pos += skipped;
// as these bytes have been read, they are included in the counter
incrementBytesRead(diff);
}
if (pos == targetPos) {
// all is well
return;
} else {
// log a warning; continue to attempt to re-open
LOG.warn("Failed to seek on {} to {}. Current position {}",
uri, targetPos, pos);
}
return;
}
} else if (diff < 0) {
// backwards seek
streamStatistics.seekBackwards(diff);
} else {
// targetPos == pos
// this should never happen as the caller filters it out.
// Retained just in case
LOG.debug("Ignoring seek {} to {} as target position == current",
uri, targetPos);
}
// close the stream; if read the object will be opened at the new pos
closeStream(this.requestedStreamLen);
closeStream("seekInStream()", this.requestedStreamLen);
pos = targetPos;
}
@ -179,7 +253,19 @@ public class S3AInputStream extends FSInputStream {
//re-open at specific location if needed
if (wrappedStream == null) {
reopen(targetPos, len);
reopen("read from new offset", targetPos, len);
}
}
/**
* Increment the bytes read counter if there is a stats instance
* and the number of bytes read is more than zero.
* @param bytesRead number of bytes read
*/
private void incrementBytesRead(long bytesRead) {
streamStatistics.bytesRead(bytesRead);
if (stats != null && bytesRead > 0) {
stats.incrementBytesRead(bytesRead);
}
}
@ -195,13 +281,11 @@ public class S3AInputStream extends FSInputStream {
int byteRead;
try {
byteRead = wrappedStream.read();
} catch (SocketTimeoutException | SocketException e) {
LOG.info("Got exception while trying to read from stream,"
+ " trying to recover " + e);
reopen(pos, 1);
byteRead = wrappedStream.read();
} catch (EOFException e) {
return -1;
} catch (IOException e) {
onReadFailure(e, 1);
byteRead = wrappedStream.read();
}
if (byteRead >= 0) {
@ -209,12 +293,36 @@ public class S3AInputStream extends FSInputStream {
nextReadPos++;
}
if (stats != null && byteRead >= 0) {
stats.incrementBytesRead(1);
if (byteRead >= 0) {
incrementBytesRead(1);
}
return byteRead;
}
/**
* Handle an IOE on a read by attempting to re-open the stream.
* The filesystem's readException count will be incremented.
* @param ioe exception caught.
* @param length length of data being attempted to read
* @throws IOException any exception thrown on the re-open attempt.
*/
private void onReadFailure(IOException ioe, int length) throws IOException {
LOG.info("Got exception while trying to read from stream {}"
+ " trying to recover: "+ ioe, uri);
LOG.debug("While trying to read from stream {}", uri, ioe);
streamStatistics.readException();
reopen("failure recovery", pos, length);
}
/**
* {@inheritDoc}
*
* This updates the statistics on read operations started and whether
* or not the read operation "completed", that is: returned the exact
* number of bytes requested.
* @throws EOFException if there is no more data
* @throws IOException if there are other problems
*/
@Override
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
@ -230,61 +338,85 @@ public class S3AInputStream extends FSInputStream {
}
lazySeek(nextReadPos, len);
streamStatistics.readOperationStarted(nextReadPos, len);
int byteRead;
int bytesRead;
try {
byteRead = wrappedStream.read(buf, off, len);
} catch (SocketTimeoutException | SocketException e) {
LOG.info("Got exception while trying to read from stream,"
+ " trying to recover " + e);
reopen(pos, len);
byteRead = wrappedStream.read(buf, off, len);
bytesRead = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
throw e;
} catch (IOException e) {
onReadFailure(e, len);
bytesRead = wrappedStream.read(buf, off, len);
}
if (byteRead > 0) {
pos += byteRead;
nextReadPos += byteRead;
if (bytesRead > 0) {
pos += bytesRead;
nextReadPos += bytesRead;
}
if (stats != null && byteRead > 0) {
stats.incrementBytesRead(byteRead);
}
return byteRead;
incrementBytesRead(bytesRead);
streamStatistics.readOperationCompleted(len, bytesRead);
return bytesRead;
}
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}
/**
* Close the stream.
* This triggers publishing of the stream statistics back to the filesystem
* statistics.
* This operation is synchronized, so that only one thread can attempt to
* close the connection; all later/blocked calls are no-ops.
* @throws IOException on any problem
*/
@Override
public synchronized void close() throws IOException {
super.close();
closed = true;
closeStream(this.contentLength);
if (!closed) {
closed = true;
try {
// close or abort the stream
closeStream("close() operation", this.contentLength);
// this is actually a no-op
super.close();
} finally {
// merge the statistics back into the FS statistics.
streamStatistics.close();
}
}
}
/**
* Close a stream: decide whether to abort or close, based on
* the length of the stream and the current position.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* This does not set the {@link #closed} flag.
*
* @param reason reason for stream being closed; used in messages
* @param length length of the stream.
* @throws IOException
*/
private void closeStream(long length) throws IOException {
private void closeStream(String reason, long length) {
if (wrappedStream != null) {
String reason = null;
boolean shouldAbort = length - pos > CLOSE_THRESHOLD;
if (!shouldAbort) {
try {
reason = "Closed stream";
// clean close. This will read to the end of the stream,
// so, while cleaner, can be pathological on a multi-GB object
wrappedStream.close();
streamStatistics.streamClose(false);
} catch (IOException e) {
// exception escalates to an abort
LOG.debug("When closing stream", e);
LOG.debug("When closing {} stream for {}", uri, reason, e);
shouldAbort = true;
}
}
@ -292,13 +424,12 @@ public class S3AInputStream extends FSInputStream {
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
wrappedStream.abort();
reason = "Closed stream with abort";
}
if (LOG.isDebugEnabled()) {
LOG.debug(reason + "; streamPos=" + pos
+ ", nextReadPos=" + nextReadPos
+ ", contentLength=" + length);
streamStatistics.streamClose(true);
}
LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," +
" length={}",
uri, (shouldAbort ? "aborted":"closed"), reason, pos, nextReadPos,
length);
wrappedStream = null;
}
}
@ -307,19 +438,34 @@ public class S3AInputStream extends FSInputStream {
public synchronized int available() throws IOException {
checkNotClosed();
long remaining = this.contentLength - this.pos;
long remaining = remaining();
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int)remaining;
}
/**
* Bytes left in stream.
* @return how many bytes are left to read
*/
protected long remaining() {
return this.contentLength - this.pos;
}
@Override
public boolean markSupported() {
return false;
}
/**
* String value includes statistics as well as stream state.
* <b>Important: there are no guarantees as to the stability
* of this value.</b>
* @return a string value for printing in logs/diagnostics
*/
@Override
@InterfaceStability.Unstable
public String toString() {
final StringBuilder sb = new StringBuilder(
"S3AInputStream{");
@ -327,6 +473,7 @@ public class S3AInputStream extends FSInputStream {
sb.append(" pos=").append(pos);
sb.append(" nextReadPos=").append(nextReadPos);
sb.append(" contentLength=").append(contentLength);
sb.append(" ").append(streamStatistics.toString());
sb.append('}');
return sb.toString();
}
@ -348,6 +495,7 @@ public class S3AInputStream extends FSInputStream {
throws IOException {
checkNotClosed();
validatePositionedReadArgs(position, buffer, offset, length);
streamStatistics.readFullyOperationStarted(position, length);
if (length == 0) {
return;
}
@ -363,10 +511,38 @@ public class S3AInputStream extends FSInputStream {
}
nread += nbytes;
}
} finally {
seek(oldPos);
seekQuietly(oldPos);
}
}
}
/**
* Access the input stream statistics.
* This is for internal testing and may be removed without warning.
* @return the statistics for this input stream
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public S3AInstrumentation.InputStreamStatistics getS3AStreamStatistics() {
return streamStatistics;
}
@Override
public void setReadahead(Long readahead) {
if (readahead == null) {
this.readahead = Constants.DEFAULT_READAHEAD_RANGE;
} else {
Preconditions.checkArgument(readahead >= 0, "Negative readahead value");
this.readahead = readahead;
}
}
/**
* Get the current readahead value.
* @return a non-negative readahead value
*/
public long getReadahead() {
return readahead;
}
}

View File

@ -0,0 +1,457 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricStringBuilder;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* Instrumentation of S3a.
* Derived from the {@code AzureFileSystemInstrumentation}
*/
@Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation {
public static final String CONTEXT = "S3AFileSystem";
public static final String STREAM_OPENED = "streamOpened";
public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations";
public static final String STREAM_CLOSED = "streamClosed";
public static final String STREAM_ABORTED = "streamAborted";
public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions";
public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations";
public static final String STREAM_FORWARD_SEEK_OPERATIONS
= "streamForwardSeekOperations";
public static final String STREAM_BACKWARD_SEEK_OPERATIONS
= "streamBackwardSeekOperations";
public static final String STREAM_SEEK_BYTES_SKIPPED =
"streamBytesSkippedOnSeek";
public static final String STREAM_SEEK_BYTES_BACKWARDS =
"streamBytesBackwardsOnSeek";
public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead";
public static final String STREAM_READ_OPERATIONS = "streamReadOperations";
public static final String STREAM_READ_FULLY_OPERATIONS
= "streamReadFullyOperations";
public static final String STREAM_READ_OPERATIONS_INCOMPLETE
= "streamReadOperationsIncomplete";
public static final String FILES_CREATED = "files_created";
public static final String FILES_COPIED = "files_copied";
public static final String FILES_COPIED_BYTES = "files_copied_bytes";
public static final String FILES_DELETED = "files_deleted";
public static final String DIRECTORIES_CREATED = "directories_created";
public static final String DIRECTORIES_DELETED = "directories_deleted";
public static final String IGNORED_ERRORS = "ignored_errors";
private final MetricsRegistry registry =
new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
private final MutableCounterLong streamOpenOperations;
private final MutableCounterLong streamCloseOperations;
private final MutableCounterLong streamClosed;
private final MutableCounterLong streamAborted;
private final MutableCounterLong streamSeekOperations;
private final MutableCounterLong streamReadExceptions;
private final MutableCounterLong streamForwardSeekOperations;
private final MutableCounterLong streamBackwardSeekOperations;
private final MutableCounterLong streamBytesSkippedOnSeek;
private final MutableCounterLong streamBytesBackwardsOnSeek;
private final MutableCounterLong streamBytesRead;
private final MutableCounterLong streamReadOperations;
private final MutableCounterLong streamReadFullyOperations;
private final MutableCounterLong streamReadsIncomplete;
private final MutableCounterLong ignoredErrors;
private final MutableCounterLong numberOfFilesCreated;
private final MutableCounterLong numberOfFilesCopied;
private final MutableCounterLong bytesOfFilesCopied;
private final MutableCounterLong numberOfFilesDeleted;
private final MutableCounterLong numberOfDirectoriesCreated;
private final MutableCounterLong numberOfDirectoriesDeleted;
private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>();
public S3AInstrumentation(URI name) {
UUID fileSystemInstanceId = UUID.randomUUID();
registry.tag("FileSystemId",
"A unique identifier for the FS ",
fileSystemInstanceId.toString() + "-" + name.getHost());
registry.tag("fsURI",
"URI of this filesystem",
name.toString());
streamOpenOperations = streamCounter(STREAM_OPENED,
"Total count of times an input stream to object store was opened");
streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS,
"Total count of times an attempt to close a data stream was made");
streamClosed = streamCounter(STREAM_CLOSED,
"Count of times the TCP stream was closed");
streamAborted = streamCounter(STREAM_ABORTED,
"Count of times the TCP stream was aborted");
streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS,
"Number of seek operations invoked on input streams");
streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS,
"Number of read exceptions caught and attempted to recovered from");
streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS,
"Number of executed seek operations which went forward in a stream");
streamBackwardSeekOperations = streamCounter(
STREAM_BACKWARD_SEEK_OPERATIONS,
"Number of executed seek operations which went backwards in a stream");
streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED,
"Count of bytes skipped during forward seek operations");
streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS,
"Count of bytes moved backwards during seek operations");
streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ,
"Count of bytes read during seek() in stream operations");
streamReadOperations = streamCounter(STREAM_READ_OPERATIONS,
"Count of read() operations in streams");
streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS,
"Count of readFully() operations in streams");
streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE,
"Count of incomplete read() operations in streams");
numberOfFilesCreated = counter(FILES_CREATED,
"Total number of files created through the object store.");
numberOfFilesCopied = counter(FILES_COPIED,
"Total number of files copied within the object store.");
bytesOfFilesCopied = counter(FILES_COPIED_BYTES,
"Total number of bytes copied within the object store.");
numberOfFilesDeleted = counter(FILES_DELETED,
"Total number of files deleted through from the object store.");
numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED,
"Total number of directories created through the object store.");
numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED,
"Total number of directories deleted through the object store.");
ignoredErrors = counter(IGNORED_ERRORS,
"Total number of errors caught and ingored.");
}
/**
* Create a counter in the registry.
* @param name counter name
* @param desc counter description
* @return a new counter
*/
protected final MutableCounterLong counter(String name, String desc) {
return registry.newCounter(name, desc, 0L);
}
/**
* Create a counter in the stream map: these are unregistered in the public
* metrics.
* @param name counter name
* @param desc counter description
* @return a new counter
*/
protected final MutableCounterLong streamCounter(String name, String desc) {
MutableCounterLong counter = new MutableCounterLong(
Interns.info(name, desc), 0L);
streamMetrics.put(name, counter);
return counter;
}
/**
* Create a gauge in the registry.
* @param name name gauge name
* @param desc description
* @return the gauge
*/
protected final MutableGaugeLong gauge(String name, String desc) {
return registry.newGauge(name, desc, 0L);
}
/**
* Get the metrics registry.
* @return the registry
*/
public MetricsRegistry getRegistry() {
return registry;
}
/**
* Dump all the metrics to a string.
* @param prefix prefix before every entry
* @param separator separator between name and value
* @param suffix suffix
* @param all get all the metrics even if the values are not changed.
* @return a string dump of the metrics
*/
public String dump(String prefix,
String separator,
String suffix,
boolean all) {
MetricStringBuilder metricBuilder = new MetricStringBuilder(null,
prefix,
separator, suffix);
registry.snapshot(metricBuilder, all);
for (Map.Entry<String, MutableCounterLong> entry:
streamMetrics.entrySet()) {
metricBuilder.tuple(entry.getKey(),
Long.toString(entry.getValue().value()));
}
return metricBuilder.toString();
}
/**
* Indicate that S3A created a file.
*/
public void fileCreated() {
numberOfFilesCreated.incr();
}
/**
* Indicate that S3A deleted one or more file.s
* @param count number of files.
*/
public void fileDeleted(int count) {
numberOfFilesDeleted.incr(count);
}
/**
* Indicate that S3A created a directory.
*/
public void directoryCreated() {
numberOfDirectoriesCreated.incr();
}
/**
* Indicate that S3A just deleted a directory.
*/
public void directoryDeleted() {
numberOfDirectoriesDeleted.incr();
}
/**
* Indicate that S3A copied some files within the store.
*
* @param files number of files
* @param size total size in bytes
*/
public void filesCopied(int files, long size) {
numberOfFilesCopied.incr(files);
bytesOfFilesCopied.incr(size);
}
/**
* Note that an error was ignored.
*/
public void errorIgnored() {
ignoredErrors.incr();
}
/**
* Create a stream input statistics instance.
* @return the new instance
*/
InputStreamStatistics newInputStreamStatistics() {
return new InputStreamStatistics();
}
/**
* Merge in the statistics of a single input stream into
* the filesystem-wide statistics.
* @param statistics stream statistics
*/
private void mergeInputStreamStatistics(InputStreamStatistics statistics) {
streamOpenOperations.incr(statistics.openOperations);
streamCloseOperations.incr(statistics.closeOperations);
streamClosed.incr(statistics.closed);
streamAborted.incr(statistics.aborted);
streamSeekOperations.incr(statistics.seekOperations);
streamReadExceptions.incr(statistics.readExceptions);
streamForwardSeekOperations.incr(statistics.forwardSeekOperations);
streamBytesSkippedOnSeek.incr(statistics.bytesSkippedOnSeek);
streamBackwardSeekOperations.incr(statistics.backwardSeekOperations);
streamBytesBackwardsOnSeek.incr(statistics.bytesBackwardsOnSeek);
streamBytesRead.incr(statistics.bytesRead);
streamReadOperations.incr(statistics.readOperations);
streamReadFullyOperations.incr(statistics.readFullyOperations);
streamReadsIncomplete.incr(statistics.readsIncomplete);
}
/**
* Statistics updated by an input stream during its actual operation.
* These counters not thread-safe and are for use in a single instance
* of a stream.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class InputStreamStatistics implements AutoCloseable {
public long openOperations;
public long closeOperations;
public long closed;
public long aborted;
public long seekOperations;
public long readExceptions;
public long forwardSeekOperations;
public long backwardSeekOperations;
public long bytesRead;
public long bytesSkippedOnSeek;
public long bytesBackwardsOnSeek;
public long readOperations;
public long readFullyOperations;
public long readsIncomplete;
private InputStreamStatistics() {
}
/**
* Seek backwards, incrementing the seek and backward seek counters.
* @param negativeOffset how far was the seek?
* This is expected to be negative.
*/
public void seekBackwards(long negativeOffset) {
seekOperations++;
backwardSeekOperations++;
bytesBackwardsOnSeek -= negativeOffset;
}
/**
* Record a forward seek, adding a seek operation, a forward
* seek operation, and any bytes skipped.
* @param skipped number of bytes skipped by reading from the stream.
* If the seek was implemented by a close + reopen, set this to zero.
*/
public void seekForwards(long skipped) {
seekOperations++;
forwardSeekOperations++;
if (skipped > 0) {
bytesSkippedOnSeek += skipped;
}
}
/**
* The inner stream was opened.
*/
public void streamOpened() {
openOperations++;
}
/**
* The inner stream was closed.
* @param abortedConnection flag to indicate the stream was aborted,
* rather than closed cleanly
*/
public void streamClose(boolean abortedConnection) {
closeOperations++;
if (abortedConnection) {
this.aborted++;
} else {
closed++;
}
}
/**
* An ignored stream read exception was received.
*/
public void readException() {
readExceptions++;
}
/**
* Increment the bytes read counter by the number of bytes;
* no-op if the argument is negative.
* @param bytes number of bytes read
*/
public void bytesRead(long bytes) {
if (bytes > 0) {
bytesRead += bytes;
}
}
/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
* @param pos starting position of the read
* @param len length of bytes to read
*/
public void readOperationStarted(long pos, long len) {
readOperations++;
}
/**
* A {@code PositionedRead.read(position, buffer, offset, length)}
* operation has just started.
* @param pos starting position of the read
* @param len length of bytes to read
*/
public void readFullyOperationStarted(long pos, long len) {
readFullyOperations++;
}
/**
* A read operation has completed.
* @param requested number of requested bytes
* @param actual the actual number of bytes
*/
public void readOperationCompleted(int requested, int actual) {
if (requested > actual) {
readsIncomplete++;
}
}
/**
* Close triggers the merge of statistics into the filesystem's
* instrumentation instance.
*/
@Override
public void close() {
mergeInputStreamStatistics(this);
}
/**
* String operator describes all the current statistics.
* <b>Important: there are no guarantees as to the stability
* of this value.</b>
* @return the current values of the stream statistics.
*/
@Override
@InterfaceStability.Unstable
public String toString() {
final StringBuilder sb = new StringBuilder(
"StreamStatistics{");
sb.append("OpenOperations=").append(openOperations);
sb.append(", CloseOperations=").append(closeOperations);
sb.append(", Closed=").append(closed);
sb.append(", Aborted=").append(aborted);
sb.append(", SeekOperations=").append(seekOperations);
sb.append(", ReadExceptions=").append(readExceptions);
sb.append(", ForwardSeekOperations=")
.append(forwardSeekOperations);
sb.append(", BackwardSeekOperations=")
.append(backwardSeekOperations);
sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
sb.append(", BytesRead=").append(bytesRead);
sb.append(", BytesRead excluding skipped=")
.append(bytesRead - bytesSkippedOnSeek);
sb.append(", ReadOperations=").append(readOperations);
sb.append(", ReadFullyOperations=").append(readFullyOperations);
sb.append(", ReadsIncomplete=").append(readsIncomplete);
sb.append('}');
return sb.toString();
}
}
}

View File

@ -21,14 +21,14 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@ -46,6 +46,11 @@ import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
import static org.apache.hadoop.fs.s3a.Constants.*;
/**
* Output stream to save data to S3.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream extends OutputStream {
private OutputStream backupStream;
private File backupFile;
@ -65,9 +70,9 @@ public class S3AOutputStream extends OutputStream {
public static final Logger LOG = S3AFileSystem.LOG;
public S3AOutputStream(Configuration conf, TransferManager transfers,
S3AFileSystem fs, String bucket, String key, Progressable progress,
CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
String serverSideEncryptionAlgorithm)
S3AFileSystem fs, String bucket, String key, Progressable progress,
CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
String serverSideEncryptionAlgorithm)
throws IOException {
this.bucket = bucket;
this.key = key;
@ -78,9 +83,8 @@ public class S3AOutputStream extends OutputStream {
this.statistics = statistics;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
partSizeThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
partSize = fs.getPartitionSize();
partSizeThreshold = fs.getMultiPartThreshold();
if (conf.get(BUFFER_DIR, null) != null) {
lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
@ -91,10 +95,8 @@ public class S3AOutputStream extends OutputStream {
backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
closed = false;
if (LOG.isDebugEnabled()) {
LOG.debug("OutputStream for key '" + key + "' writing to tempfile: " +
this.backupFile);
}
LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
key, backupFile);
this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
}
@ -111,10 +113,9 @@ public class S3AOutputStream extends OutputStream {
}
backupStream.close();
if (LOG.isDebugEnabled()) {
LOG.debug("OutputStream for key '" + key + "' closed. Now beginning upload");
LOG.debug("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);
}
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);
LOG.debug("Minimum upload part size: {} threshold {}" , partSize,
partSizeThreshold);
try {
@ -129,7 +130,7 @@ public class S3AOutputStream extends OutputStream {
Upload upload = transfers.upload(putObjectRequest);
ProgressableProgressListener listener =
new ProgressableProgressListener(upload, progress, statistics);
new ProgressableProgressListener(upload, progress, statistics);
upload.addProgressListener(listener);
upload.waitForUploadResult();
@ -168,6 +169,9 @@ public class S3AOutputStream extends OutputStream {
backupStream.write(b, off, len);
}
/**
* Listener to progress from AWS regarding transfers.
*/
public static class ProgressableProgressListener implements ProgressListener {
private Progressable progress;
private FileSystem.Statistics statistics;
@ -175,7 +179,7 @@ public class S3AOutputStream extends OutputStream {
private Upload upload;
public ProgressableProgressListener(Upload upload, Progressable progress,
FileSystem.Statistics statistics) {
FileSystem.Statistics statistics) {
this.upload = upload;
this.progress = progress;
this.statistics = statistics;

View File

@ -437,6 +437,14 @@ this capability.
<description>The implementation class of the S3A AbstractFileSystem.</description>
</property>
<property>
<name>fs.s3a.readahead.range</name>
<value>65536</value>
<description>Bytes to read ahead during a seek() before closing and
re-opening the S3 HTTP connection. This option will be overridden if
any call to setReadahead() is made to an open stream.</description>
</property>
### S3AFastOutputStream
**Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
@ -647,7 +655,7 @@ Example:
<configuration>
<include xmlns="http://www.w3.org/2001/XInclude"
href="auth-keys.xml"/>
href="/home/testuser/.ssh/auth-keys.xml"/>
<property>
<name>fs.contract.test.fs.s3</name>
@ -667,7 +675,61 @@ Example:
</configuration>
This example pulls in the `auth-keys.xml` file for the credentials.
This example pulls in the `~/.ssh/auth-keys.xml` file for the credentials.
This provides one single place to keep the keys up to date —and means
that the file `contract-test-options.xml` does not contain any
secret credentials itself.
secret credentials itself. As the auth keys XML file is kept out of the
source code tree, it is not going to get accidentally committed.
### Running Performance Tests against non-AWS storage infrastructures
#### CSV Data source
The `TestS3AInputStreamPerformance` tests require read access to a multi-MB
text file. The default file for these tests is one published by amazon,
[s3a://landsat-pds.s3.amazonaws.com/scene_list.gz](http://landsat-pds.s3.amazonaws.com/scene_list.gz).
This is a gzipped CSV index of other files which amazon serves for open use.
The path to this object is set in the option `fs.s3a.scale.test.csvfile`:
<property>
<name>fs.s3a.scale.test.csvfile</name>
<value>s3a://landsat-pds/scene_list.gz</value>
</property>
1. If the option is not overridden, the default value is used. This
is hosted in Amazon's US-east datacenter.
1. If the property is empty, tests which require it will be skipped.
1. If the data cannot be read for any reason then the test will fail.
1. If the property is set to a different path, then that data must be readable
and "sufficiently" large.
To test on different S3 endpoints, or alternate infrastructures supporting
the same APIs, the option `fs.s3a.scale.test.csvfile` must therefore be
set to " ", or an object of at least 10MB is uploaded to the object store, and
the `fs.s3a.scale.test.csvfile` option set to its path.
<property>
<name>fs.s3a.scale.test.csvfile</name>
<value> </value>
</property>
#### Scale test operation count
Some scale tests perform multiple operations (such as creating many directories).
The exact number of operations to perform is configurable in the option
`scale.test.operation.count`
<property>
<name>scale.test.operation.count</name>
<value>10</value>
</property>
Larger values generate more load, and are recommended when testing locally,
or in batch runs.
Smaller values should result in faster test runs, especially when the object
store is a long way away.

View File

@ -19,18 +19,25 @@
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.io.InputStream;
import java.util.Locale;
import static org.junit.Assume.assumeTrue;
@ -38,30 +45,61 @@ import static org.junit.Assume.assumeTrue;
* Base class for scale tests; here is where the common scale configuration
* keys are defined.
*/
public class S3AScaleTestBase {
public class S3AScaleTestBase extends Assert {
public static final String SCALE_TEST = "scale.test.";
public static final String S3A_SCALE_TEST = "fs.s3a.scale.test.";
@Rule
public TestName methodName = new TestName();
@BeforeClass
public static void nameThread() {
Thread.currentThread().setName("JUnit");
}
/**
* The number of operations to perform: {@value}
* The number of operations to perform: {@value}.
*/
public static final String KEY_OPERATION_COUNT =
SCALE_TEST + "operation.count";
/**
* The default number of operations to perform: {@value}
* The readahead buffer: {@value}.
*/
public static final String KEY_READ_BUFFER_SIZE =
S3A_SCALE_TEST + "read.buffer.size";
public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
/**
* Key for a multi MB test file: {@value}.
*/
public static final String KEY_CSVTEST_FILE =
S3A_SCALE_TEST + "csvfile";
/**
* Default path for the multi MB test file: {@value}.
*/
public static final String DEFAULT_CSVTEST_FILE
= "s3a://landsat-pds/scene_list.gz";
/**
* The default number of operations to perform: {@value}.
*/
public static final long DEFAULT_OPERATION_COUNT = 2005;
protected S3AFileSystem fs;
private static final Logger LOG =
protected static final Logger LOG =
LoggerFactory.getLogger(S3AScaleTestBase.class);
private Configuration conf;
/**
* Configuration generator. May be overridden to inject
* some custom options
* some custom options.
* @return a configuration with which to create FS instances
*/
protected Configuration createConfiguration() {
@ -69,7 +107,7 @@ public class S3AScaleTestBase {
}
/**
* Get the configuration used to set up the FS
* Get the configuration used to set up the FS.
* @return the configuration
*/
public Configuration getConf() {
@ -79,7 +117,7 @@ public class S3AScaleTestBase {
@Before
public void setUp() throws Exception {
conf = createConfiguration();
LOG.info("Scale test operation count = {}", getOperationCount());
LOG.debug("Scale test operation count = {}", getOperationCount());
fs = S3ATestUtils.createTestFileSystem(conf);
}
@ -95,4 +133,139 @@ public class S3AScaleTestBase {
protected long getOperationCount() {
return getConf().getLong(KEY_OPERATION_COUNT, DEFAULT_OPERATION_COUNT);
}
/**
* Describe a test in the logs
* @param text text to print
* @param args arguments to format in the printing
*/
protected void describe(String text, Object... args) {
LOG.info("\n\n{}: {}\n",
methodName.getMethodName(),
String.format(text, args));
}
/**
* Get the input stream statistics of an input stream.
* Raises an exception if the inner stream is not an S3A input stream
* @param in wrapper
* @return the statistics for the inner stream
*/
protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics(
FSDataInputStream in) {
InputStream inner = in.getWrappedStream();
if (inner instanceof S3AInputStream) {
S3AInputStream s3a = (S3AInputStream) inner;
return s3a.getS3AStreamStatistics();
} else {
Assert.fail("Not an S3AInputStream: " + inner);
// never reached
return null;
}
}
/**
* Make times more readable, by adding a "," every three digits.
* @param nanos nanos or other large number
* @return a string for logging
*/
protected static String toHuman(long nanos) {
return String.format(Locale.ENGLISH, "%,d", nanos);
}
/**
* Log the bandwidth of a timer as inferred from the number of
* bytes processed.
* @param timer timer
* @param bytes bytes processed in the time period
*/
protected void bandwidth(NanoTimer timer, long bytes) {
LOG.info("Bandwidth = {} MB/S",
timer.bandwidthDescription(bytes));
}
/**
* Work out the bandwidth in MB/s
* @param bytes bytes
* @param durationNS duration in nanos
* @return the number of megabytes/second of the recorded operation
*/
public static double bandwidthMBs(long bytes, long durationNS) {
return (bytes * 1000.0 ) / durationNS;
}
/**
* A simple class for timing operations in nanoseconds, and for
* printing some useful results in the process.
*/
protected static class NanoTimer {
final long startTime;
long endTime;
public NanoTimer() {
startTime = now();
}
/**
* End the operation
* @return the duration of the operation
*/
public long end() {
endTime = now();
return duration();
}
/**
* End the operation; log the duration
* @param format message
* @param args any arguments
* @return the duration of the operation
*/
public long end(String format, Object... args) {
long d = end();
LOG.info("Duration of {}: {} nS",
String.format(format, args), toHuman(d));
return d;
}
long now() {
return System.nanoTime();
}
long duration() {
return endTime - startTime;
}
double bandwidth(long bytes) {
return S3AScaleTestBase.bandwidthMBs(bytes, duration());
}
/**
* Bandwidth as bytes per second
* @param bytes bytes in
* @return the number of bytes per second this operation timed.
*/
double bandwidthBytes(long bytes) {
return (bytes * 1.0 ) / duration();
}
/**
* How many nanoseconds per byte
* @param bytes bytes processed in this time period
* @return the nanoseconds it took each byte to be processed
*/
long nanosPerByte(long bytes) {
return duration() / bytes;
}
/**
* Get a description of the bandwidth, even down to fractions of
* a MB
* @param bytes bytes processed
* @return bandwidth
*/
String bandwidthDescription(long bytes) {
return String.format("%,.6f", bandwidth(bytes));
}
}
}

View File

@ -40,7 +40,6 @@ public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestS3ADeleteManyFiles.class);
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);

View File

@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Look at the performance of S3a operations
*/
public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
private static final Logger LOG = LoggerFactory.getLogger(
TestS3AInputStreamPerformance.class);
private S3AFileSystem s3aFS;
private Path testData;
private S3AFileStatus testDataStatus;
private FSDataInputStream in;
private S3AInstrumentation.InputStreamStatistics streamStatistics;
public static final int BLOCK_SIZE = 32 * 1024;
public static final int BIG_BLOCK_SIZE = 256 * 1024;
/** Tests only run if the there is a named test file that can be read */
private boolean testDataAvailable = true;
private String assumptionMessage = "test file";
/**
* Open the FS and the test data. The input stream is always set up here.
* @throws IOException
*/
@Before
public void openFS() throws IOException {
Configuration conf = getConf();
String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
if (testFile.isEmpty()) {
assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE;
testDataAvailable = false;
} else {
testData = new Path(testFile);
s3aFS = (S3AFileSystem) FileSystem.newInstance(testData.toUri(), conf);
try {
testDataStatus = s3aFS.getFileStatus(testData);
} catch (IOException e) {
LOG.warn("Failed to read file {} specified in {}",
testFile, KEY_CSVTEST_FILE, e);
throw e;
}
}
}
/**
* Cleanup: close the stream, close the FS.
*/
@After
public void cleanup() {
IOUtils.closeStream(in);
IOUtils.closeStream(s3aFS);
}
/**
* Declare that the test requires the CSV test dataset
*/
private void requireCSVTestData() {
Assume.assumeTrue(assumptionMessage, testDataAvailable);
}
/**
* Open the test file with the read buffer specified in the setting
* {@link #KEY_READ_BUFFER_SIZE}
* @return the stream, wrapping an S3a one
* @throws IOException
*/
FSDataInputStream openTestFile() throws IOException {
int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
DEFAULT_READ_BUFFER_SIZE);
FSDataInputStream stream = s3aFS.open(testData, bufferSize);
streamStatistics = getInputStreamStatistics(stream);
return stream;
}
/**
* assert tha the stream was only ever opened once
*/
protected void assertStreamOpenedExactlyOnce() {
assertOpenOperationCount(1);
}
/**
* Make an assertion count about the number of open operations
* @param expected the expected number
*/
private void assertOpenOperationCount(int expected) {
assertEquals("open operations in " + streamStatistics,
expected, streamStatistics.openOperations);
}
/**
* Log how long an IOP took, by dividing the total time by the
* count of operations, printing in a human-readable form
* @param timer timing data
* @param count IOP count.
*/
protected void logTimePerIOP(NanoTimer timer, long count) {
LOG.info("Time per IOP: {} nS", toHuman(timer.duration() / count));
}
@Test
public void testTimeToOpenAndReadWholeFileByByte() throws Throwable {
requireCSVTestData();
describe("Open the test file %s and read it byte by byte", testData);
long len = testDataStatus.getLen();
NanoTimer timeOpen = new NanoTimer();
in = openTestFile();
timeOpen.end("Open stream");
NanoTimer readTimer = new NanoTimer();
long count = 0;
while (in.read() >= 0) {
count ++;
}
readTimer.end("Time to read %d bytes", len);
bandwidth(readTimer, count);
assertEquals("Not enough bytes were read)", len, count);
long nanosPerByte = readTimer.nanosPerByte(count);
LOG.info("An open() call has the equivalent duration of reading {} bytes",
toHuman( timeOpen.duration() / nanosPerByte));
}
@Test
public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
requireCSVTestData();
describe("Open the test file %s and read it in blocks of size %d",
testData, BLOCK_SIZE);
long len = testDataStatus.getLen();
in = openTestFile();
byte[] block = new byte[BLOCK_SIZE];
NanoTimer timer2 = new NanoTimer();
long count = 0;
// implicitly rounding down here
long blockCount = len / BLOCK_SIZE;
for (long i = 0; i < blockCount; i++) {
int offset = 0;
int remaining = BLOCK_SIZE;
NanoTimer blockTimer = new NanoTimer();
int reads = 0;
while (remaining > 0) {
int bytesRead = in.read(block, offset, remaining);
reads ++;
if (bytesRead == 1) {
break;
}
remaining -= bytesRead;
offset += bytesRead;
count += bytesRead;
}
blockTimer.end("Reading block %d in %d reads", i, reads);
}
timer2.end("Time to read %d bytes in %d blocks", len, blockCount );
bandwidth(timer2, count);
LOG.info("{}", streamStatistics);
}
@Test
public void testLazySeekEnabled() throws Throwable {
requireCSVTestData();
describe("Verify that seeks do not trigger any IO");
long len = testDataStatus.getLen();
in = openTestFile();
NanoTimer timer = new NanoTimer();
long blockCount = len / BLOCK_SIZE;
for (long i = 0; i < blockCount; i++) {
in.seek(in.getPos() + BLOCK_SIZE - 1);
}
in.seek(0);
blockCount++;
timer.end("Time to execute %d seeks", blockCount);
logTimePerIOP(timer, blockCount);
LOG.info("{}", streamStatistics);
assertOpenOperationCount(0);
assertEquals("bytes read", 0, streamStatistics.bytesRead);
}
@Test
public void testReadAheadDefault() throws Throwable {
requireCSVTestData();
describe("Verify that a series of forward skips within the readahead" +
" range do not close and reopen the stream");
executeSeekReadSequence(BLOCK_SIZE, Constants.DEFAULT_READAHEAD_RANGE);
assertStreamOpenedExactlyOnce();
}
@Test
public void testReadaheadOutOfRange() throws Throwable {
requireCSVTestData();
try {
in = openTestFile();
in.setReadahead(-1L);
fail("Stream should have rejected the request "+ in);
} catch (IllegalArgumentException e) {
// expected
}
}
@Test
public void testReadBigBlocksAvailableReadahead() throws Throwable {
requireCSVTestData();
describe("set readahead to available bytes only");
executeSeekReadSequence(BIG_BLOCK_SIZE, 0);
// expect that the stream will have had lots of opens
assertTrue("not enough open operations in " + streamStatistics,
streamStatistics.openOperations > 1);
}
@Test
public void testReadBigBlocksBigReadahead() throws Throwable {
requireCSVTestData();
describe("Read big blocks with a big readahead");
executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2);
assertStreamOpenedExactlyOnce();
}
/**
* Execute a seek+read sequence
* @param blockSize block size for seeks
* @param readahead what the readahead value of the stream should be
* @throws IOException IO problems
*/
protected void executeSeekReadSequence(long blockSize,
long readahead) throws IOException {
requireCSVTestData();
long len = testDataStatus.getLen();
in = openTestFile();
in.setReadahead(readahead);
NanoTimer timer = new NanoTimer();
long blockCount = len / blockSize;
LOG.info("Reading {} blocks, readahead = {}",
blockCount, readahead);
for (long i = 0; i < blockCount; i++) {
in.seek(in.getPos() + blockSize - 1);
// this is the read
assertTrue(in.read() >= 0);
}
timer.end("Time to execute %d seeks of distance %d with readahead = %d",
blockCount,
blockSize,
readahead);
logTimePerIOP(timer, blockCount);
LOG.info("Effective bandwidth {} MB/S",
timer.bandwidthDescription(streamStatistics.bytesRead -
streamStatistics.bytesSkippedOnSeek));
LOG.info("{}", streamStatistics);
}
}

View File

@ -16,3 +16,6 @@ log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
# for debugging low level S3a operations, uncomment this line
# log4j.logger.org.apache.hadoop.fs.s3a=DEBUG