HADOOP-17536. ABFS: Supporting customer provided encryption key (#2707)

Contributed by bilahari t h

Change-Id: I86216e755b81e9d14f5e87844d9fd58e8940560c
This commit is contained in:
bilaharith 2021-04-27 15:45:52 +05:30 committed by Steve Loughran
parent 99e4e9bad6
commit 6649e5888b
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
8 changed files with 1019 additions and 7 deletions

View File

@ -939,6 +939,11 @@ public boolean enableAbfsListIterator() {
return this.enableAbfsListIterator; return this.enableAbfsListIterator;
} }
public String getClientProvidedEncryptionKey() {
String accSpecEncKey = accountConf(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY);
return rawConfig.get(accSpecEncKey, null);
}
@VisibleForTesting @VisibleForTesting
void setReadBufferSize(int bufferSize) { void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize; this.readBufferSize = bufferSize;

View File

@ -142,6 +142,8 @@ public final class ConfigurationKeys {
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
/** Setting this true will make the driver use it's own RemoteIterator implementation */ /** Setting this true will make the driver use it's own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator"; public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** Server side encryption key */
public static final String FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY = "fs.azure.client-provided-encryption-key";
/** End point of ABFS account: {@value}. */ /** End point of ABFS account: {@value}. */
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint"; public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";

View File

@ -68,6 +68,8 @@ public final class FileSystemConfigurations {
public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost"; public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000; public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256";
public static final int MAX_CONCURRENT_READ_THREADS = 12; public static final int MAX_CONCURRENT_READ_THREADS = 12;
public static final int MAX_CONCURRENT_WRITE_THREADS = 8; public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false; public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;

View File

@ -59,6 +59,11 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_UMASK = "x-ms-umask"; public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled"; public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency"; public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
public static final String X_MS_ENCRYPTION_KEY = "x-ms-encryption-key";
public static final String X_MS_ENCRYPTION_KEY_SHA256 = "x-ms-encryption-key-sha256";
public static final String X_MS_ENCRYPTION_ALGORITHM = "x-ms-encryption-algorithm";
public static final String X_MS_REQUEST_SERVER_ENCRYPTED = "x-ms-request-server-encrypted";
public static final String X_MS_SERVER_ENCRYPTED = "x-ms-server-encrypted";
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action"; public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration"; public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
public static final String X_MS_LEASE_ID = "x-ms-lease-id"; public static final String X_MS_LEASE_ID = "x-ms-lease-id";

View File

@ -25,8 +25,12 @@
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Base64;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.UUID; import java.util.UUID;
@ -65,6 +69,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
@ -74,6 +79,7 @@
*/ */
public class AbfsClient implements Closeable { public class AbfsClient implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
private final URL baseUrl; private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials; private final SharedKeyCredentials sharedKeyCredentials;
private final String xMsVersion = "2019-12-12"; private final String xMsVersion = "2019-12-12";
@ -82,6 +88,8 @@ public class AbfsClient implements Closeable {
private final AbfsConfiguration abfsConfiguration; private final AbfsConfiguration abfsConfiguration;
private final String userAgent; private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker; private final AbfsPerfTracker abfsPerfTracker;
private final String clientProvidedEncryptionKey;
private final String clientProvidedEncryptionKeySHA;
private final String accountName; private final String accountName;
private final AuthType authType; private final AuthType authType;
@ -93,7 +101,8 @@ public class AbfsClient implements Closeable {
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration, final AbfsConfiguration abfsConfiguration,
final AbfsClientContext abfsClientContext) { final AbfsClientContext abfsClientContext)
throws IOException {
this.baseUrl = baseUrl; this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials; this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString(); String baseUrlString = baseUrl.toString();
@ -103,6 +112,17 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT)); this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName); this.authType = abfsConfiguration.getAuthType(accountName);
String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey();
if (encryptionKey != null) {
this.clientProvidedEncryptionKey = getBase64EncodedString(encryptionKey);
this.clientProvidedEncryptionKeySHA = getBase64EncodedString(
getSHA256Hash(encryptionKey));
} else {
this.clientProvidedEncryptionKey = null;
this.clientProvidedEncryptionKeySHA = null;
}
String sslProviderName = null; String sslProviderName = null;
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) { if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
@ -131,7 +151,8 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration, final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider, final AccessTokenProvider tokenProvider,
final AbfsClientContext abfsClientContext) { final AbfsClientContext abfsClientContext)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext); this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
this.tokenProvider = tokenProvider; this.tokenProvider = tokenProvider;
} }
@ -139,11 +160,29 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration, final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider, final SASTokenProvider sasTokenProvider,
final AbfsClientContext abfsClientContext) { final AbfsClientContext abfsClientContext)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext); this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
this.sasTokenProvider = sasTokenProvider; this.sasTokenProvider = sasTokenProvider;
} }
private byte[] getSHA256Hash(String key) throws IOException {
try {
final MessageDigest digester = MessageDigest.getInstance("SHA-256");
return digester.digest(key.getBytes(StandardCharsets.UTF_8));
} catch (NoSuchAlgorithmException e) {
throw new IOException(e);
}
}
private String getBase64EncodedString(String key) {
return getBase64EncodedString(key.getBytes(StandardCharsets.UTF_8));
}
private String getBase64EncodedString(byte[] bytes) {
return Base64.getEncoder().encodeToString(bytes);
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (tokenProvider instanceof Closeable) { if (tokenProvider instanceof Closeable) {
@ -180,6 +219,18 @@ List<AbfsHttpHeader> createDefaultHeaders() {
return requestHeaders; return requestHeaders;
} }
private void addCustomerProvidedKeyHeaders(
final List<AbfsHttpHeader> requestHeaders) {
if (clientProvidedEncryptionKey != null) {
requestHeaders.add(
new AbfsHttpHeader(X_MS_ENCRYPTION_KEY, clientProvidedEncryptionKey));
requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_KEY_SHA256,
clientProvidedEncryptionKeySHA));
requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_ALGORITHM,
SERVER_SIDE_ENCRYPTION_ALGORITHM));
}
}
AbfsUriQueryBuilder createDefaultUriQueryBuilder() { AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT); abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
@ -289,6 +340,9 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
final String permission, final String umask, final String permission, final String umask,
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException { final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (isFile) {
addCustomerProvidedKeyHeaders(requestHeaders);
}
if (!overwrite) { if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
} }
@ -499,6 +553,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
AppendRequestParameters reqParams, final String cachedSasToken) AppendRequestParameters reqParams, final String cachedSasToken)
throws AzureBlobFileSystemException { throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use // JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header. // PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
@ -585,6 +640,7 @@ public AbfsRestOperation flush(final String path, final long position, boolean r
boolean isClose, final String cachedSasToken, final String leaseId) boolean isClose, final String cachedSasToken, final String leaseId)
throws AzureBlobFileSystemException { throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use // JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header. // PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
@ -616,6 +672,7 @@ public AbfsRestOperation flush(final String path, final long position, boolean r
public AbfsRestOperation setPathProperties(final String path, final String properties) public AbfsRestOperation setPathProperties(final String path, final String properties)
throws AzureBlobFileSystemException { throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use // JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header. // PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
@ -649,6 +706,8 @@ public AbfsRestOperation getPathStatus(final String path, final boolean includeP
// only traversal (execute) permission is required. // only traversal (execute) permission is required.
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS);
operation = SASTokenProvider.GET_STATUS_OPERATION; operation = SASTokenProvider.GET_STATUS_OPERATION;
} else {
addCustomerProvidedKeyHeaders(requestHeaders);
} }
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed())); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
@ -667,6 +726,7 @@ public AbfsRestOperation getPathStatus(final String path, final boolean includeP
public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset, public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException { final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
requestHeaders.add(new AbfsHttpHeader(RANGE, requestHeaders.add(new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1))); String.format("bytes=%d-%d", position, position + bufferLength - 1)));
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));

View File

@ -0,0 +1,936 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 ("License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.EnumSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
import static org.apache.hadoop.fs.permission.AclEntryType.USER;
import static org.apache.hadoop.fs.permission.FsAction.ALL;
public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
private static final Logger LOG = LoggerFactory
.getLogger(ITestCustomerProvidedKey.class);
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
private static final int INT_512 = 512;
private static final int INT_50 = 50;
private static final int ENCRYPTION_KEY_LEN = 32;
private static final int FILE_SIZE = 10 * ONE_MB;
private static final int FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS = 24 * ONE_MB;
public ITestCustomerProvidedKey() throws Exception {
boolean isCPKTestsEnabled = getConfiguration()
.getBoolean(FS_AZURE_TEST_CPK_ENABLED, false);
Assume.assumeTrue(isCPKTestsEnabled);
}
@Test
public void testReadWithCPK() throws Exception {
final AzureBlobFileSystem fs = getAbfs(true);
String fileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, fileName, FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
int length = FILE_SIZE;
byte[] buffer = new byte[length];
final AbfsRestOperation op = abfsClient.getPathStatus(fileName, false);
final String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
AbfsRestOperation abfsRestOperation = abfsClient
.read(fileName, 0, buffer, 0, length, eTag, null);
assertCPKHeaders(abfsRestOperation, true);
assertResponseHeader(abfsRestOperation, true, X_MS_ENCRYPTION_KEY_SHA256,
getCPKSha(fs));
assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
"true");
assertResponseHeader(abfsRestOperation, false,
X_MS_REQUEST_SERVER_ENCRYPTED, "");
// Trying to read with different CPK headers
Configuration conf = fs.getConf();
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
"different-1234567890123456789012");
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
FSDataInputStream iStream = fs2.open(new Path(fileName))) {
int len = 8 * ONE_MB;
byte[] b = new byte[len];
LambdaTestUtils.intercept(IOException.class, () -> {
iStream.read(b, 0, len);
});
}
// Trying to read with no CPK headers
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
.get(conf); FSDataInputStream iStream = fs3.open(new Path(fileName))) {
int len = 8 * ONE_MB;
byte[] b = new byte[len];
LambdaTestUtils.intercept(IOException.class, () -> {
iStream.read(b, 0, len);
});
}
}
@Test
public void testReadWithoutCPK() throws Exception {
final AzureBlobFileSystem fs = getAbfs(false);
String fileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, fileName, FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
int length = INT_512;
byte[] buffer = new byte[length * 4];
final AbfsRestOperation op = abfsClient.getPathStatus(fileName, false);
final String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
AbfsRestOperation abfsRestOperation = abfsClient
.read(fileName, 0, buffer, 0, length, eTag, null);
assertCPKHeaders(abfsRestOperation, false);
assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
getCPKSha(fs));
assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
"true");
assertResponseHeader(abfsRestOperation, false,
X_MS_REQUEST_SERVER_ENCRYPTED, "");
// Trying to read with CPK headers
Configuration conf = fs.getConf();
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
"12345678901234567890123456789012");
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
AbfsClient abfsClient2 = fs2.getAbfsClient()) {
LambdaTestUtils.intercept(IOException.class, () -> {
abfsClient2.read(fileName, 0, buffer, 0, length, eTag, null);
});
}
}
@Test
public void testAppendWithCPK() throws Exception {
final AzureBlobFileSystem fs = getAbfs(true);
final String fileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, fileName, FILE_SIZE);
// Trying to append with correct CPK headers
AppendRequestParameters appendRequestParameters =
new AppendRequestParameters(
0, 0, 5, Mode.APPEND_MODE, false, null);
byte[] buffer = getRandomBytesArray(5);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.append(fileName, buffer, appendRequestParameters, null);
assertCPKHeaders(abfsRestOperation, true);
assertResponseHeader(abfsRestOperation, true, X_MS_ENCRYPTION_KEY_SHA256,
getCPKSha(fs));
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
"true");
// Trying to append with different CPK headers
Configuration conf = fs.getConf();
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
"different-1234567890123456789012");
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
AbfsClient abfsClient2 = fs2.getAbfsClient()) {
LambdaTestUtils.intercept(IOException.class, () -> {
abfsClient2.append(fileName, buffer, appendRequestParameters, null);
});
}
// Trying to append with no CPK headers
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
.get(conf); AbfsClient abfsClient3 = fs3.getAbfsClient()) {
LambdaTestUtils.intercept(IOException.class, () -> {
abfsClient3.append(fileName, buffer, appendRequestParameters, null);
});
}
}
@Test
public void testAppendWithoutCPK() throws Exception {
final AzureBlobFileSystem fs = getAbfs(false);
final String fileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, fileName, FILE_SIZE);
// Trying to append without CPK headers
AppendRequestParameters appendRequestParameters =
new AppendRequestParameters(
0, 0, 5, Mode.APPEND_MODE, false, null);
byte[] buffer = getRandomBytesArray(5);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.append(fileName, buffer, appendRequestParameters, null);
assertCPKHeaders(abfsRestOperation, false);
assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
"");
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
"true");
// Trying to append with CPK headers
Configuration conf = fs.getConf();
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
"12345678901234567890123456789012");
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
AbfsClient abfsClient2 = fs2.getAbfsClient()) {
LambdaTestUtils.intercept(IOException.class, () -> {
abfsClient2.append(fileName, buffer, appendRequestParameters, null);
});
}
}
@Test
public void testSetGetXAttr() throws Exception {
final AzureBlobFileSystem fs = getAbfs(true);
String fileName = methodName.getMethodName();
createFileAndGetContent(fs, fileName, FILE_SIZE);
String valSent = "testValue";
String attrName = "testXAttr";
// set get and verify
fs.setXAttr(new Path(fileName), attrName,
valSent.getBytes(StandardCharsets.UTF_8),
EnumSet.of(XAttrSetFlag.CREATE));
byte[] valBytes = fs.getXAttr(new Path(fileName), attrName);
String valRecieved = new String(valBytes);
assertEquals(valSent, valRecieved);
// set new value get and verify
valSent = "new value";
fs.setXAttr(new Path(fileName), attrName,
valSent.getBytes(StandardCharsets.UTF_8),
EnumSet.of(XAttrSetFlag.REPLACE));
valBytes = fs.getXAttr(new Path(fileName), attrName);
valRecieved = new String(valBytes);
assertEquals(valSent, valRecieved);
// Read without CPK header
LambdaTestUtils.intercept(IOException.class, () -> {
getAbfs(false).getXAttr(new Path(fileName), attrName);
});
// Wrong CPK
LambdaTestUtils.intercept(IOException.class, () -> {
getSameFSWithWrongCPK(fs).getXAttr(new Path(fileName), attrName);
});
}
@Test
public void testCopyBetweenAccounts() throws Exception {
String accountName = getRawConfiguration()
.get(FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT);
String accountKey = getRawConfiguration()
.get(FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY);
Assume.assumeTrue(accountName != null && !accountName.isEmpty());
Assume.assumeTrue(accountKey != null && !accountKey.isEmpty());
String fileSystemName = "cpkfs";
// Create fs1 and a file with CPK
AzureBlobFileSystem fs1 = getAbfs(true);
int fileSize = FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs1, "fs1-file.txt", fileContent);
// Create fs2 with different CPK
Configuration conf = new Configuration();
conf.addResource(TEST_CONFIGURATION_FILE_NAME);
conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
conf.unset(FS_AZURE_ABFS_ACCOUNT_NAME);
conf.set(FS_AZURE_ABFS_ACCOUNT_NAME, accountName);
conf.set(FS_AZURE_ACCOUNT_KEY + "." + accountName, accountKey);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
"123456789012345678901234567890ab");
conf.set("fs.defaultFS", "abfs://" + fileSystemName + "@" + accountName);
AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
// Read from fs1 and write to fs2, fs1 and fs2 are having different CPK
Path fs2DestFilePath = new Path("fs2-dest-file.txt");
FSDataOutputStream ops = fs2.create(fs2DestFilePath);
try (FSDataInputStream iStream = fs1.open(testFilePath)) {
long totalBytesRead = 0;
do {
int length = 8 * ONE_MB;
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
totalBytesRead += bytesRead;
ops.write(buffer);
} while (totalBytesRead < fileContent.length);
ops.close();
}
// Trying to read fs2DestFilePath with different CPK headers
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
"different-1234567890123456789012");
try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
.get(conf); FSDataInputStream iStream = fs3.open(fs2DestFilePath)) {
int length = 8 * ONE_MB;
byte[] buffer = new byte[length];
LambdaTestUtils.intercept(IOException.class, () -> {
iStream.read(buffer, 0, length);
});
}
// Trying to read fs2DestFilePath with no CPK headers
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
try (AzureBlobFileSystem fs4 = (AzureBlobFileSystem) FileSystem
.get(conf); FSDataInputStream iStream = fs4.open(fs2DestFilePath)) {
int length = 8 * ONE_MB;
byte[] buffer = new byte[length];
LambdaTestUtils.intercept(IOException.class, () -> {
iStream.read(buffer, 0, length);
});
}
// Read fs2DestFilePath and verify the content with the initial random
// bytes created and wrote into the source file at fs1
try (FSDataInputStream iStream = fs2.open(fs2DestFilePath)) {
long totalBytesRead = 0;
int pos = 0;
do {
int length = 8 * ONE_MB;
byte[] buffer = new byte[length];
int bytesRead = iStream.read(buffer, 0, length);
totalBytesRead += bytesRead;
for (int i = 0; i < bytesRead; i++) {
assertEquals(fileContent[pos + i], buffer[i]);
}
pos = pos + bytesRead;
} while (totalBytesRead < fileContent.length);
}
}
@Test
public void testListPathWithCPK() throws Exception {
testListPath(true);
}
@Test
public void testListPathWithoutCPK() throws Exception {
testListPath(false);
}
private void testListPath(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
String testDirName = "/" + methodName.getMethodName();
final Path testPath = new Path(testDirName);
fs.mkdirs(testPath);
createFileAndGetContent(fs, testDirName + "/aaa", FILE_SIZE);
createFileAndGetContent(fs, testDirName + "/bbb", FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.listPath(testDirName, false, INT_50, null);
assertListstatus(fs, abfsRestOperation, testPath);
// Trying with different CPK headers
Configuration conf = fs.getConf();
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
"different-1234567890123456789012");
AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
AbfsClient abfsClient2 = fs2.getAbfsClient();
abfsRestOperation = abfsClient2.listPath(testDirName, false, INT_50, null);
assertListstatus(fs, abfsRestOperation, testPath);
if (isWithCPK) {
// Trying with no CPK headers
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem.get(conf);
AbfsClient abfsClient3 = fs3.getAbfsClient();
abfsRestOperation = abfsClient3
.listPath(testDirName, false, INT_50, null);
assertListstatus(fs, abfsRestOperation, testPath);
}
}
private void assertListstatus(AzureBlobFileSystem fs,
AbfsRestOperation abfsRestOperation, Path testPath) throws IOException {
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
FileStatus[] listStatuses = fs.listStatus(testPath);
Assertions.assertThat(listStatuses.length)
.describedAs("listStatuses should have 2 entries").isEqualTo(2);
listStatuses = getSameFSWithWrongCPK(fs).listStatus(testPath);
Assertions.assertThat(listStatuses.length)
.describedAs("listStatuses should have 2 entries").isEqualTo(2);
}
@Test
public void testCreatePathWithCPK() throws Exception {
testCreatePath(true);
}
@Test
public void testCreatePathWithoutCPK() throws Exception {
testCreatePath(false);
}
private void testCreatePath(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, testFileName, FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
FsPermission permission = new FsPermission(FsAction.EXECUTE,
FsAction.EXECUTE, FsAction.EXECUTE);
FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
FsAction.NONE);
boolean isNamespaceEnabled = fs.getIsNamespaceEnabled();
AbfsRestOperation abfsRestOperation = abfsClient
.createPath(testFileName, true, true,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
assertCPKHeaders(abfsRestOperation, isWithCPK);
assertResponseHeader(abfsRestOperation, isWithCPK,
X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
"true");
FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));
Assertions.assertThat(listStatuses.length)
.describedAs("listStatuses should have 1 entry").isEqualTo(1);
listStatuses = getSameFSWithWrongCPK(fs).listStatus(new Path(testFileName));
Assertions.assertThat(listStatuses.length)
.describedAs("listStatuses should have 1 entry").isEqualTo(1);
}
@Test
public void testRenamePathWithCPK() throws Exception {
testRenamePath(true);
}
@Test
public void testRenamePathWithoutCPK() throws Exception {
testRenamePath(false);
}
private void testRenamePath(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, testFileName, FILE_SIZE);
FileStatus fileStatusBeforeRename = fs
.getFileStatus(new Path(testFileName));
String newName = "/newName";
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.renamePath(testFileName, newName, null);
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
LambdaTestUtils.intercept(FileNotFoundException.class,
(() -> fs.getFileStatus(new Path(testFileName))));
FileStatus fileStatusAfterRename = fs.getFileStatus(new Path(newName));
Assertions.assertThat(fileStatusAfterRename.getLen())
.describedAs("File size has to be same before and after rename")
.isEqualTo(fileStatusBeforeRename.getLen());
}
@Test
public void testFlushWithCPK() throws Exception {
testFlush(true);
}
@Test
public void testFlushWithoutCPK() throws Exception {
testFlush(false);
}
private void testFlush(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
fs.create(new Path(testFileName));
AbfsClient abfsClient = fs.getAbfsClient();
String expectedCPKSha = getCPKSha(fs);
byte[] fileContent = getRandomBytesArray(FILE_SIZE);
Path testFilePath = new Path(testFileName + "1");
FSDataOutputStream oStream = fs.create(testFilePath);
oStream.write(fileContent);
// Trying to read with different CPK headers
Configuration conf = fs.getConf();
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
"different-1234567890123456789012");
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
AbfsClient abfsClient2 = fs2.getAbfsClient()) {
LambdaTestUtils.intercept(IOException.class, () -> {
abfsClient2.flush(testFileName, 0, false, false, null, null);
});
}
// Trying to read with no CPK headers
if (isWithCPK) {
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
.get(conf); AbfsClient abfsClient3 = fs3.getAbfsClient()) {
LambdaTestUtils.intercept(IOException.class, () -> {
abfsClient3.flush(testFileName, 0, false, false, null, null);
});
}
}
// With correct CPK
AbfsRestOperation abfsRestOperation = abfsClient
.flush(testFileName, 0, false, false, null, null);
assertCPKHeaders(abfsRestOperation, isWithCPK);
assertResponseHeader(abfsRestOperation, isWithCPK,
X_MS_ENCRYPTION_KEY_SHA256, expectedCPKSha);
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
isWithCPK + "");
}
@Test
public void testSetPathPropertiesWithCPK() throws Exception {
testSetPathProperties(true);
}
@Test
public void testSetPathPropertiesWithoutCPK() throws Exception {
testSetPathProperties(false);
}
private void testSetPathProperties(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, testFileName, FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
final Hashtable<String, String> properties = new Hashtable<>();
properties.put("key", "val");
AbfsRestOperation abfsRestOperation = abfsClient
.setPathProperties(testFileName,
convertXmsPropertiesToCommaSeparatedString(properties));
assertCPKHeaders(abfsRestOperation, isWithCPK);
assertResponseHeader(abfsRestOperation, isWithCPK,
X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
"true");
}
@Test
public void testGetPathStatusFileWithCPK() throws Exception {
testGetPathStatusFile(true);
}
@Test
public void testGetPathStatusFileWithoutCPK() throws Exception {
testGetPathStatusFile(false);
}
private void testGetPathStatusFile(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, testFileName, FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.getPathStatus(testFileName, false);
assertCPKHeaders(abfsRestOperation, false);
assertResponseHeader(abfsRestOperation, isWithCPK,
X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
"true");
assertResponseHeader(abfsRestOperation, false,
X_MS_REQUEST_SERVER_ENCRYPTED, "");
abfsRestOperation = abfsClient.getPathStatus(testFileName, true);
assertCPKHeaders(abfsRestOperation, isWithCPK);
assertResponseHeader(abfsRestOperation, isWithCPK,
X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
"true");
assertResponseHeader(abfsRestOperation, false,
X_MS_REQUEST_SERVER_ENCRYPTED, "");
}
@Test
public void testDeletePathWithCPK() throws Exception {
testDeletePath(false);
}
@Test
public void testDeletePathWithoutCPK() throws Exception {
testDeletePath(false);
}
private void testDeletePath(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
createFileAndGetContent(fs, testFileName, FILE_SIZE);
FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));
Assertions.assertThat(listStatuses.length)
.describedAs("listStatuses should have 1 entry").isEqualTo(1);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.deletePath(testFileName, false, null);
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
Assertions.assertThatThrownBy(() -> fs.listStatus(new Path(testFileName)))
.isInstanceOf(FileNotFoundException.class);
}
@Test
public void testSetPermissionWithCPK() throws Exception {
testSetPermission(true);
}
@Test
public void testSetPermissionWithoutCPK() throws Exception {
testSetPermission(false);
}
private void testSetPermission(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
Assume.assumeTrue(fs.getIsNamespaceEnabled());
createFileAndGetContent(fs, testFileName, FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
FsPermission permission = new FsPermission(FsAction.EXECUTE,
FsAction.EXECUTE, FsAction.EXECUTE);
AbfsRestOperation abfsRestOperation = abfsClient
.setPermission(testFileName, permission.toString());
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
}
@Test
public void testSetAclWithCPK() throws Exception {
testSetAcl(true);
}
@Test
public void testSetAclWithoutCPK() throws Exception {
testSetAcl(false);
}
private void testSetAcl(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
Assume.assumeTrue(fs.getIsNamespaceEnabled());
createFileAndGetContent(fs, testFileName, FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(ACCESS, USER, ALL));
final Map<String, String> aclEntries = AbfsAclHelper
.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
AbfsRestOperation abfsRestOperation = abfsClient
.setAcl(testFileName, AbfsAclHelper.serializeAclSpec(aclEntries));
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
}
@Test
public void testGetAclWithCPK() throws Exception {
testGetAcl(true);
}
@Test
public void testGetAclWithoutCPK() throws Exception {
testGetAcl(false);
}
private void testGetAcl(final boolean isWithCPK) throws Exception {
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
Assume.assumeTrue(fs.getIsNamespaceEnabled());
createFileAndGetContent(fs, testFileName, FILE_SIZE);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient.getAclStatus(testFileName);
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
}
@Test
public void testCheckAccessWithCPK() throws Exception {
testCheckAccess(true);
}
@Test
public void testCheckAccessWithoutCPK() throws Exception {
testCheckAccess(false);
}
private void testCheckAccess(final boolean isWithCPK) throws Exception {
boolean isHNSEnabled = getConfiguration()
.getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false",
isHNSEnabled);
Assume.assumeTrue("AuthType has to be OAuth",
getAuthType() == AuthType.OAuth);
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
final String testFileName = "/" + methodName.getMethodName();
fs.create(new Path(testFileName));
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.checkAccess(testFileName, "rwx");
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
}
private byte[] createFileAndGetContent(AzureBlobFileSystem fs,
String fileName, int fileSize) throws IOException {
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
ContractTestUtils.verifyFileContents(fs, testFilePath, fileContent);
return fileContent;
}
private void assertCPKHeaders(AbfsRestOperation abfsRestOperation,
boolean isCPKHeaderExpected) {
assertHeader(abfsRestOperation, X_MS_ENCRYPTION_KEY, isCPKHeaderExpected);
assertHeader(abfsRestOperation, X_MS_ENCRYPTION_KEY_SHA256,
isCPKHeaderExpected);
assertHeader(abfsRestOperation, X_MS_ENCRYPTION_ALGORITHM,
isCPKHeaderExpected);
}
private void assertNoCPKResponseHeadersPresent(
AbfsRestOperation abfsRestOperation) {
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
assertResponseHeader(abfsRestOperation, false,
X_MS_REQUEST_SERVER_ENCRYPTED, "");
assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
"");
}
private void assertResponseHeader(AbfsRestOperation abfsRestOperation,
boolean isHeaderExpected, String headerName, String expectedValue) {
final AbfsHttpOperation result = abfsRestOperation.getResult();
final String value = result.getResponseHeader(headerName);
if (isHeaderExpected) {
Assertions.assertThat(value).isEqualTo(expectedValue);
} else {
Assertions.assertThat(value).isNull();
}
}
private void assertHeader(AbfsRestOperation abfsRestOperation,
String headerName, boolean isCPKHeaderExpected) {
assertTrue(abfsRestOperation != null);
Optional<AbfsHttpHeader> header = abfsRestOperation.getRequestHeaders()
.stream().filter(abfsHttpHeader -> abfsHttpHeader.getName()
.equalsIgnoreCase(headerName)).findFirst();
String desc;
if (isCPKHeaderExpected) {
desc =
"CPK header " + headerName + " is expected, but the same is absent.";
} else {
desc = "CPK header " + headerName
+ " is not expected, but the same is present.";
}
Assertions.assertThat(header.isPresent()).describedAs(desc)
.isEqualTo(isCPKHeaderExpected);
}
private byte[] getSHA256Hash(String key) throws IOException {
try {
final MessageDigest digester = MessageDigest.getInstance("SHA-256");
return digester.digest(key.getBytes(StandardCharsets.UTF_8));
} catch (NoSuchAlgorithmException e) {
throw new IOException(e);
}
}
private String getCPKSha(final AzureBlobFileSystem abfs) throws IOException {
Configuration conf = abfs.getConf();
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
String encryptionKey = conf
.get(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
if (encryptionKey == null || encryptionKey.isEmpty()) {
return "";
}
return getBase64EncodedString(getSHA256Hash(encryptionKey));
}
private String getBase64EncodedString(byte[] bytes) {
return java.util.Base64.getEncoder().encodeToString(bytes);
}
private Path createFileWithContent(FileSystem fs, String fileName,
byte[] fileContent) throws IOException {
Path testFilePath = new Path(fileName);
try (FSDataOutputStream oStream = fs.create(testFilePath)) {
oStream.write(fileContent);
oStream.flush();
}
return testFilePath;
}
private String convertXmsPropertiesToCommaSeparatedString(
final Hashtable<String, String> properties)
throws CharacterCodingException {
StringBuilder commaSeparatedProperties = new StringBuilder();
final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING)
.newEncoder();
for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
String key = propertyEntry.getKey();
String value = propertyEntry.getValue();
Boolean canEncodeValue = encoder.canEncode(value);
if (!canEncodeValue) {
throw new CharacterCodingException();
}
String encodedPropertyValue = Base64
.encode(encoder.encode(CharBuffer.wrap(value)).array());
commaSeparatedProperties.append(key).append(AbfsHttpConstants.EQUAL)
.append(encodedPropertyValue);
commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
}
if (commaSeparatedProperties.length() != 0) {
commaSeparatedProperties
.deleteCharAt(commaSeparatedProperties.length() - 1);
}
return commaSeparatedProperties.toString();
}
private String getOctalNotation(FsPermission fsPermission) {
Preconditions.checkNotNull(fsPermission, "fsPermission");
return String
.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
}
private byte[] getRandomBytesArray(int length) {
final byte[] b = new byte[length];
new Random().nextBytes(b);
return b;
}
private AzureBlobFileSystem getAbfs(boolean withCPK) throws IOException {
return getAbfs(withCPK, "12345678901234567890123456789012");
}
private AzureBlobFileSystem getAbfs(boolean withCPK, String cpk)
throws IOException {
Configuration conf = getRawConfiguration();
if (withCPK) {
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + getAccountName(),
cpk);
} else {
conf.unset(
FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + getAccountName());
}
return (AzureBlobFileSystem) FileSystem.newInstance(conf);
}
private AzureBlobFileSystem getSameFSWithWrongCPK(
final AzureBlobFileSystem fs) throws IOException {
AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration();
Configuration conf = abfsConf.getRawConfiguration();
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
String cpk = conf
.get(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
if (cpk == null || cpk.isEmpty()) {
cpk = "01234567890123456789012345678912";
}
cpk = "different-" + cpk;
String differentCpk = cpk.substring(0, ENCRYPTION_KEY_LEN - 1);
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
differentCpk);
conf.set("fs.defaultFS",
"abfs://" + getFileSystemName() + "@" + accountName);
AzureBlobFileSystem sameFSWithDifferentCPK =
(AzureBlobFileSystem) FileSystem.newInstance(conf);
return sameFSWithDifferentCPK;
}
}

View File

@ -28,6 +28,7 @@ public final class TestConfigurationKeys {
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs"; public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled"; public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
public static final String FS_AZURE_TEST_APPENDBLOB_ENABLED = "fs.azure.test.appendblob.enabled"; public static final String FS_AZURE_TEST_APPENDBLOB_ENABLED = "fs.azure.test.appendblob.enabled";
public static final String FS_AZURE_TEST_CPK_ENABLED = "fs.azure.test.cpk.enabled";
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id"; public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id";
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret"; public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret";
@ -54,6 +55,9 @@ public final class TestConfigurationKeys {
public static final String FS_AZURE_TEST_APP_SECRET = "fs.azure.test.app.secret"; public static final String FS_AZURE_TEST_APP_SECRET = "fs.azure.test.app.secret";
public static final String FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT = "fs.azure.test.cpk-enabled-secondary-account";
public static final String FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY = "fs.azure.test.cpk-enabled-secondary-account.key";
public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml"; public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-"; public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
public static final int TEST_TIMEOUT = 15 * 60 * 1000; public static final int TEST_TIMEOUT = 15 * 60 * 1000;

View File

@ -20,7 +20,6 @@
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.List; import java.util.List;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -103,7 +102,7 @@ public TestAbfsClient(){
} }
private String getUserAgentString(AbfsConfiguration config, private String getUserAgentString(AbfsConfiguration config,
boolean includeSSLProvider) throws MalformedURLException { boolean includeSSLProvider) throws IOException {
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build(); AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build();
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null, AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
config, (AccessTokenProvider) null, abfsClientContext); config, (AccessTokenProvider) null, abfsClientContext);
@ -250,8 +249,7 @@ public void verifyUserAgentClusterType() throws Exception {
public static AbfsClient createTestClientFromCurrentContext( public static AbfsClient createTestClientFromCurrentContext(
AbfsClient baseAbfsClientInstance, AbfsClient baseAbfsClientInstance,
AbfsConfiguration abfsConfig) AbfsConfiguration abfsConfig) throws IOException {
throws AzureBlobFileSystemException {
AuthType currentAuthType = abfsConfig.getAuthType( AuthType currentAuthType = abfsConfig.getAuthType(
abfsConfig.getAccountName()); abfsConfig.getAccountName());