HADOOP-17536. ABFS: Supporting customer provided encryption key (#2707)
Contributed by bilahari t h
This commit is contained in:
parent
ef13f8ad6b
commit
f54e7646cf
|
@ -948,6 +948,11 @@ public class AbfsConfiguration{
|
||||||
return this.enableAbfsListIterator;
|
return this.enableAbfsListIterator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getClientProvidedEncryptionKey() {
|
||||||
|
String accSpecEncKey = accountConf(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY);
|
||||||
|
return rawConfig.get(accSpecEncKey, null);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void setReadBufferSize(int bufferSize) {
|
void setReadBufferSize(int bufferSize) {
|
||||||
this.readBufferSize = bufferSize;
|
this.readBufferSize = bufferSize;
|
||||||
|
|
|
@ -143,6 +143,8 @@ public final class ConfigurationKeys {
|
||||||
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
|
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
|
||||||
/** Setting this true will make the driver use it's own RemoteIterator implementation */
|
/** Setting this true will make the driver use it's own RemoteIterator implementation */
|
||||||
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
|
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
|
||||||
|
/** Server side encryption key */
|
||||||
|
public static final String FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY = "fs.azure.client-provided-encryption-key";
|
||||||
|
|
||||||
/** End point of ABFS account: {@value}. */
|
/** End point of ABFS account: {@value}. */
|
||||||
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";
|
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";
|
||||||
|
|
|
@ -68,6 +68,8 @@ public final class FileSystemConfigurations {
|
||||||
public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
|
public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
|
||||||
public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
|
public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
|
||||||
|
|
||||||
|
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256";
|
||||||
|
|
||||||
public static final int MAX_CONCURRENT_READ_THREADS = 12;
|
public static final int MAX_CONCURRENT_READ_THREADS = 12;
|
||||||
public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
|
public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
|
||||||
public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
|
public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
|
||||||
|
|
|
@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations {
|
||||||
public static final String X_MS_UMASK = "x-ms-umask";
|
public static final String X_MS_UMASK = "x-ms-umask";
|
||||||
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
|
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
|
||||||
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
|
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
|
||||||
|
public static final String X_MS_ENCRYPTION_KEY = "x-ms-encryption-key";
|
||||||
|
public static final String X_MS_ENCRYPTION_KEY_SHA256 = "x-ms-encryption-key-sha256";
|
||||||
|
public static final String X_MS_ENCRYPTION_ALGORITHM = "x-ms-encryption-algorithm";
|
||||||
|
public static final String X_MS_REQUEST_SERVER_ENCRYPTED = "x-ms-request-server-encrypted";
|
||||||
|
public static final String X_MS_SERVER_ENCRYPTED = "x-ms-server-encrypted";
|
||||||
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
|
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
|
||||||
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
|
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
|
||||||
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
|
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
|
||||||
|
|
|
@ -25,8 +25,12 @@ import java.net.HttpURLConnection;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Base64;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -65,6 +69,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
||||||
|
@ -74,6 +79,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
||||||
*/
|
*/
|
||||||
public class AbfsClient implements Closeable {
|
public class AbfsClient implements Closeable {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
|
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
|
||||||
|
|
||||||
private final URL baseUrl;
|
private final URL baseUrl;
|
||||||
private final SharedKeyCredentials sharedKeyCredentials;
|
private final SharedKeyCredentials sharedKeyCredentials;
|
||||||
private final String xMsVersion = "2019-12-12";
|
private final String xMsVersion = "2019-12-12";
|
||||||
|
@ -82,6 +88,8 @@ public class AbfsClient implements Closeable {
|
||||||
private final AbfsConfiguration abfsConfiguration;
|
private final AbfsConfiguration abfsConfiguration;
|
||||||
private final String userAgent;
|
private final String userAgent;
|
||||||
private final AbfsPerfTracker abfsPerfTracker;
|
private final AbfsPerfTracker abfsPerfTracker;
|
||||||
|
private final String clientProvidedEncryptionKey;
|
||||||
|
private final String clientProvidedEncryptionKeySHA;
|
||||||
|
|
||||||
private final String accountName;
|
private final String accountName;
|
||||||
private final AuthType authType;
|
private final AuthType authType;
|
||||||
|
@ -93,7 +101,8 @@ public class AbfsClient implements Closeable {
|
||||||
|
|
||||||
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||||
final AbfsConfiguration abfsConfiguration,
|
final AbfsConfiguration abfsConfiguration,
|
||||||
final AbfsClientContext abfsClientContext) {
|
final AbfsClientContext abfsClientContext)
|
||||||
|
throws IOException {
|
||||||
this.baseUrl = baseUrl;
|
this.baseUrl = baseUrl;
|
||||||
this.sharedKeyCredentials = sharedKeyCredentials;
|
this.sharedKeyCredentials = sharedKeyCredentials;
|
||||||
String baseUrlString = baseUrl.toString();
|
String baseUrlString = baseUrl.toString();
|
||||||
|
@ -103,6 +112,17 @@ public class AbfsClient implements Closeable {
|
||||||
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
|
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
|
||||||
this.authType = abfsConfiguration.getAuthType(accountName);
|
this.authType = abfsConfiguration.getAuthType(accountName);
|
||||||
|
|
||||||
|
String encryptionKey = this.abfsConfiguration
|
||||||
|
.getClientProvidedEncryptionKey();
|
||||||
|
if (encryptionKey != null) {
|
||||||
|
this.clientProvidedEncryptionKey = getBase64EncodedString(encryptionKey);
|
||||||
|
this.clientProvidedEncryptionKeySHA = getBase64EncodedString(
|
||||||
|
getSHA256Hash(encryptionKey));
|
||||||
|
} else {
|
||||||
|
this.clientProvidedEncryptionKey = null;
|
||||||
|
this.clientProvidedEncryptionKeySHA = null;
|
||||||
|
}
|
||||||
|
|
||||||
String sslProviderName = null;
|
String sslProviderName = null;
|
||||||
|
|
||||||
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
|
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
|
||||||
|
@ -131,7 +151,8 @@ public class AbfsClient implements Closeable {
|
||||||
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||||
final AbfsConfiguration abfsConfiguration,
|
final AbfsConfiguration abfsConfiguration,
|
||||||
final AccessTokenProvider tokenProvider,
|
final AccessTokenProvider tokenProvider,
|
||||||
final AbfsClientContext abfsClientContext) {
|
final AbfsClientContext abfsClientContext)
|
||||||
|
throws IOException {
|
||||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
|
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
|
||||||
this.tokenProvider = tokenProvider;
|
this.tokenProvider = tokenProvider;
|
||||||
}
|
}
|
||||||
|
@ -139,11 +160,29 @@ public class AbfsClient implements Closeable {
|
||||||
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||||
final AbfsConfiguration abfsConfiguration,
|
final AbfsConfiguration abfsConfiguration,
|
||||||
final SASTokenProvider sasTokenProvider,
|
final SASTokenProvider sasTokenProvider,
|
||||||
final AbfsClientContext abfsClientContext) {
|
final AbfsClientContext abfsClientContext)
|
||||||
|
throws IOException {
|
||||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
|
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
|
||||||
this.sasTokenProvider = sasTokenProvider;
|
this.sasTokenProvider = sasTokenProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private byte[] getSHA256Hash(String key) throws IOException {
|
||||||
|
try {
|
||||||
|
final MessageDigest digester = MessageDigest.getInstance("SHA-256");
|
||||||
|
return digester.digest(key.getBytes(StandardCharsets.UTF_8));
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getBase64EncodedString(String key) {
|
||||||
|
return getBase64EncodedString(key.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getBase64EncodedString(byte[] bytes) {
|
||||||
|
return Base64.getEncoder().encodeToString(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (tokenProvider instanceof Closeable) {
|
if (tokenProvider instanceof Closeable) {
|
||||||
|
@ -180,6 +219,18 @@ public class AbfsClient implements Closeable {
|
||||||
return requestHeaders;
|
return requestHeaders;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addCustomerProvidedKeyHeaders(
|
||||||
|
final List<AbfsHttpHeader> requestHeaders) {
|
||||||
|
if (clientProvidedEncryptionKey != null) {
|
||||||
|
requestHeaders.add(
|
||||||
|
new AbfsHttpHeader(X_MS_ENCRYPTION_KEY, clientProvidedEncryptionKey));
|
||||||
|
requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_KEY_SHA256,
|
||||||
|
clientProvidedEncryptionKeySHA));
|
||||||
|
requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_ALGORITHM,
|
||||||
|
SERVER_SIDE_ENCRYPTION_ALGORITHM));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
|
AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
|
||||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
|
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
|
||||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
|
abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
|
||||||
|
@ -289,6 +340,9 @@ public class AbfsClient implements Closeable {
|
||||||
final String permission, final String umask,
|
final String permission, final String umask,
|
||||||
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
|
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
|
||||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||||
|
if (isFile) {
|
||||||
|
addCustomerProvidedKeyHeaders(requestHeaders);
|
||||||
|
}
|
||||||
if (!overwrite) {
|
if (!overwrite) {
|
||||||
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
|
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
|
||||||
}
|
}
|
||||||
|
@ -510,6 +564,7 @@ public class AbfsClient implements Closeable {
|
||||||
AppendRequestParameters reqParams, final String cachedSasToken)
|
AppendRequestParameters reqParams, final String cachedSasToken)
|
||||||
throws AzureBlobFileSystemException {
|
throws AzureBlobFileSystemException {
|
||||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||||
|
addCustomerProvidedKeyHeaders(requestHeaders);
|
||||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||||
|
@ -596,6 +651,7 @@ public class AbfsClient implements Closeable {
|
||||||
boolean isClose, final String cachedSasToken, final String leaseId)
|
boolean isClose, final String cachedSasToken, final String leaseId)
|
||||||
throws AzureBlobFileSystemException {
|
throws AzureBlobFileSystemException {
|
||||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||||
|
addCustomerProvidedKeyHeaders(requestHeaders);
|
||||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||||
|
@ -627,6 +683,7 @@ public class AbfsClient implements Closeable {
|
||||||
public AbfsRestOperation setPathProperties(final String path, final String properties)
|
public AbfsRestOperation setPathProperties(final String path, final String properties)
|
||||||
throws AzureBlobFileSystemException {
|
throws AzureBlobFileSystemException {
|
||||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||||
|
addCustomerProvidedKeyHeaders(requestHeaders);
|
||||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||||
|
@ -660,6 +717,8 @@ public class AbfsClient implements Closeable {
|
||||||
// only traversal (execute) permission is required.
|
// only traversal (execute) permission is required.
|
||||||
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS);
|
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS);
|
||||||
operation = SASTokenProvider.GET_STATUS_OPERATION;
|
operation = SASTokenProvider.GET_STATUS_OPERATION;
|
||||||
|
} else {
|
||||||
|
addCustomerProvidedKeyHeaders(requestHeaders);
|
||||||
}
|
}
|
||||||
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
|
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
|
||||||
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
|
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
|
||||||
|
@ -678,6 +737,7 @@ public class AbfsClient implements Closeable {
|
||||||
public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
|
public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
|
||||||
final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException {
|
final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException {
|
||||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||||
|
addCustomerProvidedKeyHeaders(requestHeaders);
|
||||||
requestHeaders.add(new AbfsHttpHeader(RANGE,
|
requestHeaders.add(new AbfsHttpHeader(RANGE,
|
||||||
String.format("bytes=%d-%d", position, position + bufferLength - 1)));
|
String.format("bytes=%d-%d", position, position + bufferLength - 1)));
|
||||||
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
|
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
|
||||||
|
|
|
@ -0,0 +1,936 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 ("License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.CharBuffer;
|
||||||
|
import java.nio.charset.CharacterCodingException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.charset.CharsetEncoder;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.Hashtable;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.Base64;
|
||||||
|
import org.apache.hadoop.fs.permission.AclEntry;
|
||||||
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
|
||||||
|
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
|
||||||
|
import static org.apache.hadoop.fs.permission.AclEntryType.USER;
|
||||||
|
import static org.apache.hadoop.fs.permission.FsAction.ALL;
|
||||||
|
|
||||||
|
public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(ITestCustomerProvidedKey.class);
|
||||||
|
|
||||||
|
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
|
||||||
|
private static final int INT_512 = 512;
|
||||||
|
private static final int INT_50 = 50;
|
||||||
|
private static final int ENCRYPTION_KEY_LEN = 32;
|
||||||
|
private static final int FILE_SIZE = 10 * ONE_MB;
|
||||||
|
private static final int FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS = 24 * ONE_MB;
|
||||||
|
|
||||||
|
public ITestCustomerProvidedKey() throws Exception {
|
||||||
|
boolean isCPKTestsEnabled = getConfiguration()
|
||||||
|
.getBoolean(FS_AZURE_TEST_CPK_ENABLED, false);
|
||||||
|
Assume.assumeTrue(isCPKTestsEnabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadWithCPK() throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(true);
|
||||||
|
String fileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
int length = FILE_SIZE;
|
||||||
|
byte[] buffer = new byte[length];
|
||||||
|
final AbfsRestOperation op = abfsClient.getPathStatus(fileName, false);
|
||||||
|
final String eTag = op.getResult()
|
||||||
|
.getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.read(fileName, 0, buffer, 0, length, eTag, null);
|
||||||
|
assertCPKHeaders(abfsRestOperation, true);
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_ENCRYPTION_KEY_SHA256,
|
||||||
|
getCPKSha(fs));
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
|
||||||
|
"true");
|
||||||
|
assertResponseHeader(abfsRestOperation, false,
|
||||||
|
X_MS_REQUEST_SERVER_ENCRYPTED, "");
|
||||||
|
|
||||||
|
// Trying to read with different CPK headers
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
"different-1234567890123456789012");
|
||||||
|
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
FSDataInputStream iStream = fs2.open(new Path(fileName))) {
|
||||||
|
int len = 8 * ONE_MB;
|
||||||
|
byte[] b = new byte[len];
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
iStream.read(b, 0, len);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trying to read with no CPK headers
|
||||||
|
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
|
||||||
|
try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
|
||||||
|
.get(conf); FSDataInputStream iStream = fs3.open(new Path(fileName))) {
|
||||||
|
int len = 8 * ONE_MB;
|
||||||
|
byte[] b = new byte[len];
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
iStream.read(b, 0, len);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadWithoutCPK() throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(false);
|
||||||
|
String fileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
int length = INT_512;
|
||||||
|
byte[] buffer = new byte[length * 4];
|
||||||
|
final AbfsRestOperation op = abfsClient.getPathStatus(fileName, false);
|
||||||
|
final String eTag = op.getResult()
|
||||||
|
.getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.read(fileName, 0, buffer, 0, length, eTag, null);
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
|
||||||
|
getCPKSha(fs));
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
|
||||||
|
"true");
|
||||||
|
assertResponseHeader(abfsRestOperation, false,
|
||||||
|
X_MS_REQUEST_SERVER_ENCRYPTED, "");
|
||||||
|
|
||||||
|
// Trying to read with CPK headers
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
"12345678901234567890123456789012");
|
||||||
|
|
||||||
|
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
AbfsClient abfsClient2 = fs2.getAbfsClient()) {
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
abfsClient2.read(fileName, 0, buffer, 0, length, eTag, null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppendWithCPK() throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(true);
|
||||||
|
final String fileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
|
// Trying to append with correct CPK headers
|
||||||
|
AppendRequestParameters appendRequestParameters =
|
||||||
|
new AppendRequestParameters(
|
||||||
|
0, 0, 5, Mode.APPEND_MODE, false, null);
|
||||||
|
byte[] buffer = getRandomBytesArray(5);
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.append(fileName, buffer, appendRequestParameters, null);
|
||||||
|
assertCPKHeaders(abfsRestOperation, true);
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_ENCRYPTION_KEY_SHA256,
|
||||||
|
getCPKSha(fs));
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
|
||||||
|
"true");
|
||||||
|
|
||||||
|
// Trying to append with different CPK headers
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
"different-1234567890123456789012");
|
||||||
|
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
AbfsClient abfsClient2 = fs2.getAbfsClient()) {
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
abfsClient2.append(fileName, buffer, appendRequestParameters, null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trying to append with no CPK headers
|
||||||
|
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
|
||||||
|
try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
|
||||||
|
.get(conf); AbfsClient abfsClient3 = fs3.getAbfsClient()) {
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
abfsClient3.append(fileName, buffer, appendRequestParameters, null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppendWithoutCPK() throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(false);
|
||||||
|
final String fileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
|
// Trying to append without CPK headers
|
||||||
|
AppendRequestParameters appendRequestParameters =
|
||||||
|
new AppendRequestParameters(
|
||||||
|
0, 0, 5, Mode.APPEND_MODE, false, null);
|
||||||
|
byte[] buffer = getRandomBytesArray(5);
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.append(fileName, buffer, appendRequestParameters, null);
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
|
||||||
|
"");
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
|
||||||
|
"true");
|
||||||
|
|
||||||
|
// Trying to append with CPK headers
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
"12345678901234567890123456789012");
|
||||||
|
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
AbfsClient abfsClient2 = fs2.getAbfsClient()) {
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
abfsClient2.append(fileName, buffer, appendRequestParameters, null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetGetXAttr() throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(true);
|
||||||
|
String fileName = methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, fileName, FILE_SIZE);
|
||||||
|
|
||||||
|
String valSent = "testValue";
|
||||||
|
String attrName = "testXAttr";
|
||||||
|
|
||||||
|
// set get and verify
|
||||||
|
fs.setXAttr(new Path(fileName), attrName,
|
||||||
|
valSent.getBytes(StandardCharsets.UTF_8),
|
||||||
|
EnumSet.of(XAttrSetFlag.CREATE));
|
||||||
|
byte[] valBytes = fs.getXAttr(new Path(fileName), attrName);
|
||||||
|
String valRecieved = new String(valBytes);
|
||||||
|
assertEquals(valSent, valRecieved);
|
||||||
|
|
||||||
|
// set new value get and verify
|
||||||
|
valSent = "new value";
|
||||||
|
fs.setXAttr(new Path(fileName), attrName,
|
||||||
|
valSent.getBytes(StandardCharsets.UTF_8),
|
||||||
|
EnumSet.of(XAttrSetFlag.REPLACE));
|
||||||
|
valBytes = fs.getXAttr(new Path(fileName), attrName);
|
||||||
|
valRecieved = new String(valBytes);
|
||||||
|
assertEquals(valSent, valRecieved);
|
||||||
|
|
||||||
|
// Read without CPK header
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
getAbfs(false).getXAttr(new Path(fileName), attrName);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wrong CPK
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
getSameFSWithWrongCPK(fs).getXAttr(new Path(fileName), attrName);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCopyBetweenAccounts() throws Exception {
|
||||||
|
String accountName = getRawConfiguration()
|
||||||
|
.get(FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT);
|
||||||
|
String accountKey = getRawConfiguration()
|
||||||
|
.get(FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY);
|
||||||
|
Assume.assumeTrue(accountName != null && !accountName.isEmpty());
|
||||||
|
Assume.assumeTrue(accountKey != null && !accountKey.isEmpty());
|
||||||
|
String fileSystemName = "cpkfs";
|
||||||
|
|
||||||
|
// Create fs1 and a file with CPK
|
||||||
|
AzureBlobFileSystem fs1 = getAbfs(true);
|
||||||
|
int fileSize = FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS;
|
||||||
|
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||||
|
Path testFilePath = createFileWithContent(fs1, "fs1-file.txt", fileContent);
|
||||||
|
|
||||||
|
// Create fs2 with different CPK
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||||
|
conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
|
||||||
|
conf.unset(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
conf.set(FS_AZURE_ABFS_ACCOUNT_NAME, accountName);
|
||||||
|
conf.set(FS_AZURE_ACCOUNT_KEY + "." + accountName, accountKey);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
"123456789012345678901234567890ab");
|
||||||
|
conf.set("fs.defaultFS", "abfs://" + fileSystemName + "@" + accountName);
|
||||||
|
AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
|
||||||
|
// Read from fs1 and write to fs2, fs1 and fs2 are having different CPK
|
||||||
|
Path fs2DestFilePath = new Path("fs2-dest-file.txt");
|
||||||
|
FSDataOutputStream ops = fs2.create(fs2DestFilePath);
|
||||||
|
try (FSDataInputStream iStream = fs1.open(testFilePath)) {
|
||||||
|
long totalBytesRead = 0;
|
||||||
|
do {
|
||||||
|
int length = 8 * ONE_MB;
|
||||||
|
byte[] buffer = new byte[length];
|
||||||
|
int bytesRead = iStream.read(buffer, 0, length);
|
||||||
|
totalBytesRead += bytesRead;
|
||||||
|
ops.write(buffer);
|
||||||
|
} while (totalBytesRead < fileContent.length);
|
||||||
|
ops.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trying to read fs2DestFilePath with different CPK headers
|
||||||
|
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
"different-1234567890123456789012");
|
||||||
|
try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
|
||||||
|
.get(conf); FSDataInputStream iStream = fs3.open(fs2DestFilePath)) {
|
||||||
|
int length = 8 * ONE_MB;
|
||||||
|
byte[] buffer = new byte[length];
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
iStream.read(buffer, 0, length);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trying to read fs2DestFilePath with no CPK headers
|
||||||
|
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
|
||||||
|
try (AzureBlobFileSystem fs4 = (AzureBlobFileSystem) FileSystem
|
||||||
|
.get(conf); FSDataInputStream iStream = fs4.open(fs2DestFilePath)) {
|
||||||
|
int length = 8 * ONE_MB;
|
||||||
|
byte[] buffer = new byte[length];
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
iStream.read(buffer, 0, length);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read fs2DestFilePath and verify the content with the initial random
|
||||||
|
// bytes created and wrote into the source file at fs1
|
||||||
|
try (FSDataInputStream iStream = fs2.open(fs2DestFilePath)) {
|
||||||
|
long totalBytesRead = 0;
|
||||||
|
int pos = 0;
|
||||||
|
do {
|
||||||
|
int length = 8 * ONE_MB;
|
||||||
|
byte[] buffer = new byte[length];
|
||||||
|
int bytesRead = iStream.read(buffer, 0, length);
|
||||||
|
totalBytesRead += bytesRead;
|
||||||
|
for (int i = 0; i < bytesRead; i++) {
|
||||||
|
assertEquals(fileContent[pos + i], buffer[i]);
|
||||||
|
}
|
||||||
|
pos = pos + bytesRead;
|
||||||
|
} while (totalBytesRead < fileContent.length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListPathWithCPK() throws Exception {
|
||||||
|
testListPath(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListPathWithoutCPK() throws Exception {
|
||||||
|
testListPath(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testListPath(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
String testDirName = "/" + methodName.getMethodName();
|
||||||
|
final Path testPath = new Path(testDirName);
|
||||||
|
fs.mkdirs(testPath);
|
||||||
|
createFileAndGetContent(fs, testDirName + "/aaa", FILE_SIZE);
|
||||||
|
createFileAndGetContent(fs, testDirName + "/bbb", FILE_SIZE);
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.listPath(testDirName, false, INT_50, null);
|
||||||
|
assertListstatus(fs, abfsRestOperation, testPath);
|
||||||
|
|
||||||
|
// Trying with different CPK headers
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
"different-1234567890123456789012");
|
||||||
|
AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
AbfsClient abfsClient2 = fs2.getAbfsClient();
|
||||||
|
abfsRestOperation = abfsClient2.listPath(testDirName, false, INT_50, null);
|
||||||
|
assertListstatus(fs, abfsRestOperation, testPath);
|
||||||
|
|
||||||
|
if (isWithCPK) {
|
||||||
|
// Trying with no CPK headers
|
||||||
|
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
|
||||||
|
AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem.get(conf);
|
||||||
|
AbfsClient abfsClient3 = fs3.getAbfsClient();
|
||||||
|
abfsRestOperation = abfsClient3
|
||||||
|
.listPath(testDirName, false, INT_50, null);
|
||||||
|
assertListstatus(fs, abfsRestOperation, testPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertListstatus(AzureBlobFileSystem fs,
|
||||||
|
AbfsRestOperation abfsRestOperation, Path testPath) throws IOException {
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||||
|
|
||||||
|
FileStatus[] listStatuses = fs.listStatus(testPath);
|
||||||
|
Assertions.assertThat(listStatuses.length)
|
||||||
|
.describedAs("listStatuses should have 2 entries").isEqualTo(2);
|
||||||
|
|
||||||
|
listStatuses = getSameFSWithWrongCPK(fs).listStatus(testPath);
|
||||||
|
Assertions.assertThat(listStatuses.length)
|
||||||
|
.describedAs("listStatuses should have 2 entries").isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreatePathWithCPK() throws Exception {
|
||||||
|
testCreatePath(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreatePathWithoutCPK() throws Exception {
|
||||||
|
testCreatePath(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testCreatePath(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
FsPermission permission = new FsPermission(FsAction.EXECUTE,
|
||||||
|
FsAction.EXECUTE, FsAction.EXECUTE);
|
||||||
|
FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
|
||||||
|
FsAction.NONE);
|
||||||
|
boolean isNamespaceEnabled = fs.getIsNamespaceEnabled();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.createPath(testFileName, true, true,
|
||||||
|
isNamespaceEnabled ? getOctalNotation(permission) : null,
|
||||||
|
isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
|
||||||
|
assertCPKHeaders(abfsRestOperation, isWithCPK);
|
||||||
|
assertResponseHeader(abfsRestOperation, isWithCPK,
|
||||||
|
X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
|
||||||
|
"true");
|
||||||
|
|
||||||
|
FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));
|
||||||
|
Assertions.assertThat(listStatuses.length)
|
||||||
|
.describedAs("listStatuses should have 1 entry").isEqualTo(1);
|
||||||
|
|
||||||
|
listStatuses = getSameFSWithWrongCPK(fs).listStatus(new Path(testFileName));
|
||||||
|
Assertions.assertThat(listStatuses.length)
|
||||||
|
.describedAs("listStatuses should have 1 entry").isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenamePathWithCPK() throws Exception {
|
||||||
|
testRenamePath(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenamePathWithoutCPK() throws Exception {
|
||||||
|
testRenamePath(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testRenamePath(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
|
FileStatus fileStatusBeforeRename = fs
|
||||||
|
.getFileStatus(new Path(testFileName));
|
||||||
|
|
||||||
|
String newName = "/newName";
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.renamePath(testFileName, newName, null);
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||||
|
|
||||||
|
LambdaTestUtils.intercept(FileNotFoundException.class,
|
||||||
|
(() -> fs.getFileStatus(new Path(testFileName))));
|
||||||
|
|
||||||
|
FileStatus fileStatusAfterRename = fs.getFileStatus(new Path(newName));
|
||||||
|
Assertions.assertThat(fileStatusAfterRename.getLen())
|
||||||
|
.describedAs("File size has to be same before and after rename")
|
||||||
|
.isEqualTo(fileStatusBeforeRename.getLen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlushWithCPK() throws Exception {
|
||||||
|
testFlush(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlushWithoutCPK() throws Exception {
|
||||||
|
testFlush(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testFlush(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
fs.create(new Path(testFileName));
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
String expectedCPKSha = getCPKSha(fs);
|
||||||
|
|
||||||
|
byte[] fileContent = getRandomBytesArray(FILE_SIZE);
|
||||||
|
Path testFilePath = new Path(testFileName + "1");
|
||||||
|
FSDataOutputStream oStream = fs.create(testFilePath);
|
||||||
|
oStream.write(fileContent);
|
||||||
|
|
||||||
|
// Trying to read with different CPK headers
|
||||||
|
Configuration conf = fs.getConf();
|
||||||
|
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
"different-1234567890123456789012");
|
||||||
|
try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
AbfsClient abfsClient2 = fs2.getAbfsClient()) {
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
abfsClient2.flush(testFileName, 0, false, false, null, null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trying to read with no CPK headers
|
||||||
|
if (isWithCPK) {
|
||||||
|
conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
|
||||||
|
try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
|
||||||
|
.get(conf); AbfsClient abfsClient3 = fs3.getAbfsClient()) {
|
||||||
|
LambdaTestUtils.intercept(IOException.class, () -> {
|
||||||
|
abfsClient3.flush(testFileName, 0, false, false, null, null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// With correct CPK
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.flush(testFileName, 0, false, false, null, null);
|
||||||
|
assertCPKHeaders(abfsRestOperation, isWithCPK);
|
||||||
|
assertResponseHeader(abfsRestOperation, isWithCPK,
|
||||||
|
X_MS_ENCRYPTION_KEY_SHA256, expectedCPKSha);
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
|
||||||
|
isWithCPK + "");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetPathPropertiesWithCPK() throws Exception {
|
||||||
|
testSetPathProperties(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetPathPropertiesWithoutCPK() throws Exception {
|
||||||
|
testSetPathProperties(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testSetPathProperties(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
final Hashtable<String, String> properties = new Hashtable<>();
|
||||||
|
properties.put("key", "val");
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.setPathProperties(testFileName,
|
||||||
|
convertXmsPropertiesToCommaSeparatedString(properties));
|
||||||
|
assertCPKHeaders(abfsRestOperation, isWithCPK);
|
||||||
|
assertResponseHeader(abfsRestOperation, isWithCPK,
|
||||||
|
X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
|
||||||
|
"true");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetPathStatusFileWithCPK() throws Exception {
|
||||||
|
testGetPathStatusFile(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetPathStatusFileWithoutCPK() throws Exception {
|
||||||
|
testGetPathStatusFile(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testGetPathStatusFile(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.getPathStatus(testFileName, false);
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertResponseHeader(abfsRestOperation, isWithCPK,
|
||||||
|
X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
|
||||||
|
"true");
|
||||||
|
assertResponseHeader(abfsRestOperation, false,
|
||||||
|
X_MS_REQUEST_SERVER_ENCRYPTED, "");
|
||||||
|
|
||||||
|
abfsRestOperation = abfsClient.getPathStatus(testFileName, true);
|
||||||
|
assertCPKHeaders(abfsRestOperation, isWithCPK);
|
||||||
|
assertResponseHeader(abfsRestOperation, isWithCPK,
|
||||||
|
X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
|
||||||
|
assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
|
||||||
|
"true");
|
||||||
|
assertResponseHeader(abfsRestOperation, false,
|
||||||
|
X_MS_REQUEST_SERVER_ENCRYPTED, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeletePathWithCPK() throws Exception {
|
||||||
|
testDeletePath(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeletePathWithoutCPK() throws Exception {
|
||||||
|
testDeletePath(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testDeletePath(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
|
||||||
|
FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));
|
||||||
|
Assertions.assertThat(listStatuses.length)
|
||||||
|
.describedAs("listStatuses should have 1 entry").isEqualTo(1);
|
||||||
|
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.deletePath(testFileName, false, null);
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||||
|
|
||||||
|
Assertions.assertThatThrownBy(() -> fs.listStatus(new Path(testFileName)))
|
||||||
|
.isInstanceOf(FileNotFoundException.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetPermissionWithCPK() throws Exception {
|
||||||
|
testSetPermission(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetPermissionWithoutCPK() throws Exception {
|
||||||
|
testSetPermission(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testSetPermission(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
Assume.assumeTrue(fs.getIsNamespaceEnabled());
|
||||||
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
FsPermission permission = new FsPermission(FsAction.EXECUTE,
|
||||||
|
FsAction.EXECUTE, FsAction.EXECUTE);
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.setPermission(testFileName, permission.toString());
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetAclWithCPK() throws Exception {
|
||||||
|
testSetAcl(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetAclWithoutCPK() throws Exception {
|
||||||
|
testSetAcl(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testSetAcl(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
Assume.assumeTrue(fs.getIsNamespaceEnabled());
|
||||||
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
|
||||||
|
List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(ACCESS, USER, ALL));
|
||||||
|
final Map<String, String> aclEntries = AbfsAclHelper
|
||||||
|
.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
|
||||||
|
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.setAcl(testFileName, AbfsAclHelper.serializeAclSpec(aclEntries));
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAclWithCPK() throws Exception {
|
||||||
|
testGetAcl(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAclWithoutCPK() throws Exception {
|
||||||
|
testGetAcl(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testGetAcl(final boolean isWithCPK) throws Exception {
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
Assume.assumeTrue(fs.getIsNamespaceEnabled());
|
||||||
|
createFileAndGetContent(fs, testFileName, FILE_SIZE);
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient.getAclStatus(testFileName);
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAccessWithCPK() throws Exception {
|
||||||
|
testCheckAccess(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAccessWithoutCPK() throws Exception {
|
||||||
|
testCheckAccess(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testCheckAccess(final boolean isWithCPK) throws Exception {
|
||||||
|
boolean isHNSEnabled = getConfiguration()
|
||||||
|
.getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
|
||||||
|
Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false",
|
||||||
|
isHNSEnabled);
|
||||||
|
Assume.assumeTrue("AuthType has to be OAuth",
|
||||||
|
getAuthType() == AuthType.OAuth);
|
||||||
|
|
||||||
|
final AzureBlobFileSystem fs = getAbfs(isWithCPK);
|
||||||
|
final String testFileName = "/" + methodName.getMethodName();
|
||||||
|
fs.create(new Path(testFileName));
|
||||||
|
AbfsClient abfsClient = fs.getAbfsClient();
|
||||||
|
AbfsRestOperation abfsRestOperation = abfsClient
|
||||||
|
.checkAccess(testFileName, "rwx");
|
||||||
|
assertCPKHeaders(abfsRestOperation, false);
|
||||||
|
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] createFileAndGetContent(AzureBlobFileSystem fs,
|
||||||
|
String fileName, int fileSize) throws IOException {
|
||||||
|
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||||
|
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||||
|
ContractTestUtils.verifyFileContents(fs, testFilePath, fileContent);
|
||||||
|
return fileContent;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertCPKHeaders(AbfsRestOperation abfsRestOperation,
|
||||||
|
boolean isCPKHeaderExpected) {
|
||||||
|
assertHeader(abfsRestOperation, X_MS_ENCRYPTION_KEY, isCPKHeaderExpected);
|
||||||
|
assertHeader(abfsRestOperation, X_MS_ENCRYPTION_KEY_SHA256,
|
||||||
|
isCPKHeaderExpected);
|
||||||
|
assertHeader(abfsRestOperation, X_MS_ENCRYPTION_ALGORITHM,
|
||||||
|
isCPKHeaderExpected);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertNoCPKResponseHeadersPresent(
|
||||||
|
AbfsRestOperation abfsRestOperation) {
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
|
||||||
|
assertResponseHeader(abfsRestOperation, false,
|
||||||
|
X_MS_REQUEST_SERVER_ENCRYPTED, "");
|
||||||
|
assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
|
||||||
|
"");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertResponseHeader(AbfsRestOperation abfsRestOperation,
|
||||||
|
boolean isHeaderExpected, String headerName, String expectedValue) {
|
||||||
|
final AbfsHttpOperation result = abfsRestOperation.getResult();
|
||||||
|
final String value = result.getResponseHeader(headerName);
|
||||||
|
if (isHeaderExpected) {
|
||||||
|
Assertions.assertThat(value).isEqualTo(expectedValue);
|
||||||
|
} else {
|
||||||
|
Assertions.assertThat(value).isNull();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertHeader(AbfsRestOperation abfsRestOperation,
|
||||||
|
String headerName, boolean isCPKHeaderExpected) {
|
||||||
|
assertTrue(abfsRestOperation != null);
|
||||||
|
Optional<AbfsHttpHeader> header = abfsRestOperation.getRequestHeaders()
|
||||||
|
.stream().filter(abfsHttpHeader -> abfsHttpHeader.getName()
|
||||||
|
.equalsIgnoreCase(headerName)).findFirst();
|
||||||
|
String desc;
|
||||||
|
if (isCPKHeaderExpected) {
|
||||||
|
desc =
|
||||||
|
"CPK header " + headerName + " is expected, but the same is absent.";
|
||||||
|
} else {
|
||||||
|
desc = "CPK header " + headerName
|
||||||
|
+ " is not expected, but the same is present.";
|
||||||
|
}
|
||||||
|
Assertions.assertThat(header.isPresent()).describedAs(desc)
|
||||||
|
.isEqualTo(isCPKHeaderExpected);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] getSHA256Hash(String key) throws IOException {
|
||||||
|
try {
|
||||||
|
final MessageDigest digester = MessageDigest.getInstance("SHA-256");
|
||||||
|
return digester.digest(key.getBytes(StandardCharsets.UTF_8));
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getCPKSha(final AzureBlobFileSystem abfs) throws IOException {
|
||||||
|
Configuration conf = abfs.getConf();
|
||||||
|
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
String encryptionKey = conf
|
||||||
|
.get(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
|
||||||
|
if (encryptionKey == null || encryptionKey.isEmpty()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
return getBase64EncodedString(getSHA256Hash(encryptionKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getBase64EncodedString(byte[] bytes) {
|
||||||
|
return java.util.Base64.getEncoder().encodeToString(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path createFileWithContent(FileSystem fs, String fileName,
|
||||||
|
byte[] fileContent) throws IOException {
|
||||||
|
Path testFilePath = new Path(fileName);
|
||||||
|
try (FSDataOutputStream oStream = fs.create(testFilePath)) {
|
||||||
|
oStream.write(fileContent);
|
||||||
|
oStream.flush();
|
||||||
|
}
|
||||||
|
return testFilePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String convertXmsPropertiesToCommaSeparatedString(
|
||||||
|
final Hashtable<String, String> properties)
|
||||||
|
throws CharacterCodingException {
|
||||||
|
StringBuilder commaSeparatedProperties = new StringBuilder();
|
||||||
|
final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING)
|
||||||
|
.newEncoder();
|
||||||
|
for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
|
||||||
|
String key = propertyEntry.getKey();
|
||||||
|
String value = propertyEntry.getValue();
|
||||||
|
Boolean canEncodeValue = encoder.canEncode(value);
|
||||||
|
if (!canEncodeValue) {
|
||||||
|
throw new CharacterCodingException();
|
||||||
|
}
|
||||||
|
String encodedPropertyValue = Base64
|
||||||
|
.encode(encoder.encode(CharBuffer.wrap(value)).array());
|
||||||
|
commaSeparatedProperties.append(key).append(AbfsHttpConstants.EQUAL)
|
||||||
|
.append(encodedPropertyValue);
|
||||||
|
commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
|
||||||
|
}
|
||||||
|
if (commaSeparatedProperties.length() != 0) {
|
||||||
|
commaSeparatedProperties
|
||||||
|
.deleteCharAt(commaSeparatedProperties.length() - 1);
|
||||||
|
}
|
||||||
|
return commaSeparatedProperties.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getOctalNotation(FsPermission fsPermission) {
|
||||||
|
Preconditions.checkNotNull(fsPermission, "fsPermission");
|
||||||
|
return String
|
||||||
|
.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] getRandomBytesArray(int length) {
|
||||||
|
final byte[] b = new byte[length];
|
||||||
|
new Random().nextBytes(b);
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
private AzureBlobFileSystem getAbfs(boolean withCPK) throws IOException {
|
||||||
|
return getAbfs(withCPK, "12345678901234567890123456789012");
|
||||||
|
}
|
||||||
|
|
||||||
|
private AzureBlobFileSystem getAbfs(boolean withCPK, String cpk)
|
||||||
|
throws IOException {
|
||||||
|
Configuration conf = getRawConfiguration();
|
||||||
|
if (withCPK) {
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + getAccountName(),
|
||||||
|
cpk);
|
||||||
|
} else {
|
||||||
|
conf.unset(
|
||||||
|
FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + getAccountName());
|
||||||
|
}
|
||||||
|
return (AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private AzureBlobFileSystem getSameFSWithWrongCPK(
|
||||||
|
final AzureBlobFileSystem fs) throws IOException {
|
||||||
|
AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration();
|
||||||
|
Configuration conf = abfsConf.getRawConfiguration();
|
||||||
|
String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||||
|
String cpk = conf
|
||||||
|
.get(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
|
||||||
|
if (cpk == null || cpk.isEmpty()) {
|
||||||
|
cpk = "01234567890123456789012345678912";
|
||||||
|
}
|
||||||
|
cpk = "different-" + cpk;
|
||||||
|
String differentCpk = cpk.substring(0, ENCRYPTION_KEY_LEN - 1);
|
||||||
|
conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
|
||||||
|
differentCpk);
|
||||||
|
conf.set("fs.defaultFS",
|
||||||
|
"abfs://" + getFileSystemName() + "@" + accountName);
|
||||||
|
AzureBlobFileSystem sameFSWithDifferentCPK =
|
||||||
|
(AzureBlobFileSystem) FileSystem.newInstance(conf);
|
||||||
|
return sameFSWithDifferentCPK;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -28,6 +28,7 @@ public final class TestConfigurationKeys {
|
||||||
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
|
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
|
||||||
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
|
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
|
||||||
public static final String FS_AZURE_TEST_APPENDBLOB_ENABLED = "fs.azure.test.appendblob.enabled";
|
public static final String FS_AZURE_TEST_APPENDBLOB_ENABLED = "fs.azure.test.appendblob.enabled";
|
||||||
|
public static final String FS_AZURE_TEST_CPK_ENABLED = "fs.azure.test.cpk.enabled";
|
||||||
|
|
||||||
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id";
|
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id";
|
||||||
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret";
|
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret";
|
||||||
|
@ -54,6 +55,9 @@ public final class TestConfigurationKeys {
|
||||||
|
|
||||||
public static final String FS_AZURE_TEST_APP_SECRET = "fs.azure.test.app.secret";
|
public static final String FS_AZURE_TEST_APP_SECRET = "fs.azure.test.app.secret";
|
||||||
|
|
||||||
|
public static final String FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT = "fs.azure.test.cpk-enabled-secondary-account";
|
||||||
|
public static final String FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY = "fs.azure.test.cpk-enabled-secondary-account.key";
|
||||||
|
|
||||||
public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
|
public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
|
||||||
public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
|
public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
|
||||||
public static final int TEST_TIMEOUT = 15 * 60 * 1000;
|
public static final int TEST_TIMEOUT = 15 * 60 * 1000;
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
@ -103,7 +102,7 @@ public final class TestAbfsClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getUserAgentString(AbfsConfiguration config,
|
private String getUserAgentString(AbfsConfiguration config,
|
||||||
boolean includeSSLProvider) throws MalformedURLException {
|
boolean includeSSLProvider) throws IOException {
|
||||||
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build();
|
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build();
|
||||||
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
|
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
|
||||||
config, (AccessTokenProvider) null, abfsClientContext);
|
config, (AccessTokenProvider) null, abfsClientContext);
|
||||||
|
@ -250,8 +249,7 @@ public final class TestAbfsClient {
|
||||||
|
|
||||||
public static AbfsClient createTestClientFromCurrentContext(
|
public static AbfsClient createTestClientFromCurrentContext(
|
||||||
AbfsClient baseAbfsClientInstance,
|
AbfsClient baseAbfsClientInstance,
|
||||||
AbfsConfiguration abfsConfig)
|
AbfsConfiguration abfsConfig) throws IOException {
|
||||||
throws AzureBlobFileSystemException {
|
|
||||||
AuthType currentAuthType = abfsConfig.getAuthType(
|
AuthType currentAuthType = abfsConfig.getAuthType(
|
||||||
abfsConfig.getAccountName());
|
abfsConfig.getAccountName());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue