HADOOP-15659. Code changes for bug fix and new tests.

Contributed by Da Zhou.
This commit is contained in:
Thomas Marquardt 2018-08-11 00:10:26 +00:00 committed by Yuan Gao
parent b537d8c442
commit 5da4bb8892
33 changed files with 826 additions and 108 deletions

View File

@ -141,17 +141,6 @@
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.httpcomponents</groupId> <groupId>org.apache.httpcomponents</groupId>
@ -185,11 +174,24 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>joda-time</groupId> <groupId>org.eclipse.jetty</groupId>
<artifactId>joda-time</artifactId> <artifactId>jetty-util-ajax</artifactId>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<scope>compile</scope>
</dependency>
<!-- dependencies use for test only --> <!-- dependencies use for test only -->
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.fs.azurebfs.services; package org.apache.hadoop.fs.azurebfs;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Map; import java.util.Map;
@ -33,13 +33,17 @@ import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidati
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
/** /**
* Configuration for Azure Blob FileSystem. * Configuration for Azure Blob FileSystem.
@ -111,10 +115,23 @@ public class AbfsConfiguration{
DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
private boolean createRemoteFileSystemDuringInitialization; private boolean createRemoteFileSystemDuringInitialization;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION,
DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION)
private boolean skipUserGroupMetadataDuringInitialization;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH)
private int readAheadQueueDepth; private int readAheadQueueDepth;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ENABLE_FLUSH,
DefaultValue = FileSystemConfigurations.DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
@StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY,
DefaultValue = "")
private String userAgentId;
private Map<String, String> storageAccountKeys; private Map<String, String> storageAccountKeys;
public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
@ -147,13 +164,38 @@ public class AbfsConfiguration{
return this.isSecure; return this.isSecure;
} }
public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException { public String getStorageAccountKey(final String accountName) throws AzureBlobFileSystemException {
String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName); String key;
if (accountKey == null) { String keyProviderClass =
configuration.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName);
KeyProvider keyProvider;
if (keyProviderClass == null) {
// No key provider was provided so use the provided key as is.
keyProvider = new SimpleKeyProvider();
} else {
// create an instance of the key provider class and verify it
// implements KeyProvider
Object keyProviderObject;
try {
Class<?> clazz = configuration.getClassByName(keyProviderClass);
keyProviderObject = clazz.newInstance();
} catch (Exception e) {
throw new KeyProviderException("Unable to load key provider class.", e);
}
if (!(keyProviderObject instanceof KeyProvider)) {
throw new KeyProviderException(keyProviderClass
+ " specified in config is not a valid KeyProvider class.");
}
keyProvider = (KeyProvider) keyProviderObject;
}
key = keyProvider.getStorageAccountKey(accountName, configuration);
if (key == null) {
throw new ConfigurationPropertyNotFoundException(accountName); throw new ConfigurationPropertyNotFoundException(accountName);
} }
return accountKey; return key;
} }
public Configuration getConfiguration() { public Configuration getConfiguration() {
@ -212,10 +254,22 @@ public class AbfsConfiguration{
return this.createRemoteFileSystemDuringInitialization; return this.createRemoteFileSystemDuringInitialization;
} }
public boolean getSkipUserGroupMetadataDuringInitialization() {
return this.skipUserGroupMetadataDuringInitialization;
}
public int getReadAheadQueueDepth() { public int getReadAheadQueueDepth() {
return this.readAheadQueueDepth; return this.readAheadQueueDepth;
} }
public boolean isFlushEnabled() {
return this.enableFlush;
}
public String getCustomUserAgentPrefix() {
return this.userAgentId;
}
void validateStorageAccountKeys() throws InvalidConfigurationValueException { void validateStorageAccountKeys() throws InvalidConfigurationValueException {
Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
@ -294,4 +348,9 @@ public class AbfsConfiguration{
void setWriteBufferSize(int bufferSize) { void setWriteBufferSize(int bufferSize) {
this.writeBufferSize = bufferSize; this.writeBufferSize = bufferSize;
} }
@VisibleForTesting
void setEnableFlush(boolean enableFlush) {
this.enableFlush = enableFlush;
}
} }

View File

@ -36,6 +36,7 @@ import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -90,7 +91,6 @@ public class AzureBlobFileSystem extends FileSystem {
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.userGroupInformation = UserGroupInformation.getCurrentUser(); this.userGroupInformation = UserGroupInformation.getCurrentUser();
this.user = userGroupInformation.getUserName(); this.user = userGroupInformation.getUserName();
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecure(), configuration, userGroupInformation); this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecure(), configuration, userGroupInformation);
LOG.debug("Initializing NativeAzureFileSystem for {}", uri); LOG.debug("Initializing NativeAzureFileSystem for {}", uri);
@ -98,10 +98,19 @@ public class AzureBlobFileSystem extends FileSystem {
this.setWorkingDirectory(this.getHomeDirectory()); this.setWorkingDirectory(this.getHomeDirectory());
if (abfsStore.getAbfsConfiguration().getCreateRemoteFileSystemDuringInitialization()) { if (abfsStore.getAbfsConfiguration().getCreateRemoteFileSystemDuringInitialization()) {
if (!this.fileSystemExists()) {
this.createFileSystem(); this.createFileSystem();
} }
} }
if (!abfsStore.getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) {
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
} else {
//Provide a default group name
this.primaryUserGroup = this.user;
}
}
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(
@ -375,7 +384,7 @@ public class AzureBlobFileSystem extends FileSystem {
if (file.getLen() < start) { if (file.getLen() < start) {
return new BlockLocation[0]; return new BlockLocation[0];
} }
final String blobLocationHost = this.abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost();
final String[] name = { blobLocationHost }; final String[] name = { blobLocationHost };
final String[] host = { blobLocationHost }; final String[] host = { blobLocationHost };
@ -397,6 +406,13 @@ public class AzureBlobFileSystem extends FileSystem {
return locations; return locations;
} }
@Override
protected void finalize() throws Throwable {
LOG.debug("finalize() called.");
close();
super.finalize();
}
public String getOwnerUser() { public String getOwnerUser() {
return user; return user;
} }
@ -450,13 +466,31 @@ public class AzureBlobFileSystem extends FileSystem {
} }
} }
private boolean fileSystemExists() throws IOException {
LOG.debug(
"AzureBlobFileSystem.fileSystemExists uri: {}", uri);
try {
abfsStore.getFilesystemProperties();
} catch (AzureBlobFileSystemException ex) {
try {
checkException(null, ex);
// Because HEAD request won't contain message body,
// there is not way to get the storage error code
// workaround here is to check its status code.
} catch (FileNotFoundException e) {
return false;
}
}
return true;
}
private void createFileSystem() throws IOException { private void createFileSystem() throws IOException {
LOG.debug( LOG.debug(
"AzureBlobFileSystem.createFileSystem uri: {}", uri); "AzureBlobFileSystem.createFileSystem uri: {}", uri);
try { try {
this.abfsStore.createFilesystem(); abfsStore.createFilesystem();
} catch (AzureBlobFileSystemException ex) { } catch (AzureBlobFileSystemException ex) {
checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS); checkException(null, ex);
} }
} }
@ -615,6 +649,11 @@ public class AzureBlobFileSystem extends FileSystem {
@VisibleForTesting @VisibleForTesting
AzureBlobFileSystemStore getAbfsStore() { AzureBlobFileSystemStore getAbfsStore() {
return this.abfsStore; return abfsStore;
}
@VisibleForTesting
AbfsClient getAbfsClient() {
return abfsStore.getClient();
} }
} }

View File

@ -31,8 +31,11 @@ import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder; import java.nio.charset.CharsetEncoder;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Map; import java.util.Map;
@ -65,7 +68,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
@ -75,8 +77,6 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.client.utils.URIBuilder; import org.apache.http.client.utils.URIBuilder;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -223,7 +223,7 @@ public class AzureBlobFileSystemStore {
final OutputStream outputStream; final OutputStream outputStream;
outputStream = new FSDataOutputStream( outputStream = new FSDataOutputStream(
new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0,
abfsConfiguration.getWriteBufferSize()), null); abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null);
return outputStream; return outputStream;
} }
@ -287,7 +287,7 @@ public class AzureBlobFileSystemStore {
final OutputStream outputStream; final OutputStream outputStream;
outputStream = new FSDataOutputStream( outputStream = new FSDataOutputStream(
new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
offset, abfsConfiguration.getWriteBufferSize()), null); offset, abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null);
return outputStream; return outputStream;
} }
@ -366,7 +366,7 @@ public class AzureBlobFileSystemStore {
true, true,
1, 1,
blockSize, blockSize,
parseLastModifiedTime(lastModified).getMillis(), parseLastModifiedTime(lastModified),
path, path,
eTag); eTag);
} else { } else {
@ -385,7 +385,7 @@ public class AzureBlobFileSystemStore {
parseIsDirectory(resourceType), parseIsDirectory(resourceType),
1, 1,
blockSize, blockSize,
parseLastModifiedTime(lastModified).getMillis(), parseLastModifiedTime(lastModified),
path, path,
eTag); eTag);
} }
@ -419,10 +419,7 @@ public class AzureBlobFileSystemStore {
long contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory(); boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
final DateTime dateTime = DateTime.parse( lastModifiedMillis = parseLastModifiedTime(entry.lastModified());
entry.lastModified(),
DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
lastModifiedMillis = dateTime.getMillis();
} }
Path entryPath = new Path(File.separator + entry.name()); Path entryPath = new Path(File.separator + entry.name());
@ -534,10 +531,16 @@ public class AzureBlobFileSystemStore {
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
} }
private DateTime parseLastModifiedTime(final String lastModifiedTime) { private long parseLastModifiedTime(final String lastModifiedTime) {
return DateTime.parse( long parsedTime = 0;
lastModifiedTime, try {
DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN).parse(lastModifiedTime);
parsedTime = utcDate.getTime();
} catch (ParseException e) {
LOG.error("Failed to parse the date {0}", lastModifiedTime);
} finally {
return parsedTime;
}
} }
private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
@ -702,5 +705,9 @@ public class AzureBlobFileSystemStore {
} }
} }
@VisibleForTesting
AbfsClient getClient() {
return this.client;
}
} }

View File

@ -49,9 +49,15 @@ public final class ConfigurationKeys {
public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in";
public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append"; public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization"; public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
public static final String FS_AZURE_AUTOTHROTTLING_ENABLE = "fs.azure.autothrottling.enable"; public static final String FS_AZURE_AUTOTHROTTLING_ENABLE = "fs.azure.autothrottling.enable";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider.";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }

View File

@ -39,7 +39,7 @@ public final class FileSystemConfigurations {
private static final int ONE_MB = ONE_KB * ONE_KB; private static final int ONE_MB = ONE_KB * ONE_KB;
// Default upload and download buffer size // Default upload and download buffer size
public static final int DEFAULT_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB
public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB
@ -50,10 +50,12 @@ public final class FileSystemConfigurations {
public static final int MAX_CONCURRENT_WRITE_THREADS = 8; public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false; public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
public static final boolean DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = false; public static final boolean DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = false;
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_ENABLE_FLUSH = true;
private FileSystemConfigurations() {} private FileSystemConfigurations() {}
} }

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Thrown if there is a problem instantiating a KeyProvider or retrieving a key
* using a KeyProvider object.
*/
@InterfaceAudience.Private
public class KeyProviderException extends AzureBlobFileSystemException {
private static final long serialVersionUID = 1L;
public KeyProviderException(String message) {
super(message);
}
public KeyProviderException(String message, Throwable cause) {
super(message);
}
public KeyProviderException(Throwable t) {
super(t.getMessage());
}
}

View File

@ -43,6 +43,7 @@ public enum AzureServiceErrorCode {
INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null), INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null),
INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."), INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."),
EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."), EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."),
INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
UNKNOWN(null, -1, null); UNKNOWN(null, -1, null);
private final String errorCode; private final String errorCode;

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.fs.azurebfs.contracts.services; package org.apache.hadoop.fs.azurebfs.contracts.services;
import com.fasterxml.jackson.annotation.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.fs.azurebfs.contracts.services;
import java.util.List; import java.util.List;
import com.fasterxml.jackson.annotation.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;

View File

@ -26,12 +26,13 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
@ -44,7 +45,7 @@ public class AbfsClient {
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
private final URL baseUrl; private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials; private final SharedKeyCredentials sharedKeyCredentials;
private final String xMsVersion = "2018-03-28"; private final String xMsVersion = "2018-06-17";
private final ExponentialRetryPolicy retryPolicy; private final ExponentialRetryPolicy retryPolicy;
private final String filesystem; private final String filesystem;
private final AbfsConfiguration abfsConfiguration; private final AbfsConfiguration abfsConfiguration;
@ -59,7 +60,7 @@ public class AbfsClient {
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1); this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
this.abfsConfiguration = abfsConfiguration; this.abfsConfiguration = abfsConfiguration;
this.retryPolicy = exponentialRetryPolicy; this.retryPolicy = exponentialRetryPolicy;
this.userAgent = initializeUserAgent(); this.userAgent = initializeUserAgent(abfsConfiguration);
} }
public String getFileSystem() { public String getFileSystem() {
@ -137,7 +138,7 @@ public class AbfsClient {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : urlEncode(relativePath)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : relativePath);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
@ -380,8 +381,8 @@ public class AbfsClient {
return url; return url;
} }
private static String urlEncode(final String value) throws AzureBlobFileSystemException { public static String urlEncode(final String value) throws AzureBlobFileSystemException {
String encodedString = null; String encodedString;
try { try {
encodedString = URLEncoder.encode(value, UTF_8) encodedString = URLEncoder.encode(value, UTF_8)
.replace(PLUS, PLUS_ENCODE) .replace(PLUS, PLUS_ENCODE)
@ -393,14 +394,23 @@ public class AbfsClient {
return encodedString; return encodedString;
} }
private String initializeUserAgent() { @VisibleForTesting
String initializeUserAgent(final AbfsConfiguration abfsConfiguration) {
final String userAgentComment = String.format(Locale.ROOT, final String userAgentComment = String.format(Locale.ROOT,
"(JavaJRE %s; %s %s)", "(JavaJRE %s; %s %s)",
System.getProperty(JAVA_VERSION), System.getProperty(JAVA_VERSION),
System.getProperty(OS_NAME) System.getProperty(OS_NAME)
.replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING), .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING),
System.getProperty(OS_VERSION)); System.getProperty(OS_VERSION));
String customUserAgentId = abfsConfiguration.getCustomUserAgentPrefix();
if (customUserAgentId != null && !customUserAgentId.isEmpty()) {
return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s", userAgentComment, customUserAgentId);
}
return String.format(CLIENT_VERSION + " %s", userAgentComment); return String.format(CLIENT_VERSION + " %s", userAgentComment);
} }
@VisibleForTesting
URL getBaseUrl() {
return baseUrl;
}
} }

View File

@ -26,10 +26,11 @@ import java.net.URL;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import com.fasterxml.jackson.core.JsonFactory; import org.codehaus.jackson.JsonFactory;
import com.fasterxml.jackson.core.JsonParser; import org.codehaus.jackson.JsonParser;
import com.fasterxml.jackson.core.JsonToken; import org.codehaus.jackson.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -167,7 +168,7 @@ public class AbfsHttpOperation {
*/ */
public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders) public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
throws IOException { throws IOException {
this.isTraceEnabled = this.LOG.isTraceEnabled(); this.isTraceEnabled = LOG.isTraceEnabled();
this.url = url; this.url = url;
this.method = method; this.method = method;
this.clientRequestId = UUID.randomUUID().toString(); this.clientRequestId = UUID.randomUUID().toString();
@ -303,7 +304,7 @@ public class AbfsHttpOperation {
} }
} }
} catch (IOException ex) { } catch (IOException ex) {
this.LOG.error("UnexpectedError: ", ex); LOG.error("UnexpectedError: ", ex);
throw ex; throw ex;
} finally { } finally {
if (this.isTraceEnabled) { if (this.isTraceEnabled) {
@ -355,7 +356,7 @@ public class AbfsHttpOperation {
return; return;
} }
JsonFactory jf = new JsonFactory(); JsonFactory jf = new JsonFactory();
try (JsonParser jp = jf.createParser(stream)) { try (JsonParser jp = jf.createJsonParser(stream)) {
String fieldName, fieldValue; String fieldName, fieldValue;
jp.nextToken(); // START_OBJECT - { jp.nextToken(); // START_OBJECT - {
jp.nextToken(); // FIELD_NAME - "error": jp.nextToken(); // FIELD_NAME - "error":
@ -384,7 +385,7 @@ public class AbfsHttpOperation {
// Ignore errors that occur while attempting to parse the storage // Ignore errors that occur while attempting to parse the storage
// error, since the response may have been handled by the HTTP driver // error, since the response may have been handled by the HTTP driver
// or for other reasons have an unexpected // or for other reasons have an unexpected
this.LOG.debug("ExpectedError: ", ex); LOG.debug("ExpectedError: ", ex);
} }
} }
@ -415,7 +416,7 @@ public class AbfsHttpOperation {
final ObjectMapper objectMapper = new ObjectMapper(); final ObjectMapper objectMapper = new ObjectMapper();
this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class); this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class);
} catch (IOException ex) { } catch (IOException ex) {
this.LOG.error("Unable to deserialize list results", ex); LOG.error("Unable to deserialize list results", ex);
throw ex; throw ex;
} }
} }

View File

@ -64,7 +64,7 @@ public class AbfsInputStream extends FSInputStream {
this.path = path; this.path = path;
this.contentLength = contentLength; this.contentLength = contentLength;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : 2 * Runtime.getRuntime().availableProcessors(); this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.eTag = eTag; this.eTag = eTag;
this.tolerateOobAppends = false; this.tolerateOobAppends = false;
this.readAheadEnabled = true; this.readAheadEnabled = true;

View File

@ -43,6 +43,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
private final String path; private final String path;
private long position; private long position;
private boolean closed; private boolean closed;
private boolean supportFlush;
private volatile IOException lastError; private volatile IOException lastError;
private long lastFlushOffset; private long lastFlushOffset;
@ -61,11 +62,13 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
final AbfsClient client, final AbfsClient client,
final String path, final String path,
final long position, final long position,
final int bufferSize) { final int bufferSize,
final boolean supportFlush) {
this.client = client; this.client = client;
this.path = path; this.path = path;
this.position = position; this.position = position;
this.closed = false; this.closed = false;
this.supportFlush = supportFlush;
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
@ -162,8 +165,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
*/ */
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
if (supportFlush) {
flushInternalAsync(); flushInternalAsync();
} }
}
/** Similar to posix fsync, flush out the data in client's user buffer /** Similar to posix fsync, flush out the data in client's user buffer
* all the way to the disk device (but the disk may have it in its cache). * all the way to the disk device (but the disk may have it in its cache).
@ -171,8 +176,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
*/ */
@Override @Override
public void hsync() throws IOException { public void hsync() throws IOException {
if (supportFlush) {
flushInternal(); flushInternal();
} }
}
/** Flush out the data in client's user buffer. After the return of /** Flush out the data in client's user buffer. After the return of
* this call, new readers will see the data. * this call, new readers will see the data.
@ -180,8 +187,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
*/ */
@Override @Override
public void hflush() throws IOException { public void hflush() throws IOException {
if (supportFlush) {
flushInternal(); flushInternal();
} }
}
/** /**
* Force all data in the output stream to be written to Azure storage. * Force all data in the output stream to be written to Azure storage.
@ -277,8 +286,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
if (this.lastTotalAppendOffset > this.lastFlushOffset) { if (this.lastTotalAppendOffset > this.lastFlushOffset) {
this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true); this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true);
} }
this.lastTotalAppendOffset = 0;
} }
private synchronized void flushWrittenBytesToServiceInternal(final long offset, private synchronized void flushWrittenBytesToServiceInternal(final long offset,

View File

@ -121,7 +121,7 @@ public class AbfsRestOperation {
} }
} }
if (result.getStatusCode() > HttpURLConnection.HTTP_BAD_REQUEST) { if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(), throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
result.getStorageErrorMessage(), null, result); result.getStorageErrorMessage(), null, result);
} }

View File

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
/** /**
* The UrlQueryBuilder for Rest AbfsClient. * The UrlQueryBuilder for Rest AbfsClient.
@ -51,7 +52,12 @@ public class AbfsUriQueryBuilder {
} else { } else {
sb.append(AbfsHttpConstants.AND_MARK); sb.append(AbfsHttpConstants.AND_MARK);
} }
sb.append(entry.getKey()).append(AbfsHttpConstants.EQUAL).append(entry.getValue()); try {
sb.append(entry.getKey()).append(AbfsHttpConstants.EQUAL).append(AbfsClient.urlEncode(entry.getValue()));
}
catch (AzureBlobFileSystemException ex) {
throw new IllegalArgumentException("Query string param is not encode-able: " + entry.getKey() + "=" + entry.getValue());
}
} }
return sb.toString(); return sb.toString();
} }

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
/**
* The interface that every Azure file system key provider must implement.
*/
public interface KeyProvider {
/**
* Key providers must implement this method. Given a list of configuration
* parameters for the specified Azure storage account, retrieve the plaintext
* storage account key.
*
* @param accountName
* the storage account name
* @param conf
* Hadoop configuration parameters
* @return the plaintext storage account key
* @throws KeyProviderException
*/
String getStorageAccountKey(String accountName, Configuration conf)
throws KeyProviderException;
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Shell decryption key provider which invokes an external script that will
* perform the key decryption.
*/
public class ShellDecryptionKeyProvider extends SimpleKeyProvider {
private static final Logger LOG = LoggerFactory.getLogger(ShellDecryptionKeyProvider.class);
@Override
public String getStorageAccountKey(String accountName, Configuration conf)
throws KeyProviderException {
String envelope = super.getStorageAccountKey(accountName, conf);
final String command = conf.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT);
if (command == null) {
throw new KeyProviderException(
"Script path is not specified via fs.azure.shellkeyprovider.script");
}
String[] cmd = command.split(" ");
String[] cmdWithEnvelope = Arrays.copyOf(cmd, cmd.length + 1);
cmdWithEnvelope[cmdWithEnvelope.length - 1] = envelope;
String decryptedKey = null;
try {
decryptedKey = Shell.execCommand(cmdWithEnvelope);
} catch (IOException ex) {
throw new KeyProviderException(ex);
}
// trim any whitespace
return decryptedKey.trim();
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
import org.apache.hadoop.security.ProviderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Key provider that simply returns the storage account key from the
* configuration as plaintext.
*/
public class SimpleKeyProvider implements KeyProvider {
private static final Logger LOG = LoggerFactory.getLogger(SimpleKeyProvider.class);
@Override
public String getStorageAccountKey(String accountName, Configuration conf)
throws KeyProviderException {
String key = null;
try {
Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
conf, AzureBlobFileSystem.class);
char[] keyChars = c.getPassword(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName);
if (keyChars != null) {
key = new String(keyChars);
}
} catch(IOException ioe) {
LOG.warn("Unable to get key from credential providers. {}", ioe);
}
return key;
}
}

View File

@ -27,10 +27,6 @@ import java.util.concurrent.Callable;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -41,9 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AbstractWasbTestWithTimeout; import org.apache.hadoop.fs.azure.AbstractWasbTestWithTimeout;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem; import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
@ -175,6 +169,17 @@ public abstract class AbstractAbfsIntegrationTest extends
return abfs; return abfs;
} }
public AzureBlobFileSystem getFileSystem(Configuration configuration) throws Exception{
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
return fs;
}
public AzureBlobFileSystem getFileSystem(String abfsUri) throws Exception {
configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, abfsUri);
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
return fs;
}
/** /**
* Creates the filesystem; updates the {@link #abfs} field. * Creates the filesystem; updates the {@link #abfs} field.
* @return the created filesystem. * @return the created filesystem.

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.junit.Assert;
import org.junit.Test;
/**
* Test continuation token which has equal sign.
*/
public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
private static final int LIST_MAX_RESULTS = 5000;
@Test
public void testContinuationTokenHavingEqualSign() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
AbfsClient abfsClient = fs.getAbfsClient();
try {
AbfsRestOperation op = abfsClient.listPath("/", true, LIST_MAX_RESULTS, "===========");
Assert.assertTrue(false);
} catch (AbfsRestOperationException ex) {
Assert.assertEquals("InvalidQueryParameterValue", ex.getErrorCode().getErrorCode());
}
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.fs.azurebfs.services; package org.apache.hadoop.fs.azurebfs;
import java.util.Arrays; import java.util.Arrays;
import java.util.Random; import java.util.Random;
@ -28,8 +28,6 @@ import org.junit.runners.Parameterized;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsScaleTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
@ -72,13 +70,13 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
final byte[] b = new byte[2 * bufferSize]; final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b); new Random().nextBytes(b);
try(final FSDataOutputStream stream = fs.create(TEST_PATH)) { try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
stream.write(b); stream.write(b);
} }
final byte[] readBuffer = new byte[2 * bufferSize]; final byte[] readBuffer = new byte[2 * bufferSize];
int result; int result;
try(final FSDataInputStream inputStream = fs.open(TEST_PATH)) { try (FSDataInputStream inputStream = fs.open(TEST_PATH)) {
inputStream.seek(bufferSize); inputStream.seek(bufferSize);
result = inputStream.read(readBuffer, bufferSize, bufferSize); result = inputStream.read(readBuffer, bufferSize, bufferSize);
assertNotEquals(-1, result); assertNotEquals(-1, result);

View File

@ -108,7 +108,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
final byte[] b = new byte[1024 * 1000]; final byte[] b = new byte[1024 * 1000];
new Random().nextBytes(b); new Random().nextBytes(b);
try(final FSDataOutputStream stream = fs.create(TEST_FILE)) { try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
stream.write(b, TEST_OFFSET, b.length - TEST_OFFSET); stream.write(b, TEST_OFFSET, b.length - TEST_OFFSET);
} }

View File

@ -91,7 +91,7 @@ public class ITestAzureBlobFileSystemE2EScale extends
final FileSystem.Statistics abfsStatistics; final FileSystem.Statistics abfsStatistics;
int testBufferSize; int testBufferSize;
final byte[] sourceData; final byte[] sourceData;
try(final FSDataOutputStream stream = fs.create(TEST_FILE)) { try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
abfsStatistics = fs.getFsStatistics(); abfsStatistics = fs.getFsStatistics();
abfsStatistics.reset(); abfsStatistics.reset();

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.lang.ref.WeakReference;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
/**
* Test finalize() method when "fs.abfs.impl.disable.cache" is enabled.
*/
public class ITestAzureBlobFileSystemFinalize extends AbstractAbfsScaleTest{
static final String DISABLE_CACHE_KEY = "fs.abfs.impl.disable.cache";
public ITestAzureBlobFileSystemFinalize() throws Exception {
super();
}
@Test
public void testFinalize() throws Exception {
// Disable the cache for filesystem to make sure there is no reference.
Configuration configuration = this.getConfiguration();
configuration.setBoolean(this.DISABLE_CACHE_KEY, true);
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
WeakReference<Object> ref = new WeakReference<Object>(fs);
fs = null;
int i = 0;
int maxTries = 1000;
while (ref.get() != null && i < maxTries) {
System.gc();
System.runFinalization();
i++;
}
Assert.assertTrue("testFinalizer didn't get cleaned up within maxTries", ref.get() == null);
}
}

View File

@ -20,12 +20,20 @@ package org.apache.hadoop.fs.azurebfs;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.EnumSet;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.io.IOException;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -46,6 +54,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
private static final int THREAD_SLEEP_TIME = 6000; private static final int THREAD_SLEEP_TIME = 6000;
private static final Path TEST_FILE_PATH = new Path("/testfile"); private static final Path TEST_FILE_PATH = new Path("/testfile");
private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
private static final int WAITING_TIME = 4000;
public ITestAzureBlobFileSystemFlush() { public ITestAzureBlobFileSystemFlush() {
super(); super();
@ -55,7 +65,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception { public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception {
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();
final byte[] b; final byte[] b;
try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
b = new byte[TEST_BUFFER_SIZE]; b = new byte[TEST_BUFFER_SIZE];
new Random().nextBytes(b); new Random().nextBytes(b);
@ -84,7 +94,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testAbfsOutputStreamSyncFlush() throws Exception { public void testAbfsOutputStreamSyncFlush() throws Exception {
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();
final byte[] b; final byte[] b;
try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
b = new byte[TEST_BUFFER_SIZE]; b = new byte[TEST_BUFFER_SIZE];
new Random().nextBytes(b); new Random().nextBytes(b);
stream.write(b); stream.write(b);
@ -111,7 +121,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();
final FileSystem.Statistics abfsStatistics; final FileSystem.Statistics abfsStatistics;
ExecutorService es; ExecutorService es;
try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
abfsStatistics = fs.getFsStatistics(); abfsStatistics = fs.getFsStatistics();
abfsStatistics.reset(); abfsStatistics.reset();
@ -160,7 +170,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testWriteHeavyBytesToFileAsyncFlush() throws Exception { public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();
ExecutorService es = Executors.newFixedThreadPool(10); ExecutorService es = Executors.newFixedThreadPool(10);
try(final FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
final byte[] b = new byte[TEST_BUFFER_SIZE]; final byte[] b = new byte[TEST_BUFFER_SIZE];
new Random().nextBytes(b); new Random().nextBytes(b);
@ -196,4 +206,118 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH); FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen()); assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
} }
@Test
public void testFlushWithFlushEnabled() throws Exception {
AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
String wasbUrl = testAccount.getFileSystem().getName();
String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
byte[] buffer = getRandomBytesArray();
CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
// Wait for write request to be executed
Thread.sleep(WAITING_TIME);
stream.flush();
ArrayList<BlockEntry> blockList = blob.downloadBlockList(
BlockListingFilter.COMMITTED, null, null, null);
// verify block has been committed
assertEquals(1, blockList.size());
}
}
@Test
public void testFlushWithFlushDisabled() throws Exception {
AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
String wasbUrl = testAccount.getFileSystem().getName();
String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
byte[] buffer = getRandomBytesArray();
CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
// Wait for write request to be executed
Thread.sleep(WAITING_TIME);
stream.flush();
ArrayList<BlockEntry> blockList = blob.downloadBlockList(
BlockListingFilter.COMMITTED, null, null, null);
// verify block has not been committed
assertEquals(0, blockList.size());
}
}
@Test
public void testHflushWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
stream.hflush();
validate(fs, TEST_FILE_PATH, buffer, true);
}
}
@Test
public void testHflushWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
stream.hflush();
validate(fs, TEST_FILE_PATH, buffer, false);
}
}
@Test
public void testHsyncWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
stream.hsync();
validate(fs, TEST_FILE_PATH, buffer, true);
}
}
@Test
public void testHsyncWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
stream.hsync();
validate(fs, TEST_FILE_PATH, buffer, false);
}
}
private byte[] getRandomBytesArray() {
final byte[] b = new byte[TEST_FILE_LENGTH];
new Random().nextBytes(b);
return b;
}
private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs, Path path, byte[] buffer, boolean enableFlush) throws IOException {
fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush);
FSDataOutputStream stream = fs.create(path);
stream.write(buffer);
return stream;
}
private AzureBlobStorageTestAccount createWasbTestAccount() throws Exception {
return AzureBlobStorageTestAccount.create("", EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
this.getConfiguration());
}
private void validate(FileSystem fs, Path path, byte[] writeBuffer, boolean isEqual) throws IOException {
String filePath = path.toUri().toString();
try (FSDataInputStream inputStream = fs.open(path)) {
byte[] readBuffer = new byte[TEST_FILE_LENGTH];
int numBytesRead = inputStream.read(readBuffer, 0, readBuffer.length);
if (isEqual) {
assertArrayEquals(
String.format("Bytes read do not match bytes written to %1$s", filePath), writeBuffer, readBuffer);
} else {
assertThat(
String.format("Bytes read unexpectedly match bytes written to %1$s",
filePath),
readBuffer,
IsNot.not(IsEqual.equalTo(writeBuffer)));
}
}
}
} }

View File

@ -20,10 +20,10 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import org.junit.Test;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
/** /**

View File

@ -30,7 +30,6 @@ import org.junit.Test;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsDirectory;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.fs.azurebfs;
import java.util.Hashtable; import java.util.Hashtable;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -28,8 +27,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import static org.junit.Assert.assertEquals;
/** /**
* Test FileSystemProperties. * Test FileSystemProperties.
*/ */
@ -62,7 +59,6 @@ public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
} }
@Test @Test
@Ignore("JDK7 doesn't support PATCH, so PUT is used. Fix is applied in latest test tenant")
public void testBase64FileSystemProperties() throws Exception { public void testBase64FileSystemProperties() throws Exception {
final AzureBlobFileSystem fs = getFileSystem(); final AzureBlobFileSystem fs = getFileSystem();

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.fs.azurebfs.services; package org.apache.hadoop.fs.azurebfs;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -48,7 +48,7 @@ import org.junit.Test;
* Test ConfigurationServiceFieldsValidation. * Test ConfigurationServiceFieldsValidation.
*/ */
public class TestAbfsConfigurationFieldsValidation { public class TestAbfsConfigurationFieldsValidation {
private final AbfsConfiguration abfsConfiguration; private AbfsConfiguration abfsConfiguration;
private static final String INT_KEY= "intKey"; private static final String INT_KEY= "intKey";
private static final String LONG_KEY= "longKey"; private static final String LONG_KEY= "longKey";
@ -89,10 +89,12 @@ public class TestAbfsConfigurationFieldsValidation {
private boolean boolField; private boolean boolField;
public TestAbfsConfigurationFieldsValidation() throws Exception { public TestAbfsConfigurationFieldsValidation() throws Exception {
super();
Base64 base64 = new Base64(); Base64 base64 = new Base64();
this.encodedString = new String(base64.encode("base64Value".getBytes(Charsets.UTF_8)), Charsets.UTF_8); this.encodedString = new String(base64.encode("base64Value".getBytes(Charsets.UTF_8)), Charsets.UTF_8);
this.encodedAccountKey = new String(base64.encode("someAccountKey".getBytes(Charsets.UTF_8)), Charsets.UTF_8); this.encodedAccountKey = new String(base64.encode("someAccountKey".getBytes(Charsets.UTF_8)), Charsets.UTF_8);
Configuration configuration = new Configuration(false); Configuration configuration = new Configuration();
configuration.addResource("azure-bfs-test.xml");
configuration.set(INT_KEY, "1234565"); configuration.set(INT_KEY, "1234565");
configuration.set(LONG_KEY, "4194304"); configuration.set(LONG_KEY, "4194304");
configuration.set(STRING_KEY, "stringValue"); configuration.set(STRING_KEY, "stringValue");

View File

@ -43,8 +43,7 @@ public class AbfsFileSystemContract extends AbstractBondedFSContract {
@Override @Override
public String getScheme() { public String getScheme() {
return isSecure ? return isSecure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME
FileSystemUriSchemes.ABFS_SECURE_SCHEME
: FileSystemUriSchemes.ABFS_SCHEME; : FileSystemUriSchemes.ABFS_SCHEME;
} }

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.URL;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
/**
* Test useragent of abfs client.
*
*/
public final class TestAbfsClient {
@Test
public void verifyUnknownUserAgent() throws Exception {
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\)";
final Configuration configuration = new Configuration();
configuration.unset(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY);
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
AbfsClient abfsClient = new AbfsClient(new URL("http://azure.com"), null, abfsConfiguration, null);
String userAgent = abfsClient.initializeUserAgent(abfsConfiguration);
Pattern pattern = Pattern.compile(expectedUserAgentPattern);
Assert.assertTrue(pattern.matcher(userAgent).matches());
}
@Test
public void verifyUserAgent() throws Exception {
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\) Partner Service";
final Configuration configuration = new Configuration();
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service");
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
AbfsClient abfsClient = new AbfsClient(new URL("http://azure.com"), null, abfsConfiguration, null);
String userAgent = abfsClient.initializeUserAgent(abfsConfiguration);
Pattern pattern = Pattern.compile(expectedUserAgentPattern);
Assert.assertTrue(pattern.matcher(userAgent).matches());
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.File;
import org.junit.Assert;
import org.junit.Test;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
import org.apache.hadoop.util.Shell;
import static org.junit.Assert.assertEquals;
/**
* Test ShellDecryptionKeyProvider.
*
*/
public class TestShellDecryptionKeyProvider {
public static final Log LOG = LogFactory
.getLog(TestShellDecryptionKeyProvider.class);
private static final File TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "TestShellDecryptionKeyProvider");
@Test
public void testScriptPathNotSpecified() throws Exception {
if (!Shell.WINDOWS) {
return;
}
ShellDecryptionKeyProvider provider = new ShellDecryptionKeyProvider();
Configuration conf = new Configuration();
String account = "testacct";
String key = "key";
conf.set(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + account, key);
try {
provider.getStorageAccountKey(account, conf);
Assert
.fail("fs.azure.shellkeyprovider.script is not specified, we should throw");
} catch (KeyProviderException e) {
LOG.info("Received an expected exception: " + e.getMessage());
}
}
@Test
public void testValidScript() throws Exception {
if (!Shell.WINDOWS) {
return;
}
String expectedResult = "decretedKey";
// Create a simple script which echoes the given key plus the given
// expected result (so that we validate both script input and output)
File scriptFile = new File(TEST_ROOT_DIR, "testScript.cmd");
FileUtils.writeStringToFile(scriptFile, "@echo %1 " + expectedResult);
ShellDecryptionKeyProvider provider = new ShellDecryptionKeyProvider();
Configuration conf = new Configuration();
String account = "testacct";
String key = "key1";
conf.set(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + account, key);
conf.set(ConfigurationKeys.AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT,
"cmd /c " + scriptFile.getAbsolutePath());
String result = provider.getStorageAccountKey(account, conf);
assertEquals(key + " " + expectedResult, result);
}
}