HADOOP-10714. AmazonS3Client.deleteObjects() need to be limited to 1000 entries per call. Contributed by Juan Yu.

(cherry picked from commit 6ba52d88ec)
Aaron T. Myers 2014-11-05 17:17:04 -08:00
parent f92ff24f5e
commit 9082fe4e20
29 changed files with 1263 additions and 474 deletions
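
The core of the patch is simple: S3 rejects DeleteObjectsRequest calls carrying more than 1000 keys, so S3AFileSystem now flushes its pending key list whenever it reaches that limit and once more after the listing loop finishes. Below is a minimal sketch of that batching pattern; the client, bucket name and key list are caller-supplied placeholders, and the class is an illustration rather than the patch code itself.

import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;

// Sketch only: mirrors the batching idea used in S3AFileSystem.rename()/delete().
// "s3", "bucket" and "keys" are hypothetical caller-supplied values.
class BatchedDeleteSketch {
  private static final int MAX_ENTRIES_TO_DELETE = 1000;

  static void deleteInBatches(AmazonS3Client s3, String bucket, List<String> keys) {
    List<DeleteObjectsRequest.KeyVersion> pending =
        new ArrayList<DeleteObjectsRequest.KeyVersion>();
    for (String key : keys) {
      pending.add(new DeleteObjectsRequest.KeyVersion(key));
      // Flush a full batch as soon as the S3 per-request limit is reached.
      if (pending.size() == MAX_ENTRIES_TO_DELETE) {
        s3.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(pending));
        pending.clear();
      }
    }
    // Flush whatever remains after the loop.
    if (!pending.isEmpty()) {
      s3.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(pending));
    }
  }
}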

.gitignore

@ -21,3 +21,4 @@ hadoop-common-project/hadoop-common/src/test/resources/contract-test-options.xml
hadoop-tools/hadoop-openstack/src/test/resources/contract-test-options.xml
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.toolbox
yarnregistry.pdf
hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml


@ -63,6 +63,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11267. TestSecurityUtil fails when run with JDK8 because of empty
principal names. (Stephen Chu via wheat9)
HADOOP-10714. AmazonS3Client.deleteObjects() need to be limited to 1000
entries per call. (Juan Yu via atm)
Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES


@ -28,53 +28,6 @@ These filesystem bindings must be defined in an XML configuration file, usually
`hadoop-common-project/hadoop-common/src/test/resources/contract-test-options.xml`.
This file is excluded and should not be checked in.
### s3://
In `contract-test-options.xml`, the filesystem name must be defined in the property `fs.contract.test.fs.s3`. The standard configuration options to define the S3 authentication details must also be provided.
Example:
<configuration>
<property>
<name>fs.contract.test.fs.s3</name>
<value>s3://tests3hdfs/</value>
</property>
<property>
<name>fs.s3.awsAccessKeyId</name>
<value>DONOTPCOMMITTHISKEYTOSCM</value>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<value>DONOTEVERSHARETHISSECRETKEY!</value>
</property>
</configuration>
### s3n://
In `contract-test-options.xml`, the filesystem name must be defined in the property `fs.contract.test.fs.s3n`. The standard configuration options to define the S3N authentication details must also be provided.
Example:
<configuration>
<property>
<name>fs.contract.test.fs.s3n</name>
<value>s3n://tests3contract</value>
</property>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>DONOTPCOMMITTHISKEYTOSCM</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>DONOTEVERSHARETHISSECRETKEY!</value>
</property>
### ftp://


@ -464,11 +464,11 @@ protected void createFile(Path path) throws IOException {
out.close();
}
private void rename(Path src, Path dst, boolean renameSucceeded,
protected void rename(Path src, Path dst, boolean renameSucceeded,
boolean srcExists, boolean dstExists) throws IOException {
assertEquals("Rename result", renameSucceeded, fs.rename(src, dst));
assertEquals("Source exists", srcExists, fs.exists(src));
assertEquals("Destination exists", dstExists, fs.exists(dst));
assertEquals("Destination exists" + dst, dstExists, fs.exists(dst));
}
/**


@ -19,6 +19,7 @@
package org.apache.hadoop.fs.contract;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Test;
import java.io.IOException;
@ -94,4 +95,30 @@ public void testDeleteNonEmptyDirRecursive() throws Throwable {
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "not deleted", file);
}
@Test
public void testDeleteDeepEmptyDir() throws Throwable {
mkdirs(path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
assertDeleted(path("testDeleteDeepEmptyDir/d1/d2/d3"), true);
FileSystem fs = getFileSystem();
ContractTestUtils.assertPathDoesNotExist(fs,
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
ContractTestUtils.assertPathDoesNotExist(fs,
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3"));
ContractTestUtils.assertPathExists(fs, "parent dir is deleted",
path("testDeleteDeepEmptyDir/d1/d2"));
}
@Test
public void testDeleteSingleFile() throws Throwable {
// Test delete of just a file
Path path = path("testDeleteSingleFile/d1/d2");
mkdirs(path);
Path file = new Path(path, "childfile");
ContractTestUtils.writeTextFile(getFileSystem(), file,
"single file to be deleted.", true);
ContractTestUtils.assertPathExists(getFileSystem(),
"single file not created", file);
assertDeleted(file, false);
}
}


@ -112,4 +112,23 @@ public void testMkdirOverParentFile() throws Throwable {
assertPathExists("mkdir failed", path);
assertDeleted(path, true);
}
@Test
public void testMkdirSlashHandling() throws Throwable {
describe("verify mkdir slash handling");
FileSystem fs = getFileSystem();
// No trailing slash
assertTrue(fs.mkdirs(path("testmkdir/a")));
assertPathExists("mkdir without trailing slash failed",
path("testmkdir/a"));
// With trailing slash
assertTrue(fs.mkdirs(path("testmkdir/b/")));
assertPathExists("mkdir with trailing slash failed", path("testmkdir/b/"));
// Mismatched slashes
assertPathExists("check path existence without trailing slash failed",
path("testmkdir/b"));
}
}


@ -182,4 +182,45 @@ public void testRenameFileNonexistentDir() throws Throwable {
assertFalse(renameCreatesDestDirs);
}
}
@Test
public void testRenameWithNonEmptySubDir() throws Throwable {
final Path renameTestDir = path("testRenameWithNonEmptySubDir");
final Path srcDir = new Path(renameTestDir, "src1");
final Path srcSubDir = new Path(srcDir, "sub");
final Path finalDir = new Path(renameTestDir, "dest");
FileSystem fs = getFileSystem();
boolean renameRemoveEmptyDest = isSupported(RENAME_REMOVE_DEST_IF_EMPTY_DIR);
ContractTestUtils.rm(fs, renameTestDir, true, false);
fs.mkdirs(srcDir);
fs.mkdirs(finalDir);
ContractTestUtils.writeTextFile(fs, new Path(srcDir, "source.txt"),
"this is the file in src dir", false);
ContractTestUtils.writeTextFile(fs, new Path(srcSubDir, "subfile.txt"),
"this is the file in src/sub dir", false);
ContractTestUtils.assertPathExists(fs, "not created in src dir",
new Path(srcDir, "source.txt"));
ContractTestUtils.assertPathExists(fs, "not created in src/sub dir",
new Path(srcSubDir, "subfile.txt"));
fs.rename(srcDir, finalDir);
// Accept both POSIX rename behavior and CLI rename behavior
if (renameRemoveEmptyDest) {
// POSIX rename behavior
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir",
new Path(finalDir, "source.txt"));
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir",
new Path(finalDir, "sub/subfile.txt"));
} else {
// CLI rename behavior
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir",
new Path(finalDir, "src1/source.txt"));
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir",
new Path(finalDir, "src1/sub/subfile.txt"));
}
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted",
new Path(srcDir, "source.txt"));
}
}


@ -79,6 +79,13 @@ public interface ContractOptions {
String RENAME_RETURNS_FALSE_IF_SOURCE_MISSING =
"rename-returns-false-if-source-missing";
/**
* Flag to indicate that the FS removes the destination first if it is an
* empty directory, i.e. the FS honors POSIX rename behavior.
* @{value}
*/
String RENAME_REMOVE_DEST_IF_EMPTY_DIR = "rename-remove-dest-if-empty-dir";
/**
* Flag to indicate that append is supported
* @{value}


@ -31,8 +31,11 @@
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
/**
* Utilities used across test cases
@ -44,6 +47,13 @@ public class ContractTestUtils extends Assert {
public static final String IO_FILE_BUFFER_SIZE = "io.file.buffer.size";
// For scale testing, we can repeatedly write small chunk data to generate
// a large file.
public static final String IO_CHUNK_BUFFER_SIZE = "io.chunk.buffer.size";
public static final int DEFAULT_IO_CHUNK_BUFFER_SIZE = 128;
public static final String IO_CHUNK_MODULUS_SIZE = "io.chunk.modulus.size";
public static final int DEFAULT_IO_CHUNK_MODULUS_SIZE = 128;
/**
* Assert that a property in the property set matches the expected value
* @param props property set
@ -755,5 +765,134 @@ public static void validateFileContent(byte[] concat, byte[][] bytes) {
mismatch);
}
/**
* Receives test data from the given input file and checks the size of the
* data as well as the pattern inside the received data.
*
* @param fs FileSystem
* @param path Input file to be checked
* @param expectedSize the expected size of the data to be read from the
* input file in bytes
* @param bufferLen Pattern length
* @param modulus Pattern modulus
* @throws IOException
* thrown if an error occurs while reading the data
*/
public static void verifyReceivedData(FileSystem fs, Path path,
final long expectedSize,
final int bufferLen,
final int modulus) throws IOException {
final byte[] testBuffer = new byte[bufferLen];
long totalBytesRead = 0;
int nextExpectedNumber = 0;
final InputStream inputStream = fs.open(path);
try {
while (true) {
final int bytesRead = inputStream.read(testBuffer);
if (bytesRead < 0) {
break;
}
totalBytesRead += bytesRead;
for (int i = 0; i < bytesRead; ++i) {
if (testBuffer[i] != nextExpectedNumber) {
throw new IOException("Read number " + testBuffer[i]
+ " but expected " + nextExpectedNumber);
}
++nextExpectedNumber;
if (nextExpectedNumber == modulus) {
nextExpectedNumber = 0;
}
}
}
if (totalBytesRead != expectedSize) {
throw new IOException("Expected to read " + expectedSize +
" bytes but only received " + totalBytesRead);
}
} finally {
inputStream.close();
}
}
/**
* Generates test data of the given size according to some specific pattern
* and writes it to the provided output file.
*
* @param fs FileSystem
* @param path Test file to be generated
* @param size The size of the test data to be generated in bytes
* @param bufferLen Pattern length
* @param modulus Pattern modulus
* @throws IOException
* thrown if an error occurs while writing the data
*/
public static long generateTestFile(FileSystem fs, Path path,
final long size,
final int bufferLen,
final int modulus) throws IOException {
final byte[] testBuffer = new byte[bufferLen];
for (int i = 0; i < testBuffer.length; ++i) {
testBuffer[i] = (byte) (i % modulus);
}
final OutputStream outputStream = fs.create(path, false);
long bytesWritten = 0;
try {
while (bytesWritten < size) {
final long diff = size - bytesWritten;
if (diff < testBuffer.length) {
outputStream.write(testBuffer, 0, (int) diff);
bytesWritten += diff;
} else {
outputStream.write(testBuffer);
bytesWritten += testBuffer.length;
}
}
return bytesWritten;
} finally {
outputStream.close();
}
}
/**
* Creates and reads a file with the given size. The test file is generated
* according to a specific pattern so it can be easily verified even if it's
* a multi-GB one.
* During the read phase the incoming data stream is also checked against
* this pattern.
*
* @param fs FileSystem
* @param parent Test file parent dir path
* @throws IOException
* thrown if an I/O error occurs while writing or reading the test file
*/
public static void createAndVerifyFile(FileSystem fs, Path parent, final long fileSize)
throws IOException {
int testBufferSize = fs.getConf()
.getInt(IO_CHUNK_BUFFER_SIZE, DEFAULT_IO_CHUNK_BUFFER_SIZE);
int modulus = fs.getConf()
.getInt(IO_CHUNK_MODULUS_SIZE, DEFAULT_IO_CHUNK_MODULUS_SIZE);
final String objectName = UUID.randomUUID().toString();
final Path objectPath = new Path(parent, objectName);
// Write test file in a specific pattern
assertEquals(fileSize,
generateTestFile(fs, objectPath, fileSize, testBufferSize, modulus));
assertPathExists(fs, "not created successful", objectPath);
// Now read the same file back and verify its content
try {
verifyReceivedData(fs, objectPath, fileSize, testBufferSize, modulus);
} finally {
// Delete test file
fs.delete(objectPath, false);
}
}
}
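
The new ContractTestUtils helpers above are aimed at scale tests: createAndVerifyFile() writes a patterned file of a requested size, reads it back while checking the pattern, and finally deletes it. A hedged usage sketch follows; the filesystem, test directory and file size are illustrative only, not taken from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;

// Hypothetical scale-test driver for the helpers added above.
public class ScaleTestSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Optional chunk knobs introduced above; both default to 128.
    conf.setInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE, 4096);
    conf.setInt(ContractTestUtils.IO_CHUNK_MODULUS_SIZE, 128);
    FileSystem fs = FileSystem.get(conf);
    Path parent = new Path("/tmp/scale-test");
    // Generates, verifies and then deletes a 16 MB patterned file.
    ContractTestUtils.createAndVerifyFile(fs, parent, 16L * 1024 * 1024);
  }
}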


@ -57,6 +57,10 @@ case sensitivity and permission options are determined at run time from OS type
<value>true</value>
</property>
<property>
<name>fs.contract.rename-remove-dest-if-empty-dir</name>
<value>true</value>
</property>
<!--
checksummed filesystems do not support append; see HADOOP-4292


@ -83,6 +83,13 @@
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
</plugins>
</build>


@ -61,10 +61,10 @@ public void initialize(URI uri, Configuration conf) {
String secretAccessKeyProperty =
String.format("fs.%s.awsSecretAccessKey", scheme);
if (accessKey == null) {
accessKey = conf.get(accessKeyProperty);
accessKey = conf.getTrimmed(accessKeyProperty);
}
if (secretAccessKey == null) {
secretAccessKey = conf.get(secretAccessKeyProperty);
secretAccessKey = conf.getTrimmed(secretAccessKeyProperty);
}
if (accessKey == null && secretAccessKey == null) {
throw new IllegalArgumentException("AWS " +


@ -22,10 +22,11 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.AWSCredentials;
import org.apache.commons.lang.StringUtils;
public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
private String accessKey;
private final String accessKey;
private String secretKey;
private final String secretKey;
public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
this.accessKey = accessKey;
@ -33,10 +34,9 @@ public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
}
public AWSCredentials getCredentials() {
if (accessKey != null && secretKey != null) {
if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
return new BasicAWSCredentials(accessKey, secretKey);
}
throw new AmazonClientException(
"Access key or secret key is null");
}


@ -20,11 +20,6 @@
public class Constants {
// s3 access key
public static final String ACCESS_KEY = "fs.s3a.access.key";
// s3 secret key
public static final String SECRET_KEY = "fs.s3a.secret.key";
// number of simultaneous connections to s3
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
@ -75,4 +70,6 @@ public class Constants {
"fs.s3a.server-side-encryption-algorithm";
public static final String S3N_FOLDER_SUFFIX = "_$folder$";
public static final String FS_S3A_BLOCK_SIZE = "fs.s3a.block.size";
public static final String FS_S3A = "s3a";
}


@ -27,6 +27,8 @@
import java.util.Date;
import java.util.List;
import org.apache.hadoop.fs.s3.S3Credentials;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
@ -80,6 +82,8 @@ public class S3AFileSystem extends FileSystem {
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
// The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000;
/** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.
@ -95,22 +99,12 @@ public void initialize(URI name, Configuration conf) throws IOException {
this.getWorkingDirectory());
// Try to get our credentials or just connect anonymously
String accessKey = conf.get(ACCESS_KEY, null);
String secretKey = conf.get(SECRET_KEY, null);
String userInfo = name.getUserInfo();
if (userInfo != null) {
int index = userInfo.indexOf(':');
if (index != -1) {
accessKey = userInfo.substring(0, index);
secretKey = userInfo.substring(index + 1);
} else {
accessKey = userInfo;
}
}
S3Credentials s3Credentials = new S3Credentials();
s3Credentials.initialize(name, conf);
AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
new BasicAWSCredentialsProvider(accessKey, secretKey),
new BasicAWSCredentialsProvider(s3Credentials.getAccessKey(),
s3Credentials.getSecretAccessKey()),
new InstanceProfileCredentialsProvider(),
new AnonymousAWSCredentialsProvider()
);
@ -295,15 +289,12 @@ public boolean rename(Path src, Path dst) throws IOException {
String dstKey = pathToKey(dst);
if (srcKey.length() == 0 || dstKey.length() == 0) {
LOG.info("rename: src or dst are empty");
if (LOG.isDebugEnabled()) {
LOG.debug("rename: src or dst are empty");
}
return false;
}
if (srcKey.equals(dstKey)) {
LOG.info("rename: src and dst refer to the same file");
return true;
}
S3AFileStatus srcStatus;
try {
srcStatus = getFileStatus(src);
@ -312,20 +303,27 @@ public boolean rename(Path src, Path dst) throws IOException {
return false;
}
if (srcKey.equals(dstKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("rename: src and dst refer to the same file or directory");
}
return srcStatus.isFile();
}
S3AFileStatus dstStatus = null;
try {
dstStatus = getFileStatus(dst);
if (srcStatus.isFile() && dstStatus.isDirectory()) {
LOG.info("rename: src is a file and dst is a directory");
return false;
}
if (srcStatus.isDirectory() && dstStatus.isFile()) {
LOG.info("rename: src is a directory and dst is a file");
if (LOG.isDebugEnabled()) {
LOG.debug("rename: src is a directory and dst is a file");
}
return false;
}
if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) {
return false;
}
} catch (FileNotFoundException e) {
// Parent must exist
Path parent = dst.getParent();
@ -346,7 +344,18 @@ public boolean rename(Path src, Path dst) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("rename: renaming file " + src + " to " + dst);
}
copyFile(srcKey, dstKey);
if (dstStatus != null && dstStatus.isDirectory()) {
String newDstKey = dstKey;
if (!newDstKey.endsWith("/")) {
newDstKey = newDstKey + "/";
}
String filename =
srcKey.substring(pathToKey(src.getParent()).length()+1);
newDstKey = newDstKey + filename;
copyFile(srcKey, newDstKey);
} else {
copyFile(srcKey, dstKey);
}
delete(src, false);
} else {
if (LOG.isDebugEnabled()) {
@ -362,12 +371,19 @@ public boolean rename(Path src, Path dst) throws IOException {
srcKey = srcKey + "/";
}
//Verify dest is not a child of the source directory
if (dstKey.startsWith(srcKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot rename a directory to a subdirectory of self");
}
return false;
}
List<DeleteObjectsRequest.KeyVersion> keysToDelete =
new ArrayList<DeleteObjectsRequest.KeyVersion>();
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
copyFile(srcKey, dstKey);
statistics.incrementWriteOps(1);
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(srcKey));
// delete unnecessary fake directory.
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
}
ListObjectsRequest request = new ListObjectsRequest();
@ -383,23 +399,29 @@ public boolean rename(Path src, Path dst) throws IOException {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
copyFile(summary.getKey(), newDstKey);
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
DeleteObjectsRequest deleteRequest =
new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
s3.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
keysToDelete.clear();
}
}
if (objects.isTruncated()) {
objects = s3.listNextBatchOfObjects(objects);
statistics.incrementReadOps(1);
} else {
if (keysToDelete.size() > 0) {
DeleteObjectsRequest deleteRequest =
new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
s3.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
}
break;
}
}
if (!keysToDelete.isEmpty()) {
DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
deleteRequest.setKeys(keysToDelete);
s3.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
}
}
if (src.getParent() != dst.getParent()) {
@ -419,7 +441,9 @@ public boolean rename(Path src, Path dst) throws IOException {
* @throws IOException
*/
public boolean delete(Path f, boolean recursive) throws IOException {
LOG.info("Delete path " + f + " - recursive " + recursive);
if (LOG.isDebugEnabled()) {
LOG.debug("Delete path " + f + " - recursive " + recursive);
}
S3AFileStatus status;
try {
status = getFileStatus(f);
@ -479,18 +503,26 @@ public boolean delete(Path f, boolean recursive) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Got object to delete " + summary.getKey());
}
}
DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
deleteRequest.setKeys(keys);
s3.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
keys.clear();
if (keys.size() == MAX_ENTRIES_TO_DELETE) {
DeleteObjectsRequest deleteRequest =
new DeleteObjectsRequest(bucket).withKeys(keys);
s3.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
keys.clear();
}
}
if (objects.isTruncated()) {
objects = s3.listNextBatchOfObjects(objects);
statistics.incrementReadOps(1);
} else {
if (keys.size() > 0) {
DeleteObjectsRequest deleteRequest =
new DeleteObjectsRequest(bucket).withKeys(keys);
s3.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
}
break;
}
}
@ -530,7 +562,9 @@ private void createFakeDirectoryIfNecessary(Path f) throws IOException {
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
String key = pathToKey(f);
LOG.info("List status for path: " + f);
if (LOG.isDebugEnabled()) {
LOG.debug("List status for path: " + f);
}
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(f);
@ -640,7 +674,10 @@ public Path getWorkingDirectory() {
// TODO: If we have created an empty file at /foo/bar and we then call
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
LOG.info("Making directory: " + f);
if (LOG.isDebugEnabled()) {
LOG.debug("Making directory: " + f);
}
try {
FileStatus fileStatus = getFileStatus(f);
@ -680,8 +717,10 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
*/
public S3AFileStatus getFileStatus(Path f) throws IOException {
String key = pathToKey(f);
if (LOG.isDebugEnabled()) {
LOG.debug("Getting path status for " + f + " (" + key + ")");
}
LOG.info("Getting path status for " + f + " (" + key + ")");
if (!key.isEmpty()) {
try {
@ -723,7 +762,7 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
}
return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
} else {
LOG.warn("Found file (with /): real file? should not happen: " + key);
LOG.warn("Found file (with /): real file? should not happen: {}", key);
return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
f.makeQualified(uri, workingDir));
@ -753,7 +792,8 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
if (objects.getCommonPrefixes().size() > 0 || objects.getObjectSummaries().size() > 0) {
if (!objects.getCommonPrefixes().isEmpty()
|| objects.getObjectSummaries().size() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found path as directory (with /): " +
objects.getCommonPrefixes().size() + "/" +
@ -806,8 +846,9 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
if (!overwrite && exists(dst)) {
throw new IOException(dst + " already exists");
}
LOG.info("Copying local file from " + src + " to " + dst);
if (LOG.isDebugEnabled()) {
LOG.debug("Copying local file from " + src + " to " + dst);
}
// Since we have a local file, we don't need to stream into a temporary file
LocalFileSystem local = getLocal(getConf());
@ -992,7 +1033,7 @@ public int read() throws IOException {
@Deprecated
public long getDefaultBlockSize() {
// default to 32MB: large enough to minimize the impact of seeks
return getConf().getLong("fs.s3a.block.size", 32 * 1024 * 1024);
return getConf().getLong(FS_S3A_BLOCK_SIZE, 32 * 1024 * 1024);
}
private void printAmazonServiceException(AmazonServiceException ase) {
@ -1010,6 +1051,6 @@ private void printAmazonClientException(AmazonClientException ace) {
LOG.info("Caught an AmazonClientException, which means the client encountered " +
"a serious internal problem while trying to communicate with S3, " +
"such as not being able to access the network.");
LOG.info("Error Message: " + ace.getMessage());
LOG.info("Error Message: {}" + ace, ace);
}
}


@ -22,6 +22,7 @@
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
@ -65,6 +66,7 @@ private void openIfNeeded() throws IOException {
}
private synchronized void reopen(long pos) throws IOException {
if (wrappedStream != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
@ -73,15 +75,17 @@ private synchronized void reopen(long pos) throws IOException {
}
if (pos < 0) {
throw new EOFException("Trying to seek to a negative offset " + pos);
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+" " + pos);
}
if (contentLength > 0 && pos > contentLength-1) {
throw new EOFException("Trying to seek to an offset " + pos +
" past the end of the file");
throw new EOFException(
FSExceptionMessages.CANNOT_SEEK_PAST_EOF
+ " " + pos);
}
LOG.info("Actually opening file " + key + " at pos " + pos);
LOG.debug("Actually opening file " + key + " at pos " + pos);
GetObjectRequest request = new GetObjectRequest(bucket, key);
request.setRange(pos, contentLength-1);
@ -103,11 +107,14 @@ public synchronized long getPos() throws IOException {
@Override
public synchronized void seek(long pos) throws IOException {
checkNotClosed();
if (this.pos == pos) {
return;
}
LOG.info("Reopening " + this.key + " to seek to new offset " + (pos - this.pos));
LOG.debug(
"Reopening " + this.key + " to seek to new offset " + (pos - this.pos));
reopen(pos);
}
@ -118,9 +125,7 @@ public boolean seekToNewSource(long targetPos) throws IOException {
@Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
checkNotClosed();
openIfNeeded();
@ -148,10 +153,8 @@ public synchronized int read() throws IOException {
}
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
public synchronized int read(byte[] buf, int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
checkNotClosed();
openIfNeeded();
@ -179,6 +182,12 @@ public synchronized int read(byte buf[], int off, int len) throws IOException {
return byteRead;
}
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
@Override
public synchronized void close() throws IOException {
super.close();
@ -190,9 +199,8 @@ public synchronized void close() throws IOException {
@Override
public synchronized int available() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
checkNotClosed();
long remaining = this.contentLength - this.pos;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;


@ -87,7 +87,10 @@ public S3AOutputStream(Configuration conf, AmazonS3Client client,
backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
closed = false;
LOG.info("OutputStream for key '" + key + "' writing to tempfile: " + this.backupFile);
if (LOG.isDebugEnabled()) {
LOG.debug("OutputStream for key '" + key + "' writing to tempfile: " +
this.backupFile);
}
this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
}
@ -104,8 +107,10 @@ public synchronized void close() throws IOException {
}
backupStream.close();
LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
LOG.info("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);
if (LOG.isDebugEnabled()) {
LOG.debug("OutputStream for key '" + key + "' closed. Now beginning upload");
LOG.debug("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);
}
try {
@ -146,13 +151,14 @@ public synchronized void close() throws IOException {
throw new IOException(e);
} finally {
if (!backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: " + backupFile);
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
}
super.close();
closed = true;
}
if (LOG.isDebugEnabled()) {
LOG.info("OutputStream for key '" + key + "' upload complete");
LOG.debug("OutputStream for key '" + key + "' upload complete");
}
}
@Override


@ -0,0 +1,417 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# Hadoop-AWS module: Integration with Amazon Web Services
The `hadoop-aws` module provides support for AWS integration. The generated
JAR file, `hadoop-aws.jar` also declares a transitive dependency on all
external artifacts which are needed for this support —enabling downstream
applications to easily use this support.
Features
1. The "classic" `s3:` filesystem for storing objects in Amazon S3 Storage
1. The second-generation, `s3n:` filesystem, making it easy to share
data between hadoop and other applications via the S3 object store
1. The third generation, `s3a:` filesystem. Designed to be a switch-in
replacement for `s3n:`, this filesystem binding supports larger files and promises
higher performance.
The specifics of using these filesystems are documented below.
## Warning: Object Stores are not filesystems.
Amazon S3 is an example of "an object store". In order to achieve scalability
and especially high availability, S3 has —as many other cloud object stores have
done— relaxed some of the constraints which classic "POSIX" filesystems promise.
Specifically
1. Files that are newly created from the Hadoop Filesystem APIs may not be
immediately visible.
2. File delete and update operations may not immediately propagate. Old
copies of the file may exist for an indeterminate time period.
3. Directory operations: `delete()` and `rename()` are implemented by
recursive file-by-file operations. They take time at least proportional to
the number of files, during which time partial updates may be visible. If
the operations are interrupted, the filesystem is left in an intermediate state.
For further discussion on these topics, please consult
[The Hadoop FileSystem API Definition](/filesystem).
## Warning #2: your AWS credentials are valuable
Your AWS credentials not only pay for services, they offer read and write
access to the data. Anyone with the credentials can not only read your datasets
—they can delete them.
Do not inadvertently share these credentials through means such as
1. Checking in Hadoop configuration files containing the credentials.
1. Logging them to a console, as they invariably end up being seen.
If you do any of these: change your credentials immediately!
## S3
### Authentication properties
<property>
<name>fs.s3.awsAccessKeyId</name>
<description>AWS access key ID</description>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<description>AWS secret key</description>
</property>
## S3N
### Authentication properties
<property>
<name>fs.s3n.awsAccessKeyId</name>
<description>AWS access key ID</description>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<description>AWS secret key</description>
</property>
### Other properties
<property>
<name>fs.s3n.block.size</name>
<value>67108864</value>
<description>Block size to use when reading files using the native S3
filesystem (s3n: URIs).</description>
</property>
<property>
<name>fs.s3n.multipart.uploads.enabled</name>
<value>false</value>
<description>Setting this property to true enables multiple uploads to
native S3 filesystem. When uploading a file, it is split into blocks
if the size is larger than fs.s3n.multipart.uploads.block.size.
</description>
</property>
<property>
<name>fs.s3n.multipart.uploads.block.size</name>
<value>67108864</value>
<description>The block size for multipart uploads to native S3 filesystem.
Default size is 64MB.
</description>
</property>
<property>
<name>fs.s3n.multipart.copy.block.size</name>
<value>5368709120</value>
<description>The block size for multipart copy in native S3 filesystem.
Default size is 5GB.
</description>
</property>
<property>
<name>fs.s3n.server-side-encryption-algorithm</name>
<value></value>
<description>Specify a server-side encryption algorithm for S3.
The default is NULL, and the only other currently allowable value is AES256.
</description>
</property>
## S3A
### Authentication properties
<property>
<name>fs.s3a.awsAccessKeyId</name>
<description>AWS access key ID. Omit for Role-based authentication.</description>
</property>
<property>
<name>fs.s3a.awsSecretAccessKey</name>
<description>AWS secret key. Omit for Role-based authentication.</description>
</property>
### Other properties
<property>
<name>fs.s3a.connection.maximum</name>
<value>15</value>
<description>Controls the maximum number of simultaneous connections to S3.</description>
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>true</value>
<description>Enables or disables SSL connections to S3.</description>
</property>
<property>
<name>fs.s3a.attempts.maximum</name>
<value>10</value>
<description>How many times we should retry commands on transient errors.</description>
</property>
<property>
<name>fs.s3a.connection.timeout</name>
<value>5000</value>
<description>Socket connection timeout in seconds.</description>
</property>
<property>
<name>fs.s3a.paging.maximum</name>
<value>5000</value>
<description>How many keys to request from S3 when doing
directory listings at a time.</description>
</property>
<property>
<name>fs.s3a.multipart.size</name>
<value>104857600</value>
<description>How big (in bytes) to split upload or copy operations up into.</description>
</property>
<property>
<name>fs.s3a.multipart.threshold</name>
<value>2147483647</value>
<description>Threshold before uploads or copies use parallel multipart operations.</description>
</property>
<property>
<name>fs.s3a.acl.default</name>
<description>Set a canned ACL for newly created and copied objects. Value may be private,
public-read, public-read-write, authenticated-read, log-delivery-write,
bucket-owner-read, or bucket-owner-full-control.</description>
</property>
<property>
<name>fs.s3a.multipart.purge</name>
<value>false</value>
<description>True if you want to purge existing multipart uploads that may not have been
completed/aborted correctly</description>
</property>
<property>
<name>fs.s3a.multipart.purge.age</name>
<value>86400</value>
<description>Minimum age in seconds of multipart uploads to purge</description>
</property>
<property>
<name>fs.s3a.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3a</value>
<description>Comma separated list of directories that will be used to buffer file
uploads to.</description>
</property>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
<description>The implementation class of the S3A Filesystem</description>
</property>
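
For completeness, here is a short sketch of what client code using these settings might look like. The bucket, object key and credential values are placeholders; in a real deployment the credentials would normally live in core-site.xml or come from an IAM instance profile rather than being set in code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical example: open an object through the s3a connector.
    public class S3AReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.awsAccessKeyId", "PLACEHOLDER_ACCESS_KEY");
        conf.set("fs.s3a.awsSecretAccessKey", "PLACEHOLDER_SECRET_KEY");
        Path path = new Path("s3a://example-bucket/data/sample.txt");
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        FSDataInputStream in = fs.open(path);
        try {
          byte[] buffer = new byte[4096];
          int read = in.read(buffer);
          System.out.println("Read " + read + " bytes from " + path);
        } finally {
          in.close();
        }
      }
    }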
## Testing the S3 filesystem clients
To test the S3* filesystem clients, you need to provide two files
which pass in authentication details to the test runner
1. `auth-keys.xml`
1. `core-site.xml`
These are both Hadoop XML configuration files, which must be placed into
`hadoop-tools/hadoop-aws/src/test/resources`.
### `auth-keys.xml`
The presence of this file triggers the testing of the S3 classes.
Without this file, *none of the tests in this module will be executed*
The XML file must contain all the ID/key information needed to connect
each of the filesystem clients to the object stores, and a URL for
each filesystem for its testing.
1. `test.fs.s3n.name` : the URL of the bucket for S3n tests
1. `test.fs.s3a.name` : the URL of the bucket for S3a tests
2. `test.fs.s3.name` : the URL of the bucket for "S3" tests
The contents of each bucket will be destroyed during the test process:
do not use the bucket for any purpose other than testing.
Example:
<configuration>
<property>
<name>test.fs.s3n.name</name>
<value>s3n://test-aws-s3n/</value>
</property>
<property>
<name>test.fs.s3a.name</name>
<value>s3a://test-aws-s3a/</value>
</property>
<property>
<name>test.fs.s3.name</name>
<value>s3://test-aws-s3/</value>
</property>
<property>
<name>fs.s3.awsAccessKeyId</name>
<value>DONOTPCOMMITTHISKEYTOSCM</value>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<value>DONOTEVERSHARETHISSECRETKEY!</value>
</property>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>DONOTPCOMMITTHISKEYTOSCM</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>DONOTEVERSHARETHISSECRETKEY!</value>
</property>
<property>
<name>fs.s3a.awsAccessKeyId</name>
<description>AWS access key ID. Omit for Role-based authentication.</description>
<value>DONOTPCOMMITTHISKEYTOSCM</value>
</property>
<property>
<name>fs.s3a.awsSecretAccessKey</name>
<description>AWS secret key. Omit for Role-based authentication.</description>
<value>DONOTEVERSHARETHISSECRETKEY!</value>
</property>
</configuration>
## File `contract-test-options.xml`
The file `hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml`
must be created and configured for the test filesystems.
If a specific `fs.contract.test.fs.*` test path is not defined for
any of the filesystems, those tests will be skipped.
The standard S3 authentication details must also be provided. This can be
through copy-and-paste of the `auth-keys.xml` credentials, or it can be
through direct XInclude inclusion.
#### s3://
The filesystem name must be defined in the property `fs.contract.test.fs.s3`.
Example:
<property>
<name>fs.contract.test.fs.s3</name>
<value>s3://test-aws-s3/</value>
</property>
### s3n://
In the file `src/test/resources/contract-test-options.xml`, the filesystem
name must be defined in the property `fs.contract.test.fs.s3n`.
The standard configuration options to define the S3N authentication details
must also be provided.
Example:
<property>
<name>fs.contract.test.fs.s3n</name>
<value>s3n://test-aws-s3n/</value>
</property>
### s3a://
In the file `src/test/resources/contract-test-options.xml`, the filesystem
name must be defined in the property `fs.contract.test.fs.s3a`.
The standard configuration options to define the S3A authentication details
must also be provided.
Example:
<property>
<name>fs.contract.test.fs.s3a</name>
<value>s3a://test-aws-s3a/</value>
</property>
### Complete example of `contract-test-options.xml`
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<include xmlns="http://www.w3.org/2001/XInclude"
href="auth-keys.xml"/>
<property>
<name>fs.contract.test.fs.s3</name>
<value>s3://test-aws-s3/</value>
</property>
<property>
<name>fs.contract.test.fs.s3a</name>
<value>s3a://test-aws-s3a/</value>
</property>
<property>
<name>fs.contract.test.fs.s3n</name>
<value>s3n://test-aws-s3n/</value>
</property>
</configuration>
This example pulls in the `auth-keys.xml` file for the credentials.
This provides one single place to keep the keys up to date —and means
that the file `contract-test-options.xml` does not contain any
secret credentials itself.


@ -21,10 +21,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
@ -51,14 +51,11 @@ public void testRenameDirIntoExistingDir() throws Throwable {
Path destFilePath = new Path(destDir, "dest-512.txt");
byte[] destDateset = dataset(512, 'A', 'Z');
writeDataset(fs, destFilePath, destDateset, destDateset.length, 1024, false);
writeDataset(fs, destFilePath, destDateset, destDateset.length, 1024,
false);
assertIsFile(destFilePath);
boolean rename = fs.rename(srcDir, destDir);
Path renamedSrcFilePath = new Path(destDir, "source-256.txt");
assertIsFile(destFilePath);
assertIsFile(renamedSrcFilePath);
ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset);
assertTrue("rename returned false though the contents were copied", rename);
assertFalse("s3a doesn't support rename to non-empty directory", rename);
}
}


@ -21,13 +21,15 @@
import java.io.IOException;
import java.net.URI;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import org.junit.internal.AssumptionViolatedException;
public abstract class S3FileSystemContractBaseTest
extends FileSystemContractBaseTest {
public static final String KEY_TEST_FS = "test.fs.s3.name";
private FileSystemStore store;
abstract FileSystemStore getFileSystemStore() throws IOException;
@ -37,7 +39,12 @@ protected void setUp() throws Exception {
Configuration conf = new Configuration();
store = getFileSystemStore();
fs = new S3FileSystem(store);
fs.initialize(URI.create(conf.get("test.fs.s3.name")), conf);
String fsname = conf.get(KEY_TEST_FS);
if (StringUtils.isEmpty(fsname)) {
throw new AssumptionViolatedException(
"No test FS defined in :" + KEY_TEST_FS);
}
fs.initialize(URI.create(fsname), conf);
}
@Override


@ -1,327 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.junit.Assume.*;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.UUID;
/**
* Tests a live S3 system. If your keys and bucket aren't specified, all tests
* are marked as passed
*
* This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest from
* TestCase which uses the old Junit3 runner that doesn't ignore assumptions
* properly making it impossible to skip the tests if we don't have a valid
* bucket.
**/
public class S3AFileSystemContractBaseTest extends FileSystemContractBaseTest {
private static final int TEST_BUFFER_SIZE = 128;
private static final int MODULUS = 128;
protected static final Logger LOG = LoggerFactory.getLogger(S3AFileSystemContractBaseTest.class);
@Override
public void setUp() throws Exception {
Configuration conf = new Configuration();
URI testURI = URI.create(conf.get("test.fs.s3a.name"));
boolean liveTest = testURI != null && !testURI.equals("s3a:///");
// This doesn't work with our JUnit 3 style test cases, so instead we'll
// make this whole class not run by default
assumeTrue(liveTest);
fs = new S3AFileSystem();
fs.initialize(testURI, conf);
super.setUp();
}
@Override
protected void tearDown() throws Exception {
if (fs != null) {
fs.delete(path("/tests3a"), true);
}
super.tearDown();
}
@Test(timeout = 10000)
public void testMkdirs() throws IOException {
// No trailing slash
assertTrue(fs.mkdirs(path("/tests3a/a")));
assertTrue(fs.exists(path("/tests3a/a")));
// With trailing slash
assertTrue(fs.mkdirs(path("/tests3a/b/")));
assertTrue(fs.exists(path("/tests3a/b/")));
// Two levels deep
assertTrue(fs.mkdirs(path("/tests3a/c/a/")));
assertTrue(fs.exists(path("/tests3a/c/a/")));
// Mismatched slashes
assertTrue(fs.exists(path("/tests3a/c/a")));
}
@Test(timeout=20000)
public void testDelete() throws IOException {
// Test deleting an empty directory
assertTrue(fs.mkdirs(path("/tests3a/d")));
assertTrue(fs.delete(path("/tests3a/d"), true));
assertFalse(fs.exists(path("/tests3a/d")));
// Test deleting a deep empty directory
assertTrue(fs.mkdirs(path("/tests3a/e/f/g/h")));
assertTrue(fs.delete(path("/tests3a/e/f/g"), true));
assertFalse(fs.exists(path("/tests3a/e/f/g/h")));
assertFalse(fs.exists(path("/tests3a/e/f/g")));
assertTrue(fs.exists(path("/tests3a/e/f")));
// Test delete of just a file
writeFile(path("/tests3a/f/f/file"), 1000);
assertTrue(fs.exists(path("/tests3a/f/f/file")));
assertTrue(fs.delete(path("/tests3a/f/f/file"), false));
assertFalse(fs.exists(path("/tests3a/f/f/file")));
// Test delete of a path with files in various directories
writeFile(path("/tests3a/g/h/i/file"), 1000);
assertTrue(fs.exists(path("/tests3a/g/h/i/file")));
writeFile(path("/tests3a/g/h/j/file"), 1000);
assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
try {
assertFalse(fs.delete(path("/tests3a/g/h"), false));
fail("Expected delete to fail with recursion turned off");
} catch (IOException e) {}
assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
assertTrue(fs.delete(path("/tests3a/g/h"), true));
assertFalse(fs.exists(path("/tests3a/g/h/j")));
}
@Test(timeout = 3600000)
public void testOpenCreate() throws IOException {
try {
createAndReadFileTest(1024);
} catch (IOException e) {
fail(e.getMessage());
}
try {
createAndReadFileTest(5 * 1024 * 1024);
} catch (IOException e) {
fail(e.getMessage());
}
try {
createAndReadFileTest(20 * 1024 * 1024);
} catch (IOException e) {
fail(e.getMessage());
}
/*
Enable to test the multipart upload
try {
createAndReadFileTest((long)6 * 1024 * 1024 * 1024);
} catch (IOException e) {
fail(e.getMessage());
}
*/
}
@Test(timeout = 1200000)
public void testRenameFile() throws IOException {
Path srcPath = path("/tests3a/a/srcfile");
final OutputStream outputStream = fs.create(srcPath, false);
generateTestData(outputStream, 11 * 1024 * 1024);
outputStream.close();
assertTrue(fs.exists(srcPath));
Path dstPath = path("/tests3a/b/dstfile");
assertFalse(fs.rename(srcPath, dstPath));
assertTrue(fs.mkdirs(dstPath.getParent()));
assertTrue(fs.rename(srcPath, dstPath));
assertTrue(fs.exists(dstPath));
assertFalse(fs.exists(srcPath));
assertTrue(fs.exists(srcPath.getParent()));
}
@Test(timeout = 10000)
public void testRenameDirectory() throws IOException {
Path srcPath = path("/tests3a/a");
assertTrue(fs.mkdirs(srcPath));
writeFile(new Path(srcPath, "b/testfile"), 1024);
Path nonEmptyPath = path("/tests3a/nonempty");
writeFile(new Path(nonEmptyPath, "b/testfile"), 1024);
assertFalse(fs.rename(srcPath, nonEmptyPath));
Path dstPath = path("/tests3a/b");
assertTrue(fs.rename(srcPath, dstPath));
assertFalse(fs.exists(srcPath));
assertTrue(fs.exists(new Path(dstPath, "b/testfile")));
}
@Test(timeout=10000)
public void testSeek() throws IOException {
Path path = path("/tests3a/testfile.seek");
writeFile(path, TEST_BUFFER_SIZE * 10);
FSDataInputStream inputStream = fs.open(path, TEST_BUFFER_SIZE);
inputStream.seek(inputStream.getPos() + MODULUS);
testReceivedData(inputStream, TEST_BUFFER_SIZE * 10 - MODULUS);
}
/**
* Creates and reads a file with the given size in S3. The test file is
* generated according to a specific pattern.
* During the read phase the incoming data stream is also checked against this pattern.
*
* @param fileSize
* the size of the file to be generated in bytes
* @throws IOException
* thrown if an I/O error occurs while writing or reading the test file
*/
private void createAndReadFileTest(final long fileSize) throws IOException {
final String objectName = UUID.randomUUID().toString();
final Path objectPath = new Path("/tests3a/", objectName);
// Write test file to S3
final OutputStream outputStream = fs.create(objectPath, false);
generateTestData(outputStream, fileSize);
outputStream.close();
// Now read the same file back from S3
final InputStream inputStream = fs.open(objectPath);
testReceivedData(inputStream, fileSize);
inputStream.close();
// Delete test file
fs.delete(objectPath, false);
}
/**
* Receives test data from the given input stream and checks the size of the
* data as well as the pattern inside the received data.
*
* @param inputStream
* the input stream to read the test data from
* @param expectedSize
* the expected size of the data to be read from the input stream in bytes
* @throws IOException
* thrown if an error occurs while reading the data
*/
private void testReceivedData(final InputStream inputStream,
final long expectedSize) throws IOException {
final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
long totalBytesRead = 0;
int nextExpectedNumber = 0;
while (true) {
final int bytesRead = inputStream.read(testBuffer);
if (bytesRead < 0) {
break;
}
totalBytesRead += bytesRead;
for (int i = 0; i < bytesRead; ++i) {
if (testBuffer[i] != nextExpectedNumber) {
throw new IOException("Read number " + testBuffer[i] + " but expected "
+ nextExpectedNumber);
}
++nextExpectedNumber;
if (nextExpectedNumber == MODULUS) {
nextExpectedNumber = 0;
}
}
}
if (totalBytesRead != expectedSize) {
throw new IOException("Expected to read " + expectedSize +
" bytes but only received " + totalBytesRead);
}
}
/**
* Generates test data of the given size according to some specific pattern
* and writes it to the provided output stream.
*
* @param outputStream
* the output stream to write the data to
* @param size
* the size of the test data to be generated in bytes
* @throws IOException
* thrown if an error occurs while writing the data
*/
private void generateTestData(final OutputStream outputStream,
final long size) throws IOException {
final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
for (int i = 0; i < testBuffer.length; ++i) {
testBuffer[i] = (byte) (i % MODULUS);
}
long bytesWritten = 0;
while (bytesWritten < size) {
final long diff = size - bytesWritten;
if (diff < testBuffer.length) {
outputStream.write(testBuffer, 0, (int)diff);
bytesWritten += diff;
} else {
outputStream.write(testBuffer);
bytesWritten += testBuffer.length;
}
}
}
private void writeFile(Path name, int fileSize) throws IOException {
final OutputStream outputStream = fs.create(name, false);
generateTestData(outputStream, fileSize);
outputStream.close();
}
}

View File

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.internal.AssumptionViolatedException;
import java.io.IOException;
import java.net.URI;
public class S3ATestUtils {
public static S3AFileSystem createTestFileSystem(Configuration conf) throws
IOException {
String fsname = conf.getTrimmed(TestS3AFileSystemContract.TEST_FS_S3A_NAME, "");
boolean liveTest = !StringUtils.isEmpty(fsname);
URI testURI = null;
if (liveTest) {
testURI = URI.create(fsname);
liveTest = testURI.getScheme().equals(Constants.FS_S3A);
}
if (!liveTest) {
// This doesn't work with our JUnit 3 style test cases, so instead we'll
// make this whole class not run by default
throw new AssumptionViolatedException(
"No test filesystem in " + TestS3AFileSystemContract.TEST_FS_S3A_NAME);
}
S3AFileSystem fs1 = new S3AFileSystem();
fs1.initialize(testURI, conf);
return fs1;
}
}

View File

@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
/**
* Tests a live S3 system. If your keys and bucket aren't specified, all tests
* are marked as passed.
*
 * This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest extends
 * TestCase, which uses the old JUnit 3 runner that doesn't ignore assumptions
 * properly, making it impossible to skip the tests if we don't have a valid
 * bucket.
**/
public class TestS3AFileSystemContract extends FileSystemContractBaseTest {
protected static final Logger LOG =
LoggerFactory.getLogger(TestS3AFileSystemContract.class);
public static final String TEST_FS_S3A_NAME = "test.fs.s3a.name";
@Override
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = S3ATestUtils.createTestFileSystem(conf);
super.setUp();
}
@Override
protected void tearDown() throws Exception {
if (fs != null) {
fs.delete(path("test"), true);
}
super.tearDown();
}
@Override
public void testMkdirsWithUmask() throws Exception {
// not supported
}
@Override
public void testRenameFileAsExistingFile() throws Exception {
if (!renameSupported()) return;
Path src = path("/test/hadoop/file");
createFile(src);
Path dst = path("/test/new/newfile");
createFile(dst);
// S3 doesn't support a no-overwrite rename option;
// rename-overwrites-dest is always allowed.
rename(src, dst, true, false, true);
}
@Override
public void testRenameDirectoryAsExistingDirectory() throws Exception {
if (!renameSupported()) {
return;
}
Path src = path("/test/hadoop/dir");
fs.mkdirs(src);
createFile(path("/test/hadoop/dir/file1"));
createFile(path("/test/hadoop/dir/subdir/file2"));
Path dst = path("/test/new/newdir");
fs.mkdirs(dst);
rename(src, dst, true, false, true);
assertFalse("Nested file1 exists",
fs.exists(path("/test/hadoop/dir/file1")));
assertFalse("Nested file2 exists",
fs.exists(path("/test/hadoop/dir/subdir/file2")));
assertTrue("Renamed nested file1 exists",
fs.exists(path("/test/new/newdir/file1")));
assertTrue("Renamed nested exists",
fs.exists(path("/test/new/newdir/subdir/file2")));
}
// @Override
public void testMoveDirUnderParent() throws Throwable {
// not supported because rename
// fails if dst is a directory that is not empty.
}
}

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import static org.junit.Assume.assumeTrue;
/**
* Base class for scale tests; here is where the common scale configuration
* keys are defined
*/
public class S3AScaleTestBase {
public static final String SCALE_TEST = "scale.test.";
public static final String KEY_OPERATION_COUNT =
SCALE_TEST + "operation.count";
public static final long DEFAULT_OPERATION_COUNT = 2005;
protected S3AFileSystem fs;
private static final Logger LOG =
LoggerFactory.getLogger(S3AScaleTestBase.class);
private Configuration conf;
/**
 * Configuration generator. May be overridden to inject
 * some custom options; an illustrative override is sketched after this class.
 * @return a configuration with which to create FS instances
 */
protected Configuration createConfiguration() {
return new Configuration();
}
/**
* Get the configuration used to set up the FS
* @return the configuration
*/
public Configuration getConf() {
return conf;
}
@Before
public void setUp() throws Exception {
conf = createConfiguration();
fs = S3ATestUtils.createTestFileSystem(conf);
}
@After
public void tearDown() throws Exception {
ContractTestUtils.rm(fs, getTestPath(), true, true);
}
protected Path getTestPath() {
return new Path("/tests3a");
}
protected long getOperationCount() {
return getConf().getLong(KEY_OPERATION_COUNT, DEFAULT_OPERATION_COUNT);
}
}
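
For illustration only: the sketch below shows how a subclass might use the createConfiguration() hook to inject a custom operation count. The class name and the value 5000 are hypothetical and not part of this change; the same key could equally be set in the test XML configuration resources.

    package org.apache.hadoop.fs.s3a.scale;

    import org.apache.hadoop.conf.Configuration;

    /** Hypothetical subclass: raises the operation count above the default. */
    public class TestS3AManyOperationsExample extends S3AScaleTestBase {
      @Override
      protected Configuration createConfiguration() {
        Configuration conf = new Configuration();
        // KEY_OPERATION_COUNT ("scale.test.operation.count") is read by
        // getOperationCount(); 5000 here is an arbitrary example value.
        conf.setLong(KEY_OPERATION_COUNT, 5000);
        return conf;
      }
    }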

View File

@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestS3ADeleteManyFiles.class);
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@Test
public void testBulkRenameAndDelete() throws Throwable {
final Path scaleTestDir = getTestPath();
final Path srcDir = new Path(scaleTestDir, "src");
final Path finalDir = new Path(scaleTestDir, "final");
final long count = getOperationCount();
ContractTestUtils.rm(fs, scaleTestDir, true, false);
fs.mkdirs(srcDir);
fs.mkdirs(finalDir);
int testBufferSize = fs.getConf()
.getInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE,
ContractTestUtils.DEFAULT_IO_CHUNK_BUFFER_SIZE);
// use Executor to speed up file creation
ExecutorService exec = Executors.newFixedThreadPool(16);
final ExecutorCompletionService<Boolean> completionService =
new ExecutorCompletionService<Boolean>(exec);
try {
final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z');
for (int i = 0; i < count; ++i) {
final String fileName = "foo-" + i;
completionService.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
ContractTestUtils.createFile(fs, new Path(srcDir, fileName),
false, data);
return fs.exists(new Path(srcDir, fileName));
}
});
}
for (int i = 0; i < count; ++i) {
final Future<Boolean> future = completionService.take();
try {
if (!future.get()) {
LOG.warn("cannot create file");
}
} catch (ExecutionException e) {
LOG.warn("Error while uploading file", e.getCause());
throw e;
}
}
} finally {
exec.shutdown();
}
int nSrcFiles = fs.listStatus(srcDir).length;
fs.rename(srcDir, finalDir);
assertEquals(nSrcFiles, fs.listStatus(finalDir).length);
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
new Path(srcDir, "foo-" + 0));
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
new Path(srcDir, "foo-" + count / 2));
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
new Path(srcDir, "foo-" + (count - 1)));
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
new Path(finalDir, "foo-" + 0));
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
new Path(finalDir, "foo-" + count/2));
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
new Path(finalDir, "foo-" + (count-1)));
ContractTestUtils.assertDeleted(fs, finalDir, true, false);
}
@Test
public void testOpenCreate() throws IOException {
Path dir = new Path("/tests3a");
ContractTestUtils.createAndVerifyFile(fs, dir, 1024);
ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024);
ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024);
/*
Enable to test the multipart upload
try {
ContractTestUtils.createAndVerifyFile(fs, dir,
(long)6 * 1024 * 1024 * 1024);
} catch (IOException e) {
fail(e.getMessage());
}
*/
}
}

View File

@@ -22,15 +22,17 @@
import java.io.InputStream;
import java.net.URI;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem.NativeS3FsInputStream;
import org.junit.internal.AssumptionViolatedException;
public abstract class NativeS3FileSystemContractBaseTest
extends FileSystemContractBaseTest {
public static final String KEY_TEST_FS = "test.fs.s3n.name";
private NativeFileSystemStore store;
abstract NativeFileSystemStore getNativeFileSystemStore() throws IOException;
@@ -40,7 +42,12 @@ protected void setUp() throws Exception {
Configuration conf = new Configuration();
store = getNativeFileSystemStore();
fs = new NativeS3FileSystem(store);
fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
String fsname = conf.get(KEY_TEST_FS);
if (StringUtils.isEmpty(fsname)) {
throw new AssumptionViolatedException(
"No test FS defined in :" + KEY_TEST_FS);
}
fs.initialize(URI.create(fsname), conf);
}
@Override

View File

@@ -117,10 +117,13 @@ public void testMediumUpload() throws IOException, NoSuchAlgorithmException {
writeRenameReadCompare(new Path("/test/medium"), 33554432); // 100 MB
}
/*
Enable Multipart upload to run this test
@Test
public void testExtraLargeUpload()
throws IOException, NoSuchAlgorithmException {
// Multipart upload, multipart copy
writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte
}
*/
}

View File

@@ -47,6 +47,11 @@
<value>true</value>
</property>
<property>
<name>fs.contract.rename-remove-dest-if-empty-dir</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-append</name>
<value>false</value>

View File

@@ -0,0 +1,51 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Values used when running unit tests. Specify any values in here that
should override the default values. -->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>target/build/test</value>
<description>A base for other temporary directories.</description>
<final>true</final>
</property>
<!-- Turn security off for tests by default -->
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<!--
To run these tests:
# Create a file auth-keys.xml - DO NOT ADD TO REVISION CONTROL
# Add the property test.fs.s3n.name to point to an S3 filesystem URL
# Add the credentials for the service you are testing against
# (a placeholder sketch of auth-keys.xml follows this file)
-->
<include xmlns="http://www.w3.org/2001/XInclude"
href="auth-keys.xml"/>
</configuration>
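
For reference, a minimal auth-keys.xml might look like the sketch below. The bucket URL and credential values are placeholders, and the two credential property names assume the s3n connector is the one under test; the S3A contract tests resolve their filesystem from test.fs.s3a.name in the same way.

    <?xml version="1.0"?>
    <configuration>
      <property>
        <name>test.fs.s3n.name</name>
        <value>s3n://example-test-bucket/</value>
      </property>
      <property>
        <name>fs.s3n.awsAccessKeyId</name>
        <value>ACCESS-KEY-PLACEHOLDER</value>
      </property>
      <property>
        <name>fs.s3n.awsSecretAccessKey</name>
        <value>SECRET-KEY-PLACEHOLDER</value>
      </property>
    </configuration>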