HADOOP-13131. Add tests to verify that S3A supports SSE-S3 encryption. Contributed by Steve Loughran.

This commit is contained in:
Chris Nauroth 2016-06-01 14:49:22 -07:00
parent 0bc05e40fa
commit 16b1cc7af9
11 changed files with 518 additions and 51 deletions

View File

@ -127,8 +127,15 @@ public final class Constants {
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
// s3 server-side encryption
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";
/**
* The standard encryption algorithm AWS supports.
* Different implementations may support others (or none).
*/
public static final String SERVER_SIDE_ENCRYPTION_AES256 =
"AES256";
//override signature algorithm used for signing requests
public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";

View File

@ -764,6 +764,28 @@ public class S3AFileSystem extends FileSystem {
return true;
}
/**
* Low-level call to get at the object metadata.
* @param path path to the object
* @return metadata
* @throws IOException IO and object access problems.
*/
@VisibleForTesting
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
return getObjectMetadata(pathToKey(path));
}
/**
* Request object metadata; increments counters in the process.
* @param key key
* @return the metadata
*/
private ObjectMetadata getObjectMetadata(String key) {
ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
statistics.incrementReadOps(1);
return meta;
}
/**
* A helper method to delete a list of keys on a s3-backend.
*
@ -1597,7 +1619,7 @@ public class S3AFileSystem extends FileSystem {
}
/**
* Get the threshold for multipart files
* Get the threshold for multipart files.
* @return the value as set during initialization
*/
public long getMultiPartThreshold() {

View File

@ -768,3 +768,25 @@ By default, the `parallel-tests` profile runs 4 test suites concurrently. This
can be tuned by passing the `testsThreadCount` argument.
mvn -Pparallel-tests -DtestsThreadCount=8 clean test
### Testing against non AWS S3 endpoints.
The S3A filesystem is designed to work with storage endpoints which implement
the S3 protocols to the extent that the amazon S3 SDK is capable of talking
to it. We encourage testing against other filesystems and submissions of patches
which address issues. In particular, we encourage testing of Hadoop release
candidates, as these third-party endpoints get even less testing than the
S3 endpoint itself.
**Disabling the encryption tests**
If the endpoint doesn't support server-side-encryption, these will fail
<property>
<name>test.fs.s3a.encryption.enabled</name>
<value>false</value>
</property>
Encryption is only used for those specific test suites with `Encryption` in
their classname.

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
/**
* An extension of the contract test base set up for S3A tests.
*/
public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
implements S3ATestConstants {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Override
public void teardown() throws Exception {
super.teardown();
IOUtils.closeStream(getFileSystem());
}
@Rule
public TestName methodName = new TestName();
@Before
public void nameThread() {
Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
}
protected Configuration getConfiguration() {
return getContract().getConf();
}
/**
* Get the filesystem as an S3A filesystem.
* @return the typecast FS
*/
@Override
public S3AFileSystem getFileSystem() {
return (S3AFileSystem) super.getFileSystem();
}
/**
* Write a file, read it back, validate the dataset. Overwrites the file
* if it is present
* @param name filename (will have the test path prepended to it)
* @param len length of file
* @return the full path to the file
* @throws IOException any IO problem
*/
protected Path writeThenReadFile(String name, int len) throws IOException {
Path path = path(name);
byte[] data = dataset(len, 'a', 'z');
writeDataset(getFileSystem(), path, data, data.length, 1024 * 1024, true);
ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
return path;
}
/**
* Assert that an exception failed with a specific status code.
* @param e exception
* @param code expected status code
* @throws AWSS3IOException rethrown if the status code does not match.
*/
protected void assertStatusCode(AWSS3IOException e, int code)
throws AWSS3IOException {
if (e.getStatusCode() != code) {
throw e;
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
/**
* Constants for S3A Testing.
*/
public interface S3ATestConstants {
/**
* Prefix for any cross-filesystem scale test options.
*/
String SCALE_TEST = "scale.test.";
/**
* Prefix for s3a-specific scale tests.
*/
String S3A_SCALE_TEST = "fs.s3a.scale.test.";
/**
* Prefix for FS s3a tests.
*/
String TEST_FS_S3A = "test.fs.s3a.";
/**
* Name of the test filesystem.
*/
String TEST_FS_S3A_NAME = TEST_FS_S3A + "name";
/**
* The number of operations to perform: {@value}.
*/
String KEY_OPERATION_COUNT = SCALE_TEST + "operation.count";
/**
* The readahead buffer: {@value}.
*/
String KEY_READ_BUFFER_SIZE = S3A_SCALE_TEST + "read.buffer.size";
int DEFAULT_READ_BUFFER_SIZE = 16384;
/**
* Key for a multi MB test file: {@value}.
*/
String KEY_CSVTEST_FILE = S3A_SCALE_TEST + "csvfile";
/**
* Default path for the multi MB test file: {@value}.
*/
String DEFAULT_CSVTEST_FILE = "s3a://landsat-pds/scene_list.gz";
/**
* The default number of operations to perform: {@value}.
*/
long DEFAULT_OPERATION_COUNT = 2005;
/**
* Run the encryption tests?
*/
String KEY_ENCRYPTION_TESTS = TEST_FS_S3A + "encryption.enabled";
}

View File

@ -27,11 +27,30 @@ import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
/**
* Utilities for the S3A tests.
*/
public class S3ATestUtils {
public static S3AFileSystem createTestFileSystem(Configuration conf) throws
IOException {
String fsname = conf.getTrimmed(TestS3AFileSystemContract.TEST_FS_S3A_NAME, "");
/**
* Create the test filesystem.
*
* If the test.fs.s3a.name property is not set, this will
* trigger a JUnit failure.
*
* Multipart purging is enabled.
* @param conf configuration
* @return the FS
* @throws IOException IO Problems
* @throws AssumptionViolatedException if the FS is not named
*/
public static S3AFileSystem createTestFileSystem(Configuration conf)
throws IOException {
String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
boolean liveTest = !StringUtils.isEmpty(fsname);
@ -44,19 +63,31 @@ public class S3ATestUtils {
// This doesn't work with our JUnit 3 style test cases, so instead we'll
// make this whole class not run by default
throw new AssumptionViolatedException(
"No test filesystem in " + TestS3AFileSystemContract.TEST_FS_S3A_NAME);
"No test filesystem in " + TEST_FS_S3A_NAME);
}
S3AFileSystem fs1 = new S3AFileSystem();
//enable purging in tests
conf.setBoolean(Constants.PURGE_EXISTING_MULTIPART, true);
conf.setInt(Constants.PURGE_EXISTING_MULTIPART_AGE, 0);
conf.setBoolean(PURGE_EXISTING_MULTIPART, true);
conf.setInt(PURGE_EXISTING_MULTIPART_AGE, 0);
fs1.initialize(testURI, conf);
return fs1;
}
public static FileContext createTestFileContext(Configuration conf) throws
IOException {
String fsname = conf.getTrimmed(TestS3AFileSystemContract.TEST_FS_S3A_NAME, "");
/**
* Create a file context for tests.
*
* If the test.fs.s3a.name property is not set, this will
* trigger a JUnit failure.
*
* Multipart purging is enabled.
* @param conf configuration
* @return the FS
* @throws IOException IO Problems
* @throws AssumptionViolatedException if the FS is not named
*/
public static FileContext createTestFileContext(Configuration conf)
throws IOException {
String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
boolean liveTest = !StringUtils.isEmpty(fsname);
URI testURI = null;
@ -67,8 +98,8 @@ public class S3ATestUtils {
if (!liveTest) {
// This doesn't work with our JUnit 3 style test cases, so instead we'll
// make this whole class not run by default
throw new AssumptionViolatedException(
"No test filesystem in " + TestS3AFileSystemContract.TEST_FS_S3A_NAME);
throw new AssumptionViolatedException("No test filesystem in "
+ TEST_FS_S3A_NAME);
}
FileContext fc = FileContext.getFileContext(testURI,conf);
return fc;
@ -139,4 +170,24 @@ public class S3ATestUtils {
return ex;
}
/**
* Turn off FS Caching: use if a filesystem with different options from
* the default is required.
* @param conf configuration to patch
*/
public static void disableFilesystemCaching(Configuration conf) {
conf.setBoolean("fs.s3a.impl.disable.cache", true);
}
/**
* Skip a test if encryption tests are disabled.
* @param configuration configuration to probe
*/
public static void skipIfEncryptionTestsDisabled(
Configuration configuration) {
if (!configuration.getBoolean(KEY_ENCRYPTION_TESTS, true)) {
skip("Skipping encryption tests");
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
/**
* Test whether or not encryption works by turning it on. Some checks
* are made for different file sizes as there have been reports that the
* file length may be rounded up to match word boundaries.
*/
public class TestS3AEncryption extends AbstractS3ATestBase {
private static final String AES256 = Constants.SERVER_SIDE_ENCRYPTION_AES256;
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
AES256);
return conf;
}
private static final int[] SIZES = {
0, 1, 2, 3, 4, 5, 254, 255, 256, 257, 2 ^ 10 - 3, 2 ^ 11 - 2, 2 ^ 12 - 1
};
@Override
public void teardown() throws Exception {
super.teardown();
IOUtils.closeStream(getFileSystem());
}
@Test
public void testEncryption() throws Throwable {
for (int size: SIZES) {
validateEncryptionForFilesize(size);
}
}
@Test
public void testEncryptionOverRename() throws Throwable {
skipIfEncryptionTestsDisabled(getConfiguration());
Path src = path(createFilename(1024));
byte[] data = dataset(1024, 'a', 'z');
S3AFileSystem fs = getFileSystem();
writeDataset(fs, src, data, data.length, 1024 * 1024, true);
ContractTestUtils.verifyFileContents(fs, src, data);
Path dest = path(src.getName() + "-copy");
fs.rename(src, dest);
ContractTestUtils.verifyFileContents(fs, dest, data);
assertEncrypted(dest);
}
protected void validateEncryptionForFilesize(int len) throws IOException {
skipIfEncryptionTestsDisabled(getConfiguration());
describe("Create an encrypted file of size " + len);
String src = createFilename(len);
Path path = writeThenReadFile(src, len);
assertEncrypted(path);
rm(getFileSystem(), path, false, false);
}
private String createFilename(int len) {
return String.format("%s-%04x", methodName.getMethodName(), len);
}
/**
* Assert that at path references an encrypted blob.
* @param path path
* @throws IOException on a failure
*/
private void assertEncrypted(Path path) throws IOException {
ObjectMetadata md = getFileSystem().getObjectMetadata(path);
assertEquals(AES256, md.getSSEAlgorithm());
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
/**
* Test whether or not encryption settings propagate by choosing an invalid
* one. We expect the write to fail with a 400 bad request error
*/
public class TestS3AEncryptionAlgorithmPropagation extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
"DES");
return conf;
}
@Override
public void teardown() throws Exception {
super.teardown();
IOUtils.closeStream(getFileSystem());
}
@Test
public void testEncrypt0() throws Throwable {
writeThenReadFileToFailure(0);
}
@Test
public void testEncrypt256() throws Throwable {
writeThenReadFileToFailure(256);
}
/**
* Make this a no-op so test setup doesn't fail.
* @param path path path
* @throws IOException on any failure
*/
@Override
protected void mkdirs(Path path) throws IOException {
}
protected void writeThenReadFileToFailure(int len) throws IOException {
skipIfEncryptionTestsDisabled(getConfiguration());
describe("Create an encrypted file of size " + len);
try {
writeThenReadFile(methodName.getMethodName() + '-' + len, len);
fail("Expected an exception about an illegal encryption algorithm");
} catch (AWSS3IOException e) {
assertStatusCode(e, 400);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
/**
* Run the encryption tests against the Fast output stream.
* This verifies that both file writing paths can encrypt their data.
*/
public class TestS3AEncryptionFastOutputStream extends TestS3AEncryption {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(Constants.FAST_UPLOAD, true);
return conf;
}
}

View File

@ -37,7 +37,6 @@ public class TestS3AFileSystemContract extends FileSystemContractBaseTest {
protected static final Logger LOG =
LoggerFactory.getLogger(TestS3AFileSystemContract.class);
public static final String TEST_FS_S3A_NAME = "test.fs.s3a.name";
@Override
public void setUp() throws Exception {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ATestConstants;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.junit.After;
import org.junit.Assert;
@ -45,11 +46,7 @@ import static org.junit.Assume.assumeTrue;
* Base class for scale tests; here is where the common scale configuration
* keys are defined.
*/
public class S3AScaleTestBase extends Assert {
public static final String SCALE_TEST = "scale.test.";
public static final String S3A_SCALE_TEST = "fs.s3a.scale.test.";
public class S3AScaleTestBase extends Assert implements S3ATestConstants {
@Rule
public TestName methodName = new TestName();
@ -59,37 +56,6 @@ public class S3AScaleTestBase extends Assert {
Thread.currentThread().setName("JUnit");
}
/**
* The number of operations to perform: {@value}.
*/
public static final String KEY_OPERATION_COUNT =
SCALE_TEST + "operation.count";
/**
* The readahead buffer: {@value}.
*/
public static final String KEY_READ_BUFFER_SIZE =
S3A_SCALE_TEST + "read.buffer.size";
public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
/**
* Key for a multi MB test file: {@value}.
*/
public static final String KEY_CSVTEST_FILE =
S3A_SCALE_TEST + "csvfile";
/**
* Default path for the multi MB test file: {@value}.
*/
public static final String DEFAULT_CSVTEST_FILE
= "s3a://landsat-pds/scene_list.gz";
/**
* The default number of operations to perform: {@value}.
*/
public static final long DEFAULT_OPERATION_COUNT = 2005;
protected S3AFileSystem fs;
protected static final Logger LOG =