HADOOP-10400. Incorporate new S3A FileSystem implementation. Contributed by Jordan Mendelson and Dave Wang.
parent fc741b5d78
commit 24d920b80e

@@ -342,6 +342,9 @@ Release 2.6.0 - UNRELEASED
    HADOOP-10893. isolated classloader on the client side (Sangjin Lee via
    jlowe)

    HADOOP-10400. Incorporate new S3A FileSystem implementation. (Jordan
    Mendelson and Dave Wang via atm)

  IMPROVEMENTS

    HADOOP-10808. Remove unused native code for munlock. (cnauroth)

@@ -174,6 +174,11 @@ log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}
# Jets3t library
log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR

# AWS SDK & S3A FileSystem
log4j.logger.com.amazonaws=ERROR
log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN

#
# Event Counter Appender
# Sends counts of logging messages at different severity levels to Hadoop Metrics.

@@ -689,6 +689,92 @@ for ldap providers in the same way as above does.
  </description>
</property>

<property>
  <name>fs.s3a.access.key</name>
  <description>AWS access key ID. Omit for Role-based authentication.</description>
</property>

<property>
  <name>fs.s3a.secret.key</name>
  <description>AWS secret key. Omit for Role-based authentication.</description>
</property>

<property>
  <name>fs.s3a.connection.maximum</name>
  <value>15</value>
  <description>Controls the maximum number of simultaneous connections to S3.</description>
</property>

<property>
  <name>fs.s3a.connection.ssl.enabled</name>
  <value>true</value>
  <description>Enables or disables SSL connections to S3.</description>
</property>

<property>
  <name>fs.s3a.attempts.maximum</name>
  <value>10</value>
  <description>How many times we should retry commands on transient errors.</description>
</property>

<property>
  <name>fs.s3a.connection.timeout</name>
  <value>5000</value>
  <description>Socket connection timeout in seconds.</description>
</property>

<property>
  <name>fs.s3a.paging.maximum</name>
  <value>5000</value>
  <description>How many keys to request from S3 when doing
    directory listings at a time.</description>
</property>

<property>
  <name>fs.s3a.multipart.size</name>
  <value>104857600</value>
  <description>How big (in bytes) to split upload or copy operations up into.</description>
</property>

<property>
  <name>fs.s3a.multipart.threshold</name>
  <value>2147483647</value>
  <description>Threshold before uploads or copies use parallel multipart operations.</description>
</property>

<property>
  <name>fs.s3a.acl.default</name>
  <description>Set a canned ACL for newly created and copied objects. Value may be private,
    public-read, public-read-write, authenticated-read, log-delivery-write,
    bucket-owner-read, or bucket-owner-full-control.</description>
</property>

<property>
  <name>fs.s3a.multipart.purge</name>
  <value>false</value>
  <description>True if you want to purge existing multipart uploads that may not have been
    completed/aborted correctly</description>
</property>

<property>
  <name>fs.s3a.multipart.purge.age</name>
  <value>86400</value>
  <description>Minimum age in seconds of multipart uploads to purge</description>
</property>

<property>
  <name>fs.s3a.buffer.dir</name>
  <value>${hadoop.tmp.dir}/s3a</value>
  <description>Comma separated list of directories that will be used to buffer file
    uploads to.</description>
</property>

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  <description>The implementation class of the S3A Filesystem</description>
</property>

<property>
  <name>io.seqfile.compress.blocksize</name>
  <value>1000000</value>

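The properties above give S3A its defaults; a client only needs to add credentials (or rely on role-based authentication) and point at a bucket. An illustrative usage sketch, with a placeholder bucket and placeholder keys:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3AListingExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Omit both keys to use role-based authentication, as the descriptions above note.
    conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");   // placeholder
    conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");   // placeholder
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    // List the bucket root through the ordinary FileSystem API.
    for (FileStatus status : fs.listStatus(new Path("/"))) {
      System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
    }
    fs.close();
  }
}
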
@@ -130,6 +130,10 @@
        <groupId>net.java.dev.jets3t</groupId>
        <artifactId>jets3t</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.eclipse.jdt</groupId>
        <artifactId>core</artifactId>

@@ -169,6 +173,10 @@
        <groupId>net.java.dev.jets3t</groupId>
        <artifactId>jets3t</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.eclipse.jdt</groupId>
        <artifactId>core</artifactId>

@@ -61,8 +61,9 @@
    <!-- jersey version -->
    <jersey.version>1.9</jersey.version>

    <!-- jackson version -->
    <!-- jackson versions -->
    <jackson.version>1.9.13</jackson.version>
    <jackson2.version>2.2.3</jackson2.version>

    <!-- ProtocolBuffer version, used to verify the protoc version and -->
    <!-- define the protobuf JAR version -->

@@ -581,13 +582,7 @@
      <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.2</version>
        <exclusions>
          <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
          </exclusion>
        </exclusions>
        <version>1.7.4</version>
      </dependency>
      <dependency>
        <groupId>org.apache.mina</groupId>

@@ -674,6 +669,21 @@
        <artifactId>jackson-xc</artifactId>
        <version>${jackson.version}</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>${jackson2.version}</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson2.version}</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>${jackson2.version}</version>
      </dependency>
      <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-all</artifactId>

@@ -100,6 +100,16 @@
      <type>test-jar</type>
    </dependency>

    <!-- see ../../hadoop-project/pom.xml for versions -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
    </dependency>

    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk</artifactId>

@@ -0,0 +1,37 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.auth.AWSCredentials;

public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
  public AWSCredentials getCredentials() {
    return new AnonymousAWSCredentials();
  }

  public void refresh() {}

  @Override
  public String toString() {
    return getClass().getSimpleName();
  }

}

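The anonymous provider above lets S3A read public buckets without credentials. A small illustrative fragment of how such a provider is handed to the AWS SDK client (the bucket name is a placeholder; S3AFileSystem.initialize() does the real wiring):

import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectListing;

AmazonS3Client s3 = new AmazonS3Client(
    new AnonymousAWSCredentialsProvider(), new ClientConfiguration());
ObjectListing listing = s3.listObjects("some-public-bucket");  // placeholder bucket
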
@@ -0,0 +1,51 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.AWSCredentials;

public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
  private String accessKey;
  private String secretKey;

  public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
    this.accessKey = accessKey;
    this.secretKey = secretKey;
  }

  public AWSCredentials getCredentials() {
    if (accessKey != null && secretKey != null) {
      return new BasicAWSCredentials(accessKey, secretKey);
    }

    throw new AmazonClientException(
        "Access key or secret key is null");
  }

  public void refresh() {}

  @Override
  public String toString() {
    return getClass().getSimpleName();
  }

}

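BasicAWSCredentialsProvider simply wraps the two configured strings and fails fast when either is missing. An illustrative fragment showing how the filesystem would typically build it from the keys defined in core-default.xml above (the values read here are placeholders):

String accessKey = conf.get("fs.s3a.access.key", null);
String secretKey = conf.get("fs.s3a.secret.key", null);
AWSCredentialsProvider credentials =
    new BasicAWSCredentialsProvider(accessKey, secretKey);
// getCredentials() throws AmazonClientException if either key is null.
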
@@ -0,0 +1,90 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;


public class Constants {
  // s3 access key
  public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId";
  public static final String NEW_ACCESS_KEY = "fs.s3a.access.key";

  // s3 secret key
  public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey";
  public static final String NEW_SECRET_KEY = "fs.s3a.secret.key";

  // number of simultaneous connections to s3
  public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections";
  public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
  public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;

  // connect to s3 over ssl?
  public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections";
  public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
  public static final boolean DEFAULT_SECURE_CONNECTIONS = true;

  // number of times we should retry errors
  public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries";
  public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
  public static final int DEFAULT_MAX_ERROR_RETRIES = 10;

  // seconds until we give up on a connection to s3
  public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout";
  public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
  public static final int DEFAULT_SOCKET_TIMEOUT = 50000;

  // number of records to get while paging through a directory listing
  public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys";
  public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
  public static final int DEFAULT_MAX_PAGING_KEYS = 5000;

  // size of each multipart piece in bytes
  public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize";
  public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size";
  public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB

  // minimum size in bytes before we start multipart uploads or copies
  public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize";
  public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
  public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;

  // comma separated list of directories
  public static final String BUFFER_DIR = "fs.s3a.buffer.dir";

  // private | public-read | public-read-write | authenticated-read |
  // log-delivery-write | bucket-owner-read | bucket-owner-full-control
  public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL";
  public static final String NEW_CANNED_ACL = "fs.s3a.acl.default";
  public static final String DEFAULT_CANNED_ACL = "";

  // should we try to purge old multipart uploads when starting up
  public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart";
  public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
  public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;

  // purge any multipart uploads older than this number of seconds
  public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge";
  public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
  public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;

  // s3 server-side encryption
  public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
      "fs.s3a.server-side-encryption-algorithm";

  public static final String S3N_FOLDER_SUFFIX = "_$folder$";
}

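Each tunable keeps an OLD_ (camelCase) and a NEW_ (dotted) property name so that pre-existing configurations keep working; callers read the new key first and fall back to the old one before applying the default. A small fragment illustrating that lookup idiom (the same pattern appears in S3AOutputStream below):

import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.s3a.Constants.*;

Configuration conf = new Configuration();
int maxKeys = conf.getInt(NEW_MAX_PAGING_KEYS,
    conf.getInt(OLD_MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS));
long partSize = conf.getLong(NEW_MULTIPART_SIZE,
    conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
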
@@ -0,0 +1,62 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

public class S3AFileStatus extends FileStatus {
  private boolean isEmptyDirectory;

  // Directories
  public S3AFileStatus(boolean isdir, boolean isemptydir, Path path) {
    super(0, isdir, 1, 0, 0, path);
    isEmptyDirectory = isemptydir;
  }

  // Files
  public S3AFileStatus(long length, long modification_time, Path path) {
    super(length, false, 1, 0, modification_time, path);
    isEmptyDirectory = false;
  }

  public boolean isEmptyDirectory() {
    return isEmptyDirectory;
  }

  /** Compare if this object is equal to another object
   * @param o the object to be compared.
   * @return true if two file status has the same path name; false if not.
   */
  @Override
  public boolean equals(Object o) {
    return super.equals(o);
  }

  /**
   * Returns a hash code value for the object, which is defined as
   * the hash code of the path name.
   *
   * @return a hash code value for the path name.
   */
  @Override
  public int hashCode() {
    return super.hashCode();
  }
}

File diff suppressed because it is too large

@@ -0,0 +1,207 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;

import org.slf4j.Logger;

import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.SocketException;

public class S3AInputStream extends FSInputStream {
  private long pos;
  private boolean closed;
  private S3ObjectInputStream wrappedStream;
  private S3Object wrappedObject;
  private FileSystem.Statistics stats;
  private AmazonS3Client client;
  private String bucket;
  private String key;
  private long contentLength;
  public static final Logger LOG = S3AFileSystem.LOG;


  public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client,
                        FileSystem.Statistics stats) {
    this.bucket = bucket;
    this.key = key;
    this.contentLength = contentLength;
    this.client = client;
    this.stats = stats;
    this.pos = 0;
    this.closed = false;
    this.wrappedObject = null;
    this.wrappedStream = null;
  }

  private void openIfNeeded() throws IOException {
    if (wrappedObject == null) {
      reopen(0);
    }
  }

  private synchronized void reopen(long pos) throws IOException {
    if (wrappedStream != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Aborting old stream to open at pos " + pos);
      }
      wrappedStream.abort();
    }

    if (pos < 0) {
      throw new EOFException("Trying to seek to a negative offset " + pos);
    }

    if (contentLength > 0 && pos > contentLength-1) {
      throw new EOFException("Trying to seek to an offset " + pos +
                             " past the end of the file");
    }

    LOG.info("Actually opening file " + key + " at pos " + pos);

    GetObjectRequest request = new GetObjectRequest(bucket, key);
    request.setRange(pos, contentLength-1);

    wrappedObject = client.getObject(request);
    wrappedStream = wrappedObject.getObjectContent();

    if (wrappedStream == null) {
      throw new IOException("Null IO stream");
    }

    this.pos = pos;
  }

  @Override
  public synchronized long getPos() throws IOException {
    return pos;
  }

  @Override
  public synchronized void seek(long pos) throws IOException {
    if (this.pos == pos) {
      return;
    }

    LOG.info("Reopening " + this.key + " to seek to new offset " + (pos - this.pos));
    reopen(pos);
  }

  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    return false;
  }

  @Override
  public synchronized int read() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }

    openIfNeeded();

    int byteRead;
    try {
      byteRead = wrappedStream.read();
    } catch (SocketTimeoutException e) {
      LOG.info("Got timeout while trying to read from stream, trying to recover " + e);
      reopen(pos);
      byteRead = wrappedStream.read();
    } catch (SocketException e) {
      LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
      reopen(pos);
      byteRead = wrappedStream.read();
    }

    if (byteRead >= 0) {
      pos++;
    }

    if (stats != null && byteRead >= 0) {
      stats.incrementBytesRead(1);
    }
    return byteRead;
  }

  @Override
  public synchronized int read(byte buf[], int off, int len) throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }

    openIfNeeded();

    int byteRead;
    try {
      byteRead = wrappedStream.read(buf, off, len);
    } catch (SocketTimeoutException e) {
      LOG.info("Got timeout while trying to read from stream, trying to recover " + e);
      reopen(pos);
      byteRead = wrappedStream.read(buf, off, len);
    } catch (SocketException e) {
      LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
      reopen(pos);
      byteRead = wrappedStream.read(buf, off, len);
    }

    if (byteRead > 0) {
      pos += byteRead;
    }

    if (stats != null && byteRead > 0) {
      stats.incrementBytesRead(byteRead);
    }

    return byteRead;
  }

  @Override
  public synchronized void close() throws IOException {
    super.close();
    closed = true;
    if (wrappedObject != null) {
      wrappedObject.close();
    }
  }

  @Override
  public synchronized int available() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }
    long remaining = this.contentLength - this.pos;
    if (remaining > Integer.MAX_VALUE) {
      return Integer.MAX_VALUE;
    }
    return (int)remaining;
  }

  @Override
  public boolean markSupported() {
    return false;
  }
}

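Reads go through the standard FSDataInputStream API; seek() does not buffer, it aborts the current ranged GET and reopen() issues a new request starting at the target offset. An illustrative read-and-seek fragment (bucket and path are placeholders):

FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
FSDataInputStream in = fs.open(new Path("/logs/part-00000"), 4096);
in.seek(1024);                    // triggers reopen() with a new Range request
byte[] buf = new byte[4096];
int bytesRead = in.read(buf, 0, buf.length);
in.close();
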
@@ -0,0 +1,208 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Progressable;

import org.slf4j.Logger;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import static org.apache.hadoop.fs.s3a.Constants.*;

public class S3AOutputStream extends OutputStream {
  private OutputStream backupStream;
  private File backupFile;
  private boolean closed;
  private String key;
  private String bucket;
  private AmazonS3Client client;
  private Progressable progress;
  private long partSize;
  private int partSizeThreshold;
  private S3AFileSystem fs;
  private CannedAccessControlList cannedACL;
  private FileSystem.Statistics statistics;
  private LocalDirAllocator lDirAlloc;
  private String serverSideEncryptionAlgorithm;

  public static final Logger LOG = S3AFileSystem.LOG;

  public S3AOutputStream(Configuration conf, AmazonS3Client client,
      S3AFileSystem fs, String bucket, String key, Progressable progress,
      CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
      String serverSideEncryptionAlgorithm)
      throws IOException {
    this.bucket = bucket;
    this.key = key;
    this.client = client;
    this.progress = progress;
    this.fs = fs;
    this.cannedACL = cannedACL;
    this.statistics = statistics;
    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;

    partSize = conf.getLong(NEW_MULTIPART_SIZE,
        conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
    partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD,
        conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));

    if (conf.get(BUFFER_DIR, null) != null) {
      lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
    } else {
      lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
    }

    backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
    closed = false;

    LOG.info("OutputStream for key '" + key + "' writing to tempfile: " + this.backupFile);

    this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
  }

  @Override
  public void flush() throws IOException {
    backupStream.flush();
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }

    backupStream.close();
    LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
    LOG.info("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);


    try {
      TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
      transferConfiguration.setMinimumUploadPartSize(partSize);
      transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);

      TransferManager transfers = new TransferManager(client);
      transfers.setConfiguration(transferConfiguration);

      final ObjectMetadata om = new ObjectMetadata();
      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
        om.setServerSideEncryption(serverSideEncryptionAlgorithm);
      }
      PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile);
      putObjectRequest.setCannedAcl(cannedACL);
      putObjectRequest.setMetadata(om);

      Upload upload = transfers.upload(putObjectRequest);

      ProgressableProgressListener listener =
          new ProgressableProgressListener(upload, progress, statistics);
      upload.addProgressListener(listener);

      upload.waitForUploadResult();

      long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred();
      if (statistics != null && delta != 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("S3A write delta changed after finished: " + delta + " bytes");
        }
        statistics.incrementBytesWritten(delta);
      }

      // This will delete unnecessary fake parent directories
      fs.finishedWrite(key);
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      if (!backupFile.delete()) {
        LOG.warn("Could not delete temporary s3a file: " + backupFile);
      }
      super.close();
      closed = true;
    }

    LOG.info("OutputStream for key '" + key + "' upload complete");
  }

  @Override
  public void write(int b) throws IOException {
    backupStream.write(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    backupStream.write(b, off, len);
  }

  public static class ProgressableProgressListener implements ProgressListener {
    private Progressable progress;
    private FileSystem.Statistics statistics;
    private long lastBytesTransferred;
    private Upload upload;

    public ProgressableProgressListener(Upload upload, Progressable progress,
        FileSystem.Statistics statistics) {
      this.upload = upload;
      this.progress = progress;
      this.statistics = statistics;
      this.lastBytesTransferred = 0;
    }

    public void progressChanged(ProgressEvent progressEvent) {
      if (progress != null) {
        progress.progress();
      }

      // There are 3 http ops here, but this should be close enough for now
      if (progressEvent.getEventCode() == ProgressEvent.PART_STARTED_EVENT_CODE ||
          progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
        statistics.incrementWriteOps(1);
      }

      long transferred = upload.getProgress().getBytesTransferred();
      long delta = transferred - lastBytesTransferred;
      if (statistics != null && delta != 0) {
        statistics.incrementBytesWritten(delta);
      }

      lastBytesTransferred = transferred;
    }

    public long getLastBytesTransferred() {
      return lastBytesTransferred;
    }
  }
}

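The output stream buffers the whole object to a local file under fs.s3a.buffer.dir and only uploads it, via the AWS TransferManager and possibly as a multipart upload, when close() is called. An illustrative write fragment (bucket and path are placeholders):

FSDataOutputStream out = fs.create(new Path("s3a://example-bucket/output/data.txt"));
out.write("hello, s3a".getBytes("UTF-8"));
out.close();   // the actual PUT or multipart upload happens here, then finishedWrite() removes fake parent dirs
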
@@ -15,3 +15,4 @@

org.apache.hadoop.fs.s3.S3FileSystem
org.apache.hadoop.fs.s3native.NativeS3FileSystem
org.apache.hadoop.fs.s3a.S3AFileSystem

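Registering S3AFileSystem in this service file lets the FileSystem class discover the s3a:// scheme from the classpath, alongside the explicit fs.s3a.impl mapping added to core-default.xml above. An illustrative check, with a placeholder bucket:

FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), new Configuration());
System.out.println(fs.getClass().getName());  // expected: org.apache.hadoop.fs.s3a.S3AFileSystem
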
@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;

/**
 * The contract of S3A: only enabled if the test bucket is provided
 */
public class S3AContract extends AbstractBondedFSContract {

  public static final String CONTRACT_XML = "contract/s3a.xml";


  public S3AContract(Configuration conf) {
    super(conf);
    //insert the base features
    addConfResource(CONTRACT_XML);
  }

  @Override
  public String getScheme() {
    return "s3a";
  }

}

@@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;

public class TestS3AContractCreate extends AbstractContractCreateTest {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }

  @Override
  public void testOverwriteEmptyDirectory() throws Throwable {
    ContractTestUtils.skip(
        "blobstores can't distinguish empty directories from files");
  }
}

@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;

public class TestS3AContractDelete extends AbstractContractDeleteTest {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }
}

@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;

/**
 * Test dir operations on S3
 */
public class TestS3AContractMkdir extends AbstractContractMkdirTest {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }
}

@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;

public class TestS3AContractOpen extends AbstractContractOpenTest {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }
}

@@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;

public class TestS3AContractRename extends AbstractContractRenameTest {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }

  @Override
  public void testRenameDirIntoExistingDir() throws Throwable {
    describe("Verify renaming a dir into an existing dir puts the files"
             +" from the source dir into the existing dir"
             +" and leaves existing files alone");
    FileSystem fs = getFileSystem();
    String sourceSubdir = "source";
    Path srcDir = path(sourceSubdir);
    Path srcFilePath = new Path(srcDir, "source-256.txt");
    byte[] srcDataset = dataset(256, 'a', 'z');
    writeDataset(fs, srcFilePath, srcDataset, srcDataset.length, 1024, false);
    Path destDir = path("dest");

    Path destFilePath = new Path(destDir, "dest-512.txt");
    byte[] destDateset = dataset(512, 'A', 'Z');
    writeDataset(fs, destFilePath, destDateset, destDateset.length, 1024, false);
    assertIsFile(destFilePath);

    boolean rename = fs.rename(srcDir, destDir);
    Path renamedSrcFilePath = new Path(destDir, "source-256.txt");
    assertIsFile(destFilePath);
    assertIsFile(renamedSrcFilePath);
    ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset);
    assertTrue("rename returned false though the contents were copied", rename);
  }
}

@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;

/**
 * root dir operations against an S3 bucket
 */
public class TestS3AContractRootDir extends
    AbstractContractRootDirectoryTest {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }
}

@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;

public class TestS3AContractSeek extends AbstractContractSeekTest {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }
}

@@ -0,0 +1,327 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import static org.junit.Assume.*;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.UUID;

/**
 * Tests a live S3 system. If your keys and bucket aren't specified, all tests
 * are marked as passed.
 *
 * This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest extends
 * TestCase, which uses the old JUnit 3 runner that doesn't ignore assumptions
 * properly, making it impossible to skip the tests if we don't have a valid
 * bucket.
 **/
public class S3AFileSystemContractBaseTest extends FileSystemContractBaseTest {
  private static final int TEST_BUFFER_SIZE = 128;
  private static final int MODULUS = 128;

  protected static final Logger LOG = LoggerFactory.getLogger(S3AFileSystemContractBaseTest.class);

  @Override
  public void setUp() throws Exception {
    Configuration conf = new Configuration();

    URI testURI = URI.create(conf.get("test.fs.s3a.name"));

    boolean liveTest = testURI != null && !testURI.equals("s3a:///");

    // This doesn't work with our JUnit 3 style test cases, so instead we'll
    // make this whole class not run by default
    assumeTrue(liveTest);

    fs = new S3AFileSystem();
    fs.initialize(testURI, conf);
    super.setUp();
  }

  @Override
  protected void tearDown() throws Exception {
    if (fs != null) {
      fs.delete(path("/tests3a"), true);
    }
    super.tearDown();
  }

  @Test(timeout = 10000)
  public void testMkdirs() throws IOException {
    // No trailing slash
    assertTrue(fs.mkdirs(path("/tests3a/a")));
    assertTrue(fs.exists(path("/tests3a/a")));

    // With trailing slash
    assertTrue(fs.mkdirs(path("/tests3a/b/")));
    assertTrue(fs.exists(path("/tests3a/b/")));

    // Two levels deep
    assertTrue(fs.mkdirs(path("/tests3a/c/a/")));
    assertTrue(fs.exists(path("/tests3a/c/a/")));

    // Mismatched slashes
    assertTrue(fs.exists(path("/tests3a/c/a")));
  }


  @Test(timeout=20000)
  public void testDelete() throws IOException {
    // Test deleting an empty directory
    assertTrue(fs.mkdirs(path("/tests3a/d")));
    assertTrue(fs.delete(path("/tests3a/d"), true));
    assertFalse(fs.exists(path("/tests3a/d")));

    // Test deleting a deep empty directory
    assertTrue(fs.mkdirs(path("/tests3a/e/f/g/h")));
    assertTrue(fs.delete(path("/tests3a/e/f/g"), true));
    assertFalse(fs.exists(path("/tests3a/e/f/g/h")));
    assertFalse(fs.exists(path("/tests3a/e/f/g")));
    assertTrue(fs.exists(path("/tests3a/e/f")));

    // Test delete of just a file
    writeFile(path("/tests3a/f/f/file"), 1000);
    assertTrue(fs.exists(path("/tests3a/f/f/file")));
    assertTrue(fs.delete(path("/tests3a/f/f/file"), false));
    assertFalse(fs.exists(path("/tests3a/f/f/file")));


    // Test delete of a path with files in various directories
    writeFile(path("/tests3a/g/h/i/file"), 1000);
    assertTrue(fs.exists(path("/tests3a/g/h/i/file")));
    writeFile(path("/tests3a/g/h/j/file"), 1000);
    assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
    try {
      assertFalse(fs.delete(path("/tests3a/g/h"), false));
      fail("Expected delete to fail with recursion turned off");
    } catch (IOException e) {}
    assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
    assertTrue(fs.delete(path("/tests3a/g/h"), true));
    assertFalse(fs.exists(path("/tests3a/g/h/j")));
  }


  @Test(timeout = 3600000)
  public void testOpenCreate() throws IOException {
    try {
      createAndReadFileTest(1024);
    } catch (IOException e) {
      fail(e.getMessage());
    }

    try {
      createAndReadFileTest(5 * 1024 * 1024);
    } catch (IOException e) {
      fail(e.getMessage());
    }

    try {
      createAndReadFileTest(20 * 1024 * 1024);
    } catch (IOException e) {
      fail(e.getMessage());
    }

    /*
    Enable to test the multipart upload
    try {
      createAndReadFileTest((long)6 * 1024 * 1024 * 1024);
    } catch (IOException e) {
      fail(e.getMessage());
    }
    */
  }

  @Test(timeout = 1200000)
  public void testRenameFile() throws IOException {
    Path srcPath = path("/tests3a/a/srcfile");

    final OutputStream outputStream = fs.create(srcPath, false);
    generateTestData(outputStream, 11 * 1024 * 1024);
    outputStream.close();

    assertTrue(fs.exists(srcPath));

    Path dstPath = path("/tests3a/b/dstfile");

    assertFalse(fs.rename(srcPath, dstPath));
    assertTrue(fs.mkdirs(dstPath.getParent()));
    assertTrue(fs.rename(srcPath, dstPath));
    assertTrue(fs.exists(dstPath));
    assertFalse(fs.exists(srcPath));
    assertTrue(fs.exists(srcPath.getParent()));
  }


  @Test(timeout = 10000)
  public void testRenameDirectory() throws IOException {
    Path srcPath = path("/tests3a/a");

    assertTrue(fs.mkdirs(srcPath));
    writeFile(new Path(srcPath, "b/testfile"), 1024);

    Path nonEmptyPath = path("/tests3a/nonempty");
    writeFile(new Path(nonEmptyPath, "b/testfile"), 1024);

    assertFalse(fs.rename(srcPath, nonEmptyPath));

    Path dstPath = path("/tests3a/b");
    assertTrue(fs.rename(srcPath, dstPath));
    assertFalse(fs.exists(srcPath));
    assertTrue(fs.exists(new Path(dstPath, "b/testfile")));
  }


  @Test(timeout=10000)
  public void testSeek() throws IOException {
    Path path = path("/tests3a/testfile.seek");
    writeFile(path, TEST_BUFFER_SIZE * 10);


    FSDataInputStream inputStream = fs.open(path, TEST_BUFFER_SIZE);
    inputStream.seek(inputStream.getPos() + MODULUS);

    testReceivedData(inputStream, TEST_BUFFER_SIZE * 10 - MODULUS);
  }

  /**
   * Creates and reads a file with the given size in S3. The test file is
   * generated according to a specific pattern.
   * During the read phase the incoming data stream is also checked against this pattern.
   *
   * @param fileSize
   *          the size of the file to be generated in bytes
   * @throws IOException
   *           thrown if an I/O error occurs while writing or reading the test file
   */
  private void createAndReadFileTest(final long fileSize) throws IOException {
    final String objectName = UUID.randomUUID().toString();
    final Path objectPath = new Path("/tests3a/", objectName);

    // Write test file to S3
    final OutputStream outputStream = fs.create(objectPath, false);
    generateTestData(outputStream, fileSize);
    outputStream.close();

    // Now read the same file back from S3
    final InputStream inputStream = fs.open(objectPath);
    testReceivedData(inputStream, fileSize);
    inputStream.close();

    // Delete test file
    fs.delete(objectPath, false);
  }


  /**
   * Receives test data from the given input stream and checks the size of the
   * data as well as the pattern inside the received data.
   *
   * @param inputStream
   *          the input stream to read the test data from
   * @param expectedSize
   *          the expected size of the data to be read from the input stream in bytes
   * @throws IOException
   *           thrown if an error occurs while reading the data
   */
  private void testReceivedData(final InputStream inputStream,
      final long expectedSize) throws IOException {
    final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];

    long totalBytesRead = 0;
    int nextExpectedNumber = 0;
    while (true) {
      final int bytesRead = inputStream.read(testBuffer);
      if (bytesRead < 0) {
        break;
      }

      totalBytesRead += bytesRead;

      for (int i = 0; i < bytesRead; ++i) {
        if (testBuffer[i] != nextExpectedNumber) {
          throw new IOException("Read number " + testBuffer[i] + " but expected "
              + nextExpectedNumber);
        }

        ++nextExpectedNumber;

        if (nextExpectedNumber == MODULUS) {
          nextExpectedNumber = 0;
        }
      }
    }

    if (totalBytesRead != expectedSize) {
      throw new IOException("Expected to read " + expectedSize +
          " bytes but only received " + totalBytesRead);
    }
  }


  /**
   * Generates test data of the given size according to some specific pattern
   * and writes it to the provided output stream.
   *
   * @param outputStream
   *          the output stream to write the data to
   * @param size
   *          the size of the test data to be generated in bytes
   * @throws IOException
   *           thrown if an error occurs while writing the data
   */
  private void generateTestData(final OutputStream outputStream,
      final long size) throws IOException {

    final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
    for (int i = 0; i < testBuffer.length; ++i) {
      testBuffer[i] = (byte) (i % MODULUS);
    }

    long bytesWritten = 0;
    while (bytesWritten < size) {

      final long diff = size - bytesWritten;
      if (diff < testBuffer.length) {
        outputStream.write(testBuffer, 0, (int)diff);
        bytesWritten += diff;
      } else {
        outputStream.write(testBuffer);
        bytesWritten += testBuffer.length;
      }
    }
  }

  private void writeFile(Path name, int fileSize) throws IOException {
    final OutputStream outputStream = fs.create(name, false);
    generateTestData(outputStream, fileSize);
    outputStream.close();
  }
}

@@ -0,0 +1,105 @@
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one
  ~ or more contributor license agreements. See the NOTICE file
  ~ distributed with this work for additional information
  ~ regarding copyright ownership. The ASF licenses this file
  ~ to you under the Apache License, Version 2.0 (the
  ~ "License"); you may not use this file except in compliance
  ~ with the License. You may obtain a copy of the License at
  ~
  ~     http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<configuration>
  <!--
   S3A is a blobstore, with very different behavior than a
   classic filesystem.
  -->

  <property>
    <name>fs.contract.test.root-tests-enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.test.random-seek-count</name>
    <value>10</value>
  </property>

  <property>
    <name>fs.contract.is-blobstore</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.is-case-sensitive</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.rename-returns-false-if-source-missing</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.supports-append</name>
    <value>false</value>
  </property>

  <property>
    <name>fs.contract.supports-atomic-directory-delete</name>
    <value>false</value>
  </property>

  <property>
    <name>fs.contract.supports-atomic-rename</name>
    <value>false</value>
  </property>

  <property>
    <name>fs.contract.supports-block-locality</name>
    <value>false</value>
  </property>

  <property>
    <name>fs.contract.supports-concat</name>
    <value>false</value>
  </property>

  <property>
    <name>fs.contract.supports-seek</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.supports-seek-on-closed-file</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.rejects-seek-past-eof</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.supports-strict-exceptions</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.supports-unix-permissions</name>
    <value>false</value>
  </property>

  <property>
    <name>fs.contract.rename-overwrites-dest</name>
    <value>true</value>
  </property>

</configuration>

@@ -77,6 +77,11 @@
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.supports-seek-on-closed-file</name>
    <value>true</value>
  </property>

  <property>
    <name>fs.contract.rejects-seek-past-eof</name>
    <value>true</value>

@@ -132,14 +132,8 @@
    </dependency>

    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-asl</artifactId>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-core-asl</artifactId>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <scope>compile</scope>
    </dependency>