From dd2b22fa31b4e8c9a9a1da05ae214605fdb5c25e Mon Sep 17 00:00:00 2001 From: Thomas Marquardt Date: Fri, 24 Aug 2018 01:28:17 +0000 Subject: [PATCH] HADOOP-15682. ABFS: Add support for StreamCapabilities. Fix javadoc and checkstyle. Contributed by Thomas Marquardt. --- .../diagnostics/ConfigurationValidator.java | 4 ++- .../azurebfs/services/AbfsHttpOperation.java | 6 ++++- .../azurebfs/services/AbfsOutputStream.java | 21 ++++++++++++++- .../fs/azurebfs/services/KeyProvider.java | 3 ++- .../fs/azurebfs/utils/SSLSocketFactoryEx.java | 6 ++--- .../ITestAzureBlobFileSystemFlush.java | 27 +++++++++++++++++++ 6 files changed, 60 insertions(+), 7 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/diagnostics/ConfigurationValidator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/diagnostics/ConfigurationValidator.java index d61229ee803..e0121b612fe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/diagnostics/ConfigurationValidator.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/diagnostics/ConfigurationValidator.java @@ -28,8 +28,10 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationVa @InterfaceStability.Evolving public interface ConfigurationValidator { /** - * Validates the configValue. + * Validates a configuration value. + * @param configValue the configuration value to be validated. * @return validated value of type T + * @throws InvalidConfigurationValueException if the configuration value is invalid. */ T validate(String configValue) throws InvalidConfigurationValueException; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index f493298f2c1..de38b347248 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -198,7 +198,7 @@ public class AbfsHttpOperation { this.connection.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, clientRequestId); } - /** + /** * Sends the HTTP request. Note that HttpUrlConnection requires that an * empty buffer be sent in order to set the "Content-Length: 0" header, which * is required by our endpoint. @@ -242,6 +242,10 @@ public class AbfsHttpOperation { /** * Gets and processes the HTTP response. * + * @param buffer a buffer to hold the response entity body + * @param offset an offset in the buffer where the data will being. + * @param length the number of bytes to be written to the buffer. + * * @throws IOException if an error occurs. */ public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index b69ec835d6d..92e081eaa12 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -21,6 +21,7 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.util.Locale; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ExecutorCompletionService; @@ -32,13 +33,14 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; /** * The BlobFsOutputStream for Rest AbfsClient. */ -public class AbfsOutputStream extends OutputStream implements Syncable { +public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities { private final AbfsClient client; private final String path; private long position; @@ -87,6 +89,23 @@ public class AbfsOutputStream extends OutputStream implements Syncable { this.completionService = new ExecutorCompletionService<>(this.threadExecutor); } + /** + * Query the stream for a specific capability. + * + * @param capability string to query the stream support for. + * @return true for hsync and hflush. + */ + @Override + public boolean hasCapability(String capability) { + switch (capability.toLowerCase(Locale.ENGLISH)) { + case StreamCapabilities.HSYNC: + case StreamCapabilities.HFLUSH: + return supportFlush; + default: + return false; + } + } + /** * Writes the specified byte to this output stream. The general contract for * write is that one byte is written to the output stream. The byte to be diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java index 27f76f8594f..09491c520bd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java @@ -35,7 +35,8 @@ public interface KeyProvider { * @param conf * Hadoop configuration parameters * @return the plaintext storage account key - * @throws KeyProviderException + * @throws KeyProviderException if an error occurs while attempting to get + * the storage account key. */ String getStorageAccountKey(String accountName, Configuration conf) throws KeyProviderException; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/SSLSocketFactoryEx.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/SSLSocketFactoryEx.java index 202e1850603..00e7786fa4a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/SSLSocketFactoryEx.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/SSLSocketFactoryEx.java @@ -40,7 +40,7 @@ import org.wildfly.openssl.OpenSSLProvider; * performance. * */ -public class SSLSocketFactoryEx extends SSLSocketFactory { +public final class SSLSocketFactoryEx extends SSLSocketFactory { /** * Default indicates Ordered, preferred OpenSSL, if failed to load then fall @@ -64,9 +64,9 @@ public class SSLSocketFactoryEx extends SSLSocketFactory { * Initialize a singleton SSL socket factory. * * @param preferredMode applicable only if the instance is not initialized. - * @throws IOException + * @throws IOException if an error occurs. */ - public synchronized static void initializeDefaultFactory( + public static synchronized void initializeDefaultFactory( SSLChannelMode preferredMode) throws IOException { if (instance == null) { instance = new SSLSocketFactoryEx(preferredMode); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java index 8a6207a0109..7c6bbb5c607 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java @@ -31,6 +31,7 @@ import java.io.IOException; import com.microsoft.azure.storage.blob.BlockEntry; import com.microsoft.azure.storage.blob.BlockListingFilter; import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.hadoop.fs.StreamCapabilities; import org.hamcrest.core.IsEqual; import org.hamcrest.core.IsNot; import org.junit.Assume; @@ -287,6 +288,32 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { } } + @Test + public void testStreamCapabilitiesWithFlushDisabled() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + byte[] buffer = getRandomBytesArray(); + try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) { + assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH)); + assertFalse(stream.hasCapability(StreamCapabilities.HSYNC)); + assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); + assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); + assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + } + } + + @Test + public void testStreamCapabilitiesWithFlushEnabled() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + byte[] buffer = getRandomBytesArray(); + try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) { + assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH)); + assertTrue(stream.hasCapability(StreamCapabilities.HSYNC)); + assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND)); + assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD)); + assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER)); + } + } + @Test public void testHsyncWithFlushDisabled() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem();