Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-15 15:30:19 -07:00
commit a7814e1aa4
39 changed files with 2791 additions and 69 deletions

View File

@ -342,6 +342,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-10893. isolated classloader on the client side (Sangjin Lee via
jlowe)
HADOOP-10400. Incorporate new S3A FileSystem implementation. (Jordan
Mendelson and Dave Wang via atm)
IMPROVEMENTS
HADOOP-10808. Remove unused native code for munlock. (cnauroth)
@ -705,6 +708,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-11056. OsSecureRandom.setConf() might leak file descriptors (yzhang
via cmccabe)
HDFS-6912. SharedFileDescriptorFactory should not allocate sparse files
(cmccabe)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HADOOP-10734. Implement high-performance secure random number sources.

View File

@ -214,4 +214,13 @@
A special value of "*" means all users are allowed.</description>
</property>
<property>
<name>security.applicationhistory.protocol.acl</name>
<value>*</value>
<description>ACL for ApplicationHistoryProtocol, used by the timeline
server and the generic history service client to communicate with each other.
The ACL is a comma-separated list of user and group names. The user and
group list is separated by a blank. For e.g. "alice,bob users,wheel".
A special value of "*" means all users are allowed.</description>
</property>
</configuration>

View File

@ -174,6 +174,11 @@ log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}
# Jets3t library
log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
# AWS SDK & S3A FileSystem
log4j.logger.com.amazonaws=ERROR
log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN
#
# Event Counter Appender
# Sends counts of logging messages at different severity levels to Hadoop Metrics.

View File

@ -143,9 +143,11 @@ public abstract class HAAdmin extends Configured implements Tool {
}
/* returns true if other target node is active or some exception occurred
and forceActive was not set */
if(!cmd.hasOption(FORCEACTIVE)) {
if(isOtherTargetNodeActive(argv[0], cmd.hasOption(FORCEACTIVE))) {
return -1;
}
}
HAServiceTarget target = resolveTarget(argv[0]);
if (!checkManualStateManagementOK(target)) {
return -1;

View File

@ -37,6 +37,8 @@
#include <sys/types.h>
#include <unistd.h>
#define ZERO_FULLY_BUF_SIZE 8192
static pthread_mutex_t g_rand_lock = PTHREAD_MUTEX_INITIALIZER;
JNIEXPORT void JNICALL
@ -83,6 +85,24 @@ done:
}
}
static int zero_fully(int fd, jint length)
{
char buf[ZERO_FULLY_BUF_SIZE];
int res;
memset(buf, 0, sizeof(buf));
while (length > 0) {
res = write(fd, buf,
(length > ZERO_FULLY_BUF_SIZE) ? ZERO_FULLY_BUF_SIZE : length);
if (res < 0) {
if (errno == EINTR) continue;
return errno;
}
length -= res;
}
return 0;
}
JNIEXPORT jobject JNICALL
Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_createDescriptor0(
JNIEnv *env, jclass clazz, jstring jprefix, jstring jpath, jint length)
@ -136,12 +156,20 @@ Java_org_apache_hadoop_io_nativeio_SharedFileDescriptorFactory_createDescriptor0
(*env)->Throw(env, jthr);
goto done;
}
if (ftruncate(fd, length) < 0) {
jthr = newIOException(env, "ftruncate(%s, %d) failed: error %d (%s)",
ret = zero_fully(fd, length);
if (ret) {
jthr = newIOException(env, "zero_fully(%s, %d) failed: error %d (%s)",
path, length, ret, terror(ret));
(*env)->Throw(env, jthr);
goto done;
}
if (lseek(fd, 0, SEEK_SET) < 0) {
ret = errno;
jthr = newIOException(env, "lseek(%s, 0, SEEK_SET) failed: error %d (%s)",
path, ret, terror(ret));
(*env)->Throw(env, jthr);
goto done;
}
jret = fd_create(env, fd); // throws exception on error.
done:

View File

@ -689,6 +689,92 @@ for ldap providers in the same way as above does.
</description>
</property>
<property>
<name>fs.s3a.access.key</name>
<description>AWS access key ID. Omit for Role-based authentication.</description>
</property>
<property>
<name>fs.s3a.secret.key</name>
<description>AWS secret key. Omit for Role-based authentication.</description>
</property>
<property>
<name>fs.s3a.connection.maximum</name>
<value>15</value>
<description>Controls the maximum number of simultaneous connections to S3.</description>
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>true</value>
<description>Enables or disables SSL connections to S3.</description>
</property>
<property>
<name>fs.s3a.attempts.maximum</name>
<value>10</value>
<description>How many times we should retry commands on transient errors.</description>
</property>
<property>
<name>fs.s3a.connection.timeout</name>
<value>5000</value>
<description>Socket connection timeout in seconds.</description>
</property>
<property>
<name>fs.s3a.paging.maximum</name>
<value>5000</value>
<description>How many keys to request from S3 when doing
directory listings at a time.</description>
</property>
<property>
<name>fs.s3a.multipart.size</name>
<value>104857600</value>
<description>How big (in bytes) to split upload or copy operations up into.</description>
</property>
<property>
<name>fs.s3a.multipart.threshold</name>
<value>2147483647</value>
<description>Threshold before uploads or copies use parallel multipart operations.</description>
</property>
<property>
<name>fs.s3a.acl.default</name>
<description>Set a canned ACL for newly created and copied objects. Value may be private,
public-read, public-read-write, authenticated-read, log-delivery-write,
bucket-owner-read, or bucket-owner-full-control.</description>
</property>
<property>
<name>fs.s3a.multipart.purge</name>
<value>false</value>
<description>True if you want to purge existing multipart uploads that may not have been
completed/aborted correctly</description>
</property>
<property>
<name>fs.s3a.multipart.purge.age</name>
<value>86400</value>
<description>Minimum age in seconds of multipart uploads to purge</description>
</property>
<property>
<name>fs.s3a.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3a</value>
<description>Comma separated list of directories that will be used to buffer file
uploads to.</description>
</property>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
<description>The implementation class of the S3A Filesystem</description>
</property>
<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>

View File

@ -130,6 +130,10 @@
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
@ -169,6 +173,10 @@
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>

View File

@ -455,6 +455,12 @@ Release 2.6.0 - UNRELEASED
HDFS-6482. Use block ID-based block layout on datanodes (James Thomas via
Colin Patrick McCabe)
HDFS-7061. Add test to verify encryption zone creation after NameNode
restart without saving namespace. (Stephen Chu via wang)
HDFS-7059. HAadmin transtionToActive with forceActive option can show
confusing message.
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -644,6 +650,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7045. Fix NameNode deadlock when opening file under /.reserved path.
(Yi Liu via wang)
HDFS-7032. Add WebHDFS support for reading and writing to encryption zones.
(clamb via wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -231,11 +231,13 @@ public class DatanodeWebHdfsMethods {
DFSClient dfsclient = newDfsClient(nnId, conf);
FSDataOutputStream out = null;
try {
out = new FSDataOutputStream(dfsclient.create(
out = dfsclient.createWrappedOutputStream(dfsclient.create(
fullpath, permission.getFsPermission(),
overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE),
replication.getValue(conf), blockSize.getValue(conf), null, b, null), null);
overwrite.getValue() ?
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) :
EnumSet.of(CreateFlag.CREATE),
replication.getValue(conf), blockSize.getValue(conf), null,
b, null), null);
IOUtils.copyBytes(in, out, b);
out.close();
out = null;
@ -418,7 +420,8 @@ public class DatanodeWebHdfsMethods {
final DFSClient dfsclient = newDfsClient(nnId, conf);
HdfsDataInputStream in = null;
try {
in = new HdfsDataInputStream(dfsclient.open(fullpath, b, true));
in = dfsclient.createWrappedInputStream(
dfsclient.open(fullpath, b, true));
in.seek(offset.getValue());
} catch(IOException ioe) {
IOUtils.cleanup(LOG, in);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSTestWrapper;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestWrapper;
@ -62,6 +63,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@ -338,6 +341,16 @@ public class TestEncryptionZones {
cluster.restartNameNode(true);
assertNumZones(numZones);
assertZonePresent(null, zone1.toString());
// Verify newly added ez is present after restarting the NameNode
// without persisting the namespace.
Path nonpersistZone = new Path("/nonpersistZone");
fsWrapper.mkdir(nonpersistZone, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(nonpersistZone, TEST_KEY);
numZones++;
cluster.restartNameNode(true);
assertNumZones(numZones);
assertZonePresent(null, nonpersistZone.toString());
}
/**
@ -560,6 +573,55 @@ public class TestEncryptionZones {
verifyFilesEqual(fs, encFile1, encFile2, len);
}
@Test(timeout = 120000)
public void testReadWriteUsingWebHdfs() throws Exception {
final HdfsAdmin dfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
final FileSystem webHdfsFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsFileSystem.SCHEME);
final Path zone = new Path("/zone");
fs.mkdirs(zone);
dfsAdmin.createEncryptionZone(zone, TEST_KEY);
/* Create an unencrypted file for comparison purposes. */
final Path unencFile = new Path("/unenc");
final int len = 8192;
DFSTestUtil.createFile(webHdfsFs, unencFile, len, (short) 1, 0xFEED);
/*
* Create the same file via webhdfs, but this time encrypted. Compare it
* using both webhdfs and DFS.
*/
final Path encFile1 = new Path(zone, "myfile");
DFSTestUtil.createFile(webHdfsFs, encFile1, len, (short) 1, 0xFEED);
verifyFilesEqual(webHdfsFs, unencFile, encFile1, len);
verifyFilesEqual(fs, unencFile, encFile1, len);
/*
* Same thing except this time create the encrypted file using DFS.
*/
final Path encFile2 = new Path(zone, "myfile2");
DFSTestUtil.createFile(fs, encFile2, len, (short) 1, 0xFEED);
verifyFilesEqual(webHdfsFs, unencFile, encFile2, len);
verifyFilesEqual(fs, unencFile, encFile2, len);
/* Verify appending to files works correctly. */
appendOneByte(fs, unencFile);
appendOneByte(webHdfsFs, encFile1);
appendOneByte(fs, encFile2);
verifyFilesEqual(webHdfsFs, unencFile, encFile1, len);
verifyFilesEqual(fs, unencFile, encFile1, len);
verifyFilesEqual(webHdfsFs, unencFile, encFile2, len);
verifyFilesEqual(fs, unencFile, encFile2, len);
}
private void appendOneByte(FileSystem fs, Path p) throws IOException {
final FSDataOutputStream out = fs.append(p);
out.write((byte) 0x123);
out.close();
}
@Test(timeout = 60000)
public void testCipherSuiteNegotiation() throws Exception {
final HdfsAdmin dfsAdmin =

View File

@ -232,26 +232,6 @@ public class TestDFSHAAdminMiniCluster {
assertFalse("Both namenodes cannot be active", nn1.isActiveState()
&& nn2.isActiveState());
/* This test case doesn't allow nn2 to transition to Active even with
forceActive switch since nn1 is already active */
if(nn1.getState() != null && !nn1.getState().
equals(HAServiceState.STANDBY.name()) ) {
cluster.transitionToStandby(0);
}
if(nn2.getState() != null && !nn2.getState().
equals(HAServiceState.STANDBY.name()) ) {
cluster.transitionToStandby(1);
}
//Making sure both the namenode are in standby state
assertTrue(nn1.isStandbyState());
assertTrue(nn2.isStandbyState());
runTool("-transitionToActive", "nn1");
runTool("-transitionToActive", "nn2","--forceactive");
assertFalse("Both namenodes cannot be active even though with forceActive",
nn1.isActiveState() && nn2.isActiveState());
/* In this test case, we have deliberately shut down nn1 and this will
cause HAAAdmin#isOtherTargetNodeActive to throw an Exception
and transitionToActive for nn2 with forceActive switch will succeed

View File

@ -61,8 +61,9 @@
<!-- jersey version -->
<jersey.version>1.9</jersey.version>
<!-- jackson version -->
<!-- jackson versions -->
<jackson.version>1.9.13</jackson.version>
<jackson2.version>2.2.3</jackson2.version>
<!-- ProtocolBuffer version, used to verify the protoc version and -->
<!-- define the protobuf JAR version -->
@ -581,13 +582,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.7.2</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
@ -674,6 +669,21 @@
<artifactId>jackson-xc</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson2.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson2.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson2.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>

View File

@ -100,6 +100,16 @@
<type>test-jar</type>
</dependency>
<!-- see ../../hadoop-project/pom.xml for versions -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.auth.AWSCredentials;
public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
public AWSCredentials getCredentials() {
return new AnonymousAWSCredentials();
}
public void refresh() {}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.AWSCredentials;
public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
private String accessKey;
private String secretKey;
public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
this.accessKey = accessKey;
this.secretKey = secretKey;
}
public AWSCredentials getCredentials() {
if (accessKey != null && secretKey != null) {
return new BasicAWSCredentials(accessKey, secretKey);
}
throw new AmazonClientException(
"Access key or secret key is null");
}
public void refresh() {}
@Override
public String toString() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
public class Constants {
// s3 access key
public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId";
public static final String NEW_ACCESS_KEY = "fs.s3a.access.key";
// s3 secret key
public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey";
public static final String NEW_SECRET_KEY = "fs.s3a.secret.key";
// number of simultaneous connections to s3
public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections";
public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
// connect to s3 over ssl?
public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections";
public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
// number of times we should retry errors
public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries";
public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
// seconds until we give up on a connection to s3
public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout";
public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
public static final int DEFAULT_SOCKET_TIMEOUT = 50000;
// number of records to get while paging through a directory listing
public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys";
public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
// size of each of or multipart pieces in bytes
public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize";
public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size";
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
// minimum size in bytes before we start a multipart uploads or copy
public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize";
public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
// comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
// private | public-read | public-read-write | authenticated-read |
// log-delivery-write | bucket-owner-read | bucket-owner-full-control
public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL";
public static final String NEW_CANNED_ACL = "fs.s3a.acl.default";
public static final String DEFAULT_CANNED_ACL = "";
// should we try to purge old multipart uploads when starting up
public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart";
public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
// purge any multipart uploads older than this number of seconds
public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge";
public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
// s3 server-side encryption
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";
public static final String S3N_FOLDER_SUFFIX = "_$folder$";
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
public class S3AFileStatus extends FileStatus {
private boolean isEmptyDirectory;
// Directories
public S3AFileStatus(boolean isdir, boolean isemptydir, Path path) {
super(0, isdir, 1, 0, 0, path);
isEmptyDirectory = isemptydir;
}
// Files
public S3AFileStatus(long length, long modification_time, Path path) {
super(length, false, 1, 0, modification_time, path);
isEmptyDirectory = false;
}
public boolean isEmptyDirectory() {
return isEmptyDirectory;
}
/** Compare if this object is equal to another object
* @param o the object to be compared.
* @return true if two file status has the same path name; false if not.
*/
@Override
public boolean equals(Object o) {
return super.equals(o);
}
/**
* Returns a hash code value for the object, which is defined as
* the hash code of the path name.
*
* @return a hash code value for the path name.
*/
@Override
public int hashCode() {
return super.hashCode();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.SocketException;
public class S3AInputStream extends FSInputStream {
private long pos;
private boolean closed;
private S3ObjectInputStream wrappedStream;
private S3Object wrappedObject;
private FileSystem.Statistics stats;
private AmazonS3Client client;
private String bucket;
private String key;
private long contentLength;
public static final Logger LOG = S3AFileSystem.LOG;
public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client,
FileSystem.Statistics stats) {
this.bucket = bucket;
this.key = key;
this.contentLength = contentLength;
this.client = client;
this.stats = stats;
this.pos = 0;
this.closed = false;
this.wrappedObject = null;
this.wrappedStream = null;
}
private void openIfNeeded() throws IOException {
if (wrappedObject == null) {
reopen(0);
}
}
private synchronized void reopen(long pos) throws IOException {
if (wrappedStream != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
}
wrappedStream.abort();
}
if (pos < 0) {
throw new EOFException("Trying to seek to a negative offset " + pos);
}
if (contentLength > 0 && pos > contentLength-1) {
throw new EOFException("Trying to seek to an offset " + pos +
" past the end of the file");
}
LOG.info("Actually opening file " + key + " at pos " + pos);
GetObjectRequest request = new GetObjectRequest(bucket, key);
request.setRange(pos, contentLength-1);
wrappedObject = client.getObject(request);
wrappedStream = wrappedObject.getObjectContent();
if (wrappedStream == null) {
throw new IOException("Null IO stream");
}
this.pos = pos;
}
@Override
public synchronized long getPos() throws IOException {
return pos;
}
@Override
public synchronized void seek(long pos) throws IOException {
if (this.pos == pos) {
return;
}
LOG.info("Reopening " + this.key + " to seek to new offset " + (pos - this.pos));
reopen(pos);
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
openIfNeeded();
int byteRead;
try {
byteRead = wrappedStream.read();
} catch (SocketTimeoutException e) {
LOG.info("Got timeout while trying to read from stream, trying to recover " + e);
reopen(pos);
byteRead = wrappedStream.read();
} catch (SocketException e) {
LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
reopen(pos);
byteRead = wrappedStream.read();
}
if (byteRead >= 0) {
pos++;
}
if (stats != null && byteRead >= 0) {
stats.incrementBytesRead(1);
}
return byteRead;
}
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
openIfNeeded();
int byteRead;
try {
byteRead = wrappedStream.read(buf, off, len);
} catch (SocketTimeoutException e) {
LOG.info("Got timeout while trying to read from stream, trying to recover " + e);
reopen(pos);
byteRead = wrappedStream.read(buf, off, len);
} catch (SocketException e) {
LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
reopen(pos);
byteRead = wrappedStream.read(buf, off, len);
}
if (byteRead > 0) {
pos += byteRead;
}
if (stats != null && byteRead > 0) {
stats.incrementBytesRead(byteRead);
}
return byteRead;
}
@Override
public synchronized void close() throws IOException {
super.close();
closed = true;
if (wrappedObject != null) {
wrappedObject.close();
}
}
@Override
public synchronized int available() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
long remaining = this.contentLength - this.pos;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int)remaining;
}
@Override
public boolean markSupported() {
return false;
}
}

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import static org.apache.hadoop.fs.s3a.Constants.*;
public class S3AOutputStream extends OutputStream {
private OutputStream backupStream;
private File backupFile;
private boolean closed;
private String key;
private String bucket;
private AmazonS3Client client;
private Progressable progress;
private long partSize;
private int partSizeThreshold;
private S3AFileSystem fs;
private CannedAccessControlList cannedACL;
private FileSystem.Statistics statistics;
private LocalDirAllocator lDirAlloc;
private String serverSideEncryptionAlgorithm;
public static final Logger LOG = S3AFileSystem.LOG;
public S3AOutputStream(Configuration conf, AmazonS3Client client,
S3AFileSystem fs, String bucket, String key, Progressable progress,
CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
String serverSideEncryptionAlgorithm)
throws IOException {
this.bucket = bucket;
this.key = key;
this.client = client;
this.progress = progress;
this.fs = fs;
this.cannedACL = cannedACL;
this.statistics = statistics;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
partSize = conf.getLong(NEW_MULTIPART_SIZE,
conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD,
conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
if (conf.get(BUFFER_DIR, null) != null) {
lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
} else {
lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
}
backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
closed = false;
LOG.info("OutputStream for key '" + key + "' writing to tempfile: " + this.backupFile);
this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
}
@Override
public void flush() throws IOException {
backupStream.flush();
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
backupStream.close();
LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
LOG.info("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);
try {
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
TransferManager transfers = new TransferManager(client);
transfers.setConfiguration(transferConfiguration);
final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(om);
Upload upload = transfers.upload(putObjectRequest);
ProgressableProgressListener listener =
new ProgressableProgressListener(upload, progress, statistics);
upload.addProgressListener(listener);
upload.waitForUploadResult();
long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred();
if (statistics != null && delta != 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("S3A write delta changed after finished: " + delta + " bytes");
}
statistics.incrementBytesWritten(delta);
}
// This will delete unnecessary fake parent directories
fs.finishedWrite(key);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
if (!backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: " + backupFile);
}
super.close();
closed = true;
}
LOG.info("OutputStream for key '" + key + "' upload complete");
}
@Override
public void write(int b) throws IOException {
backupStream.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
backupStream.write(b, off, len);
}
public static class ProgressableProgressListener implements ProgressListener {
private Progressable progress;
private FileSystem.Statistics statistics;
private long lastBytesTransferred;
private Upload upload;
public ProgressableProgressListener(Upload upload, Progressable progress,
FileSystem.Statistics statistics) {
this.upload = upload;
this.progress = progress;
this.statistics = statistics;
this.lastBytesTransferred = 0;
}
public void progressChanged(ProgressEvent progressEvent) {
if (progress != null) {
progress.progress();
}
// There are 3 http ops here, but this should be close enough for now
if (progressEvent.getEventCode() == ProgressEvent.PART_STARTED_EVENT_CODE ||
progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
statistics.incrementWriteOps(1);
}
long transferred = upload.getProgress().getBytesTransferred();
long delta = transferred - lastBytesTransferred;
if (statistics != null && delta != 0) {
statistics.incrementBytesWritten(delta);
}
lastBytesTransferred = transferred;
}
public long getLastBytesTransferred() {
return lastBytesTransferred;
}
}
}

View File

@ -15,3 +15,4 @@
org.apache.hadoop.fs.s3.S3FileSystem
org.apache.hadoop.fs.s3native.NativeS3FileSystem
org.apache.hadoop.fs.s3a.S3AFileSystem

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
/**
* The contract of S3A: only enabled if the test bucket is provided
*/
public class S3AContract extends AbstractBondedFSContract {
public static final String CONTRACT_XML = "contract/s3a.xml";
public S3AContract(Configuration conf) {
super(conf);
//insert the base features
addConfResource(CONTRACT_XML);
}
@Override
public String getScheme() {
return "s3a";
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
public class TestS3AContractCreate extends AbstractContractCreateTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Override
public void testOverwriteEmptyDirectory() throws Throwable {
ContractTestUtils.skip(
"blobstores can't distinguish empty directories from files");
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestS3AContractDelete extends AbstractContractDeleteTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Test dir operations on S3
*/
public class TestS3AContractMkdir extends AbstractContractMkdirTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestS3AContractOpen extends AbstractContractOpenTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
public class TestS3AContractRename extends AbstractContractRenameTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Override
public void testRenameDirIntoExistingDir() throws Throwable {
describe("Verify renaming a dir into an existing dir puts the files"
+" from the source dir into the existing dir"
+" and leaves existing files alone");
FileSystem fs = getFileSystem();
String sourceSubdir = "source";
Path srcDir = path(sourceSubdir);
Path srcFilePath = new Path(srcDir, "source-256.txt");
byte[] srcDataset = dataset(256, 'a', 'z');
writeDataset(fs, srcFilePath, srcDataset, srcDataset.length, 1024, false);
Path destDir = path("dest");
Path destFilePath = new Path(destDir, "dest-512.txt");
byte[] destDateset = dataset(512, 'A', 'Z');
writeDataset(fs, destFilePath, destDateset, destDateset.length, 1024, false);
assertIsFile(destFilePath);
boolean rename = fs.rename(srcDir, destDir);
Path renamedSrcFilePath = new Path(destDir, "source-256.txt");
assertIsFile(destFilePath);
assertIsFile(renamedSrcFilePath);
ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset);
assertTrue("rename returned false though the contents were copied", rename);
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* root dir operations against an S3 bucket
*/
public class TestS3AContractRootDir extends
AbstractContractRootDirectoryTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestS3AContractSeek extends AbstractContractSeekTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
}

View File

@ -0,0 +1,327 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.junit.Assume.*;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.UUID;
/**
* Tests a live S3 system. If you keys and bucket aren't specified, all tests
* are marked as passed
*
* This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest from
* TestCase which uses the old Junit3 runner that doesn't ignore assumptions
* properly making it impossible to skip the tests if we don't have a valid
* bucket.
**/
public class S3AFileSystemContractBaseTest extends FileSystemContractBaseTest {
private static final int TEST_BUFFER_SIZE = 128;
private static final int MODULUS = 128;
protected static final Logger LOG = LoggerFactory.getLogger(S3AFileSystemContractBaseTest.class);
@Override
public void setUp() throws Exception {
Configuration conf = new Configuration();
URI testURI = URI.create(conf.get("test.fs.s3a.name"));
boolean liveTest = testURI != null && !testURI.equals("s3a:///");
// This doesn't work with our JUnit 3 style test cases, so instead we'll
// make this whole class not run by default
assumeTrue(liveTest);
fs = new S3AFileSystem();
fs.initialize(testURI, conf);
super.setUp();
}
@Override
protected void tearDown() throws Exception {
if (fs != null) {
fs.delete(path("/tests3a"), true);
}
super.tearDown();
}
@Test(timeout = 10000)
public void testMkdirs() throws IOException {
// No trailing slash
assertTrue(fs.mkdirs(path("/tests3a/a")));
assertTrue(fs.exists(path("/tests3a/a")));
// With trailing slash
assertTrue(fs.mkdirs(path("/tests3a/b/")));
assertTrue(fs.exists(path("/tests3a/b/")));
// Two levels deep
assertTrue(fs.mkdirs(path("/tests3a/c/a/")));
assertTrue(fs.exists(path("/tests3a/c/a/")));
// Mismatched slashes
assertTrue(fs.exists(path("/tests3a/c/a")));
}
@Test(timeout=20000)
public void testDelete() throws IOException {
// Test deleting an empty directory
assertTrue(fs.mkdirs(path("/tests3a/d")));
assertTrue(fs.delete(path("/tests3a/d"), true));
assertFalse(fs.exists(path("/tests3a/d")));
// Test deleting a deep empty directory
assertTrue(fs.mkdirs(path("/tests3a/e/f/g/h")));
assertTrue(fs.delete(path("/tests3a/e/f/g"), true));
assertFalse(fs.exists(path("/tests3a/e/f/g/h")));
assertFalse(fs.exists(path("/tests3a/e/f/g")));
assertTrue(fs.exists(path("/tests3a/e/f")));
// Test delete of just a file
writeFile(path("/tests3a/f/f/file"), 1000);
assertTrue(fs.exists(path("/tests3a/f/f/file")));
assertTrue(fs.delete(path("/tests3a/f/f/file"), false));
assertFalse(fs.exists(path("/tests3a/f/f/file")));
// Test delete of a path with files in various directories
writeFile(path("/tests3a/g/h/i/file"), 1000);
assertTrue(fs.exists(path("/tests3a/g/h/i/file")));
writeFile(path("/tests3a/g/h/j/file"), 1000);
assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
try {
assertFalse(fs.delete(path("/tests3a/g/h"), false));
fail("Expected delete to fail with recursion turned off");
} catch (IOException e) {}
assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
assertTrue(fs.delete(path("/tests3a/g/h"), true));
assertFalse(fs.exists(path("/tests3a/g/h/j")));
}
@Test(timeout = 3600000)
public void testOpenCreate() throws IOException {
try {
createAndReadFileTest(1024);
} catch (IOException e) {
fail(e.getMessage());
}
try {
createAndReadFileTest(5 * 1024 * 1024);
} catch (IOException e) {
fail(e.getMessage());
}
try {
createAndReadFileTest(20 * 1024 * 1024);
} catch (IOException e) {
fail(e.getMessage());
}
/*
Enable to test the multipart upload
try {
createAndReadFileTest((long)6 * 1024 * 1024 * 1024);
} catch (IOException e) {
fail(e.getMessage());
}
*/
}
@Test(timeout = 1200000)
public void testRenameFile() throws IOException {
Path srcPath = path("/tests3a/a/srcfile");
final OutputStream outputStream = fs.create(srcPath, false);
generateTestData(outputStream, 11 * 1024 * 1024);
outputStream.close();
assertTrue(fs.exists(srcPath));
Path dstPath = path("/tests3a/b/dstfile");
assertFalse(fs.rename(srcPath, dstPath));
assertTrue(fs.mkdirs(dstPath.getParent()));
assertTrue(fs.rename(srcPath, dstPath));
assertTrue(fs.exists(dstPath));
assertFalse(fs.exists(srcPath));
assertTrue(fs.exists(srcPath.getParent()));
}
@Test(timeout = 10000)
public void testRenameDirectory() throws IOException {
Path srcPath = path("/tests3a/a");
assertTrue(fs.mkdirs(srcPath));
writeFile(new Path(srcPath, "b/testfile"), 1024);
Path nonEmptyPath = path("/tests3a/nonempty");
writeFile(new Path(nonEmptyPath, "b/testfile"), 1024);
assertFalse(fs.rename(srcPath, nonEmptyPath));
Path dstPath = path("/tests3a/b");
assertTrue(fs.rename(srcPath, dstPath));
assertFalse(fs.exists(srcPath));
assertTrue(fs.exists(new Path(dstPath, "b/testfile")));
}
@Test(timeout=10000)
public void testSeek() throws IOException {
Path path = path("/tests3a/testfile.seek");
writeFile(path, TEST_BUFFER_SIZE * 10);
FSDataInputStream inputStream = fs.open(path, TEST_BUFFER_SIZE);
inputStream.seek(inputStream.getPos() + MODULUS);
testReceivedData(inputStream, TEST_BUFFER_SIZE * 10 - MODULUS);
}
/**
* Creates and reads a file with the given size in S3. The test file is
* generated according to a specific pattern.
* During the read phase the incoming data stream is also checked against this pattern.
*
* @param fileSize
* the size of the file to be generated in bytes
* @throws IOException
* thrown if an I/O error occurs while writing or reading the test file
*/
private void createAndReadFileTest(final long fileSize) throws IOException {
final String objectName = UUID.randomUUID().toString();
final Path objectPath = new Path("/tests3a/", objectName);
// Write test file to S3
final OutputStream outputStream = fs.create(objectPath, false);
generateTestData(outputStream, fileSize);
outputStream.close();
// Now read the same file back from S3
final InputStream inputStream = fs.open(objectPath);
testReceivedData(inputStream, fileSize);
inputStream.close();
// Delete test file
fs.delete(objectPath, false);
}
/**
* Receives test data from the given input stream and checks the size of the
* data as well as the pattern inside the received data.
*
* @param inputStream
* the input stream to read the test data from
* @param expectedSize
* the expected size of the data to be read from the input stream in bytes
* @throws IOException
* thrown if an error occurs while reading the data
*/
private void testReceivedData(final InputStream inputStream,
final long expectedSize) throws IOException {
final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
long totalBytesRead = 0;
int nextExpectedNumber = 0;
while (true) {
final int bytesRead = inputStream.read(testBuffer);
if (bytesRead < 0) {
break;
}
totalBytesRead += bytesRead;
for (int i = 0; i < bytesRead; ++i) {
if (testBuffer[i] != nextExpectedNumber) {
throw new IOException("Read number " + testBuffer[i] + " but expected "
+ nextExpectedNumber);
}
++nextExpectedNumber;
if (nextExpectedNumber == MODULUS) {
nextExpectedNumber = 0;
}
}
}
if (totalBytesRead != expectedSize) {
throw new IOException("Expected to read " + expectedSize +
" bytes but only received " + totalBytesRead);
}
}
/**
* Generates test data of the given size according to some specific pattern
* and writes it to the provided output stream.
*
* @param outputStream
* the output stream to write the data to
* @param size
* the size of the test data to be generated in bytes
* @throws IOException
* thrown if an error occurs while writing the data
*/
private void generateTestData(final OutputStream outputStream,
final long size) throws IOException {
final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
for (int i = 0; i < testBuffer.length; ++i) {
testBuffer[i] = (byte) (i % MODULUS);
}
long bytesWritten = 0;
while (bytesWritten < size) {
final long diff = size - bytesWritten;
if (diff < testBuffer.length) {
outputStream.write(testBuffer, 0, (int)diff);
bytesWritten += diff;
} else {
outputStream.write(testBuffer);
bytesWritten += testBuffer.length;
}
}
}
private void writeFile(Path name, int fileSize) throws IOException {
final OutputStream outputStream = fs.create(name, false);
generateTestData(outputStream, fileSize);
outputStream.close();
}
}

View File

@ -0,0 +1,105 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<!--
S3A is a blobstore, with very different behavior than a
classic filesystem.
-->
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>true</value>
</property>
<property>
<name>fs.contract.test.random-seek-count</name>
<value>10</value>
</property>
<property>
<name>fs.contract.is-blobstore</name>
<value>true</value>
</property>
<property>
<name>fs.contract.is-case-sensitive</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-false-if-source-missing</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-append</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-directory-delete</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-rename</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-block-locality</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-concat</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-seek</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek-on-closed-file</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-unix-permissions</name>
<value>false</value>
</property>
<property>
<name>fs.contract.rename-overwrites-dest</name>
<value>true</value>
</property>
</configuration>

View File

@ -77,6 +77,11 @@
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek-on-closed-file</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>

View File

@ -132,14 +132,8 @@
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>compile</scope>
</dependency>

View File

@ -8,6 +8,8 @@ Trunk - Unreleased
IMPROVEMENTS
YARN-2438. yarn-env.sh cleanup (aw)
OPTIMIZATIONS
BUG FIXES
@ -359,6 +361,12 @@ Release 2.6.0 - UNRELEASED
header and made it accept multiple origins in CrossOriginFilter. (Jonathan
Eagles via zjshen)
YARN-2549. TestContainerLaunch fails due to classpath problem with hamcrest
classes. (cnauroth)
YARN-2529. Generic history service RPC interface doesn't work when service
authorization is enabled. (Zhijie Shen via jianhe)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -1,3 +1,4 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@ -25,27 +26,21 @@
##
###
# Generic settings for YARN
# YARN-specific overrides for generic settings
###
# User for YARN daemons
export HADOOP_YARN_USER=${HADOOP_YARN_USER:-yarn}
# By default, YARN will use HADOOP_LOG_DIR for YARN logging. Specify a custom
# log directory for YARN things here:
# export YARN_LOG_DIR="${HADOOP_LOG_DIR}"
#
# By default, YARN will use HADOOP_CONF_DIR. Specify a custom
# YARN_CONF_DIR here
# export YARN_CONF_DIR="${YARN_CONF_DIR:-$HADOOP_YARN_HOME/conf}"
#
# By default, YARN will use the value of HADOOP_LOGFILE as the 'fallback' log
# file # when log4j settings are not defined. Specify a custom YARN log file
# here:
# export YARN_LOGFILE=${HADOOP_LOGFILE}
# Override Hadoop's log directory & file
# export YARN_LOG_DIR="$HADOOP_YARN_HOME/logs"
# export YARN_LOGFILE='yarn.log'
# Need a custom-to-YARN service-level authorization policy file?
# export YARN_POLICYFILE="yarn-policy.xml"
#Override the log4j settings for all YARN apps
# export YARN_ROOT_LOGGER="INFO,console"
#Override the log4j settings for all YARN apps By default, YARN will use
# HADOOP_ROOT_LOGGER.
# export YARN_ROOT_LOGGER=${HADOOP_ROOT_LOGGER}
###
# Resource Manager specific parameters
@ -125,3 +120,26 @@ export HADOOP_YARN_USER=${HADOOP_YARN_USER:-yarn}
#
#export YARN_TIMELINESERVER_OPTS=
###
# Web App Proxy Server specifc parameters
###
# Specify the max Heapsize for the proxy server using a numerical value
# in the scale of MB. For example, to specify an jvm option of -Xmx1000m, set
# the value to 1000.
# This value will be overridden by an Xmx setting specified in either YARN_OPTS,
# HADOOP_OPTS, and/or YARN_PROXYSERVER_OPTS.
# If not specified, the default value will be picked from either YARN_HEAPMAX
# or JAVA_HEAP_MAX with YARN_HEAPMAX as the preferred option of the two.
#
#export YARN_PROXYSERVER_HEAPSIZE=1000
# Specify the JVM options to be used when starting the proxy server.
# These options will be appended to the options specified as YARN_OPTS
# and therefore may override any similar flags set in YARN_OPTS
#
# See ResourceManager for some examples
#
#export YARN_PROXYSERVER_OPTS=

View File

@ -991,6 +991,10 @@ public class YarnConfiguration extends Configuration {
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
"security.resourcelocalizer.protocol.acl";
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONHISTORY_PROTOCOL =
"security.applicationhistory.protocol.acl";
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
* to a running container */
public static final String NM_SLEEP_DELAY_BEFORE_SIGKILL_MS =

View File

@ -26,7 +26,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
@ -56,8 +58,8 @@ import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.timeline.security.authorize.TimelinePolicyProvider;
public class ApplicationHistoryClientService extends AbstractService {
private static final Log LOG = LogFactory
@ -88,6 +90,12 @@ public class ApplicationHistoryClientService extends AbstractService {
YarnConfiguration.TIMELINE_SERVICE_HANDLER_THREAD_COUNT,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT));
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
refreshServiceAcls(conf, new TimelinePolicyProvider());
}
server.start();
this.bindAddress =
conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
@ -118,6 +126,11 @@ public class ApplicationHistoryClientService extends AbstractService {
return this.bindAddress;
}
private void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
}
private class ApplicationHSClientProtocolHandler implements
ApplicationHistoryProtocol {

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline.security.authorize;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocolPB;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* {@link PolicyProvider} for YARN timeline server protocols.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TimelinePolicyProvider extends PolicyProvider {
@Override
public Service[] getServices() {
return new Service[] {
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONHISTORY_PROTOCOL,
ApplicationHistoryProtocolPB.class)
};
}
}

View File

@ -102,11 +102,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
@ -122,11 +117,20 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<!--
junit must be before mockito-all on the classpath. mockito-all bundles its
own copy of the hamcrest classes, but they don't match our junit version.
-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>