HADOOP-15682. ABFS: Add support for StreamCapabilities. Fix javadoc and checkstyle.

Contributed by Thomas Marquardt.
This commit is contained in:
Thomas Marquardt 2018-08-24 01:28:17 +00:00
parent 6b6f8cc2be
commit dd2b22fa31
6 changed files with 60 additions and 7 deletions

View File

@ -28,8 +28,10 @@
@InterfaceStability.Evolving
public interface ConfigurationValidator<T> {
/**
* Validates the configValue.
* Validates a configuration value.
* @param configValue the configuration value to be validated.
* @return validated value of type T
* @throws InvalidConfigurationValueException if the configuration value is invalid.
*/
T validate(String configValue) throws InvalidConfigurationValueException;
}

View File

@ -198,7 +198,7 @@ public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttp
this.connection.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, clientRequestId);
}
/**
/**
* Sends the HTTP request. Note that HttpUrlConnection requires that an
* empty buffer be sent in order to set the "Content-Length: 0" header, which
* is required by our endpoint.
@ -242,6 +242,10 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
/**
* Gets and processes the HTTP response.
*
* @param buffer a buffer to hold the response entity body
* @param offset an offset in the buffer where the data will being.
* @param length the number of bytes to be written to the buffer.
*
* @throws IOException if an error occurs.
*/
public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
@ -32,13 +33,14 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
/**
* The BlobFsOutputStream for Rest AbfsClient.
*/
public class AbfsOutputStream extends OutputStream implements Syncable {
public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities {
private final AbfsClient client;
private final String path;
private long position;
@ -87,6 +89,23 @@ public AbfsOutputStream(
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
}
/**
* Query the stream for a specific capability.
*
* @param capability string to query the stream support for.
* @return true for hsync and hflush.
*/
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.HSYNC:
case StreamCapabilities.HFLUSH:
return supportFlush;
default:
return false;
}
}
/**
* Writes the specified byte to this output stream. The general contract for
* write is that one byte is written to the output stream. The byte to be

View File

@ -35,7 +35,8 @@ public interface KeyProvider {
* @param conf
* Hadoop configuration parameters
* @return the plaintext storage account key
* @throws KeyProviderException
* @throws KeyProviderException if an error occurs while attempting to get
* the storage account key.
*/
String getStorageAccountKey(String accountName, Configuration conf)
throws KeyProviderException;

View File

@ -40,7 +40,7 @@
* performance.
*
*/
public class SSLSocketFactoryEx extends SSLSocketFactory {
public final class SSLSocketFactoryEx extends SSLSocketFactory {
/**
* Default indicates Ordered, preferred OpenSSL, if failed to load then fall
@ -64,9 +64,9 @@ public enum SSLChannelMode {
* Initialize a singleton SSL socket factory.
*
* @param preferredMode applicable only if the instance is not initialized.
* @throws IOException
* @throws IOException if an error occurs.
*/
public synchronized static void initializeDefaultFactory(
public static synchronized void initializeDefaultFactory(
SSLChannelMode preferredMode) throws IOException {
if (instance == null) {
instance = new SSLSocketFactoryEx(preferredMode);

View File

@ -31,6 +31,7 @@
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.hadoop.fs.StreamCapabilities;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.Assume;
@ -287,6 +288,32 @@ public void testHsyncWithFlushEnabled() throws Exception {
}
}
@Test
public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
}
}
@Test
public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
}
}
@Test
public void testHsyncWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();