HADOOP-15677. WASB: Add support for StreamCapabilities.
Contributed by Thomas Marquardt.
This commit is contained in:
parent
585a4f96d7
commit
1eef0acfe5
|
@ -29,11 +29,13 @@ import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.apache.hadoop.fs.Syncable;
|
import org.apache.hadoop.fs.Syncable;
|
||||||
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
|
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
|
@ -52,7 +54,7 @@ import com.microsoft.azure.storage.blob.CloudPageBlob;
|
||||||
* An output stream that write file data to a page blob stored using ASV's
|
* An output stream that write file data to a page blob stored using ASV's
|
||||||
* custom format.
|
* custom format.
|
||||||
*/
|
*/
|
||||||
final class PageBlobOutputStream extends OutputStream implements Syncable {
|
final class PageBlobOutputStream extends OutputStream implements Syncable, StreamCapabilities {
|
||||||
/**
|
/**
|
||||||
* The maximum number of raw bytes Azure Storage allows us to upload in a
|
* The maximum number of raw bytes Azure Storage allows us to upload in a
|
||||||
* single request (4 MB).
|
* single request (4 MB).
|
||||||
|
@ -195,6 +197,23 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Query the stream for a specific capability.
|
||||||
|
*
|
||||||
|
* @param capability string to query the stream support for.
|
||||||
|
* @return true for hsync and hflush.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean hasCapability(String capability) {
|
||||||
|
switch (capability.toLowerCase(Locale.ENGLISH)) {
|
||||||
|
case StreamCapabilities.HSYNC:
|
||||||
|
case StreamCapabilities.HFLUSH:
|
||||||
|
return true;
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes this output stream and releases any system resources associated with
|
* Closes this output stream and releases any system resources associated with
|
||||||
* this stream. If any data remains in the buffer it is committed to the
|
* this stream. If any data remains in the buffer it is committed to the
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.hamcrest.core.IsEqual;
|
import org.hamcrest.core.IsEqual;
|
||||||
import org.hamcrest.core.IsNot;
|
import org.hamcrest.core.IsNot;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -186,6 +187,20 @@ public class ITestOutputStreamSemantics extends AbstractWasbTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Page Blobs have StreamCapabilities.HFLUSH and StreamCapabilities.HSYNC.
|
||||||
|
@Test
|
||||||
|
public void testPageBlobCapabilities() throws IOException {
|
||||||
|
Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
|
||||||
|
try (FSDataOutputStream stream = fs.create(path)) {
|
||||||
|
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||||
|
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
|
||||||
|
stream.write(getRandomBytes());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Verify flush does not write data to storage for Block Blobs
|
// Verify flush does not write data to storage for Block Blobs
|
||||||
@Test
|
@Test
|
||||||
public void testBlockBlobFlush() throws Exception {
|
public void testBlockBlobFlush() throws Exception {
|
||||||
|
@ -265,6 +280,20 @@ public class ITestOutputStreamSemantics extends AbstractWasbTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Block Blobs do not have any StreamCapabilities.
|
||||||
|
@Test
|
||||||
|
public void testBlockBlobCapabilities() throws IOException {
|
||||||
|
Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
|
||||||
|
try (FSDataOutputStream stream = fs.create(path)) {
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
|
||||||
|
stream.write(getRandomBytes());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Verify flush writes data to storage for Block Blobs with compaction
|
// Verify flush writes data to storage for Block Blobs with compaction
|
||||||
@Test
|
@Test
|
||||||
public void testBlockBlobCompactionFlush() throws Exception {
|
public void testBlockBlobCompactionFlush() throws Exception {
|
||||||
|
@ -347,6 +376,20 @@ public class ITestOutputStreamSemantics extends AbstractWasbTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Block Blobs with Compaction have StreamCapabilities.HFLUSH and HSYNC.
|
||||||
|
@Test
|
||||||
|
public void testBlockBlobCompactionCapabilities() throws IOException {
|
||||||
|
Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
|
||||||
|
try (FSDataOutputStream stream = fs.create(path)) {
|
||||||
|
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||||
|
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
|
||||||
|
assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
|
||||||
|
stream.write(getRandomBytes());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// A small write does not write data to storage for Page Blobs
|
// A small write does not write data to storage for Page Blobs
|
||||||
@Test
|
@Test
|
||||||
public void testPageBlobSmallWrite() throws IOException {
|
public void testPageBlobSmallWrite() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue