diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java index b2b34f8a87e..68ddcdf16ee 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java @@ -376,6 +376,18 @@ final class PageBlobOutputStream extends OutputStream implements Syncable { outBuffer = new ByteArrayOutputStream(); } + @VisibleForTesting + synchronized void waitForLastFlushCompletion() throws IOException { + try { + if (lastQueuedTask != null) { + lastQueuedTask.waitTillDone(); + } + } catch (InterruptedException e1) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + } + } + /** * Extend the page blob file if we are close to the end. */ @@ -554,7 +566,6 @@ final class PageBlobOutputStream extends OutputStream implements Syncable { } @Override - public void hflush() throws IOException { // hflush is required to force data to storage, so call hsync, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java index fc8796bcbfa..dcfff2fbe37 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java @@ -61,8 +61,6 @@ public class SyncableDataOutputStream extends DataOutputStream public void hflush() throws IOException { if (out instanceof Syncable) { ((Syncable) out).hflush(); - } else { - out.flush(); } } @@ -70,8 +68,6 @@ public class SyncableDataOutputStream extends DataOutputStream public void hsync() throws IOException { if (out instanceof Syncable) { ((Syncable) out).hsync(); - } else { - out.flush(); } } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java new file mode 100644 index 00000000000..9ac1f734014 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Random; + +import com.microsoft.azure.storage.blob.BlockEntry; +import com.microsoft.azure.storage.blob.BlockListingFilter; +import com.microsoft.azure.storage.blob.CloudBlockBlob; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; + +import org.hamcrest.core.IsEqual; +import org.hamcrest.core.IsNot; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.junit.Assume.assumeNotNull; + +/** + * Test semantics of functions flush, hflush, hsync, and close for block blobs, + * block blobs with compaction, and page blobs. + */ +public class ITestOutputStreamSemantics extends AbstractWasbTestBase { + + private static final String PAGE_BLOB_DIR = "/pageblob"; + private static final String BLOCK_BLOB_DIR = "/blockblob"; + private static final String BLOCK_BLOB_COMPACTION_DIR = "/compaction"; + + private byte[] getRandomBytes() { + byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE + - PageBlobFormatHelpers.PAGE_HEADER_SIZE]; + Random rand = new Random(); + rand.nextBytes(buffer); + return buffer; + } + + private Path getBlobPathWithTestName(String parentDir) { + return new Path(parentDir + "/" + methodName.getMethodName()); + } + + private void validate(Path path, byte[] writeBuffer, boolean isEqual) + throws IOException { + String blobPath = path.toUri().getPath(); + try (FSDataInputStream inputStream = fs.open(path)) { + byte[] readBuffer = new byte[PageBlobFormatHelpers.PAGE_SIZE + - PageBlobFormatHelpers.PAGE_HEADER_SIZE]; + int numBytesRead = inputStream.read(readBuffer, 0, readBuffer.length); + + if (isEqual) { + assertArrayEquals( + String.format("Bytes read do not match bytes written to %1$s", + blobPath), + writeBuffer, + readBuffer); + } else { + assertThat( + String.format("Bytes read unexpectedly match bytes written to %1$s", + blobPath), + readBuffer, + IsNot.not(IsEqual.equalTo(writeBuffer))); + } + } + } + + private boolean isBlockBlobAppendStreamWrapper(FSDataOutputStream stream) { + return + ((SyncableDataOutputStream) + ((NativeAzureFileSystem.NativeAzureFsOutputStream) + stream.getWrappedStream()) + .getOutStream()) + .getOutStream() + instanceof BlockBlobAppendStream; + } + + private boolean isPageBlobStreamWrapper(FSDataOutputStream stream) { + return + ((SyncableDataOutputStream) stream.getWrappedStream()) + .getOutStream() + instanceof PageBlobOutputStream; + } + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + Configuration conf = new Configuration(); + + // Configure the page blob directories + conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, PAGE_BLOB_DIR); + + // Configure the block blob with compaction directories + conf.set(AzureNativeFileSystemStore.KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES, + BLOCK_BLOB_COMPACTION_DIR); + + return AzureBlobStorageTestAccount.create( + "", + EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer), + conf); + } + + // Verify flush writes data to storage for Page Blobs + @Test + public void testPageBlobFlush() throws IOException { + Path path = getBlobPathWithTestName(PAGE_BLOB_DIR); + + try (FSDataOutputStream stream = fs.create(path)) { + byte[] buffer = getRandomBytes(); + stream.write(buffer); + stream.flush(); + + // flush is asynchronous for Page Blob, so we need to + // wait for it to complete + SyncableDataOutputStream syncStream = + (SyncableDataOutputStream) stream.getWrappedStream(); + PageBlobOutputStream pageBlobStream = + (PageBlobOutputStream)syncStream.getOutStream(); + pageBlobStream.waitForLastFlushCompletion(); + + validate(path, buffer, true); + } + } + + + // Verify hflush writes data to storage for Page Blobs + @Test + public void testPageBlobHFlush() throws IOException { + Path path = getBlobPathWithTestName(PAGE_BLOB_DIR); + + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isPageBlobStreamWrapper(stream)); + byte[] buffer = getRandomBytes(); + stream.write(buffer); + stream.hflush(); + validate(path, buffer, true); + } + } + + // HSync must write data to storage for Page Blobs + @Test + public void testPageBlobHSync() throws IOException { + Path path = getBlobPathWithTestName(PAGE_BLOB_DIR); + + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isPageBlobStreamWrapper(stream)); + byte[] buffer = getRandomBytes(); + stream.write(buffer); + stream.hsync(); + validate(path, buffer, true); + } + } + + // Close must write data to storage for Page Blobs + @Test + public void testPageBlobClose() throws IOException { + Path path = getBlobPathWithTestName(PAGE_BLOB_DIR); + + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isPageBlobStreamWrapper(stream)); + byte[] buffer = getRandomBytes(); + stream.write(buffer); + stream.close(); + validate(path, buffer, true); + } + } + + // Verify flush does not write data to storage for Block Blobs + @Test + public void testBlockBlobFlush() throws Exception { + Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR); + byte[] buffer = getRandomBytes(); + + try (FSDataOutputStream stream = fs.create(path)) { + for (int i = 0; i < 10; i++) { + stream.write(buffer); + stream.flush(); + } + } + String blobPath = path.toUri().getPath(); + // Create a blob reference to read and validate the block list + CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1)); + // after the stream is closed, the block list should be non-empty + ArrayList blockList = blob.downloadBlockList( + BlockListingFilter.COMMITTED, + null,null, null); + assertEquals(1, blockList.size()); + } + + // Verify hflush does not write data to storage for Block Blobs + @Test + public void testBlockBlobHFlush() throws Exception { + Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR); + byte[] buffer = getRandomBytes(); + + try (FSDataOutputStream stream = fs.create(path)) { + for (int i = 0; i < 10; i++) { + stream.write(buffer); + stream.hflush(); + } + } + String blobPath = path.toUri().getPath(); + // Create a blob reference to read and validate the block list + CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1)); + // after the stream is closed, the block list should be non-empty + ArrayList blockList = blob.downloadBlockList( + BlockListingFilter.COMMITTED, + null,null, null); + assertEquals(1, blockList.size()); + } + + // Verify hsync does not write data to storage for Block Blobs + @Test + public void testBlockBlobHSync() throws Exception { + Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR); + byte[] buffer = getRandomBytes(); + + try (FSDataOutputStream stream = fs.create(path)) { + for (int i = 0; i < 10; i++) { + stream.write(buffer); + stream.hsync(); + } + } + String blobPath = path.toUri().getPath(); + // Create a blob reference to read and validate the block list + CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1)); + // after the stream is closed, the block list should be non-empty + ArrayList blockList = blob.downloadBlockList( + BlockListingFilter.COMMITTED, + null,null, null); + assertEquals(1, blockList.size()); + } + + // Close must write data to storage for Block Blobs + @Test + public void testBlockBlobClose() throws IOException { + Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR); + + try (FSDataOutputStream stream = fs.create(path)) { + byte[] buffer = getRandomBytes(); + stream.write(buffer); + stream.close(); + validate(path, buffer, true); + } + } + + // Verify flush writes data to storage for Block Blobs with compaction + @Test + public void testBlockBlobCompactionFlush() throws Exception { + Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR); + byte[] buffer = getRandomBytes(); + + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isBlockBlobAppendStreamWrapper(stream)); + for (int i = 0; i < 10; i++) { + stream.write(buffer); + stream.flush(); + } + } + String blobPath = path.toUri().getPath(); + // Create a blob reference to read and validate the block list + CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1)); + // after the stream is closed, the block list should be non-empty + ArrayList blockList = blob.downloadBlockList( + BlockListingFilter.COMMITTED, + null,null, null); + assertEquals(1, blockList.size()); + } + + // Verify hflush writes data to storage for Block Blobs with Compaction + @Test + public void testBlockBlobCompactionHFlush() throws Exception { + Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR); + byte[] buffer = getRandomBytes(); + + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isBlockBlobAppendStreamWrapper(stream)); + for (int i = 0; i < 10; i++) { + stream.write(buffer); + stream.hflush(); + } + } + String blobPath = path.toUri().getPath(); + // Create a blob reference to read and validate the block list + CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1)); + // after the stream is closed, the block list should be non-empty + ArrayList blockList = blob.downloadBlockList( + BlockListingFilter.COMMITTED, + null,null, null); + assertEquals(10, blockList.size()); + } + + // Verify hsync writes data to storage for Block Blobs with compaction + @Test + public void testBlockBlobCompactionHSync() throws Exception { + Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR); + byte[] buffer = getRandomBytes(); + + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isBlockBlobAppendStreamWrapper(stream)); + for (int i = 0; i < 10; i++) { + stream.write(buffer); + stream.hsync(); + } + } + String blobPath = path.toUri().getPath(); + // Create a blob reference to read and validate the block list + CloudBlockBlob blob = testAccount.getBlobReference(blobPath.substring(1)); + // after the stream is closed, the block list should be non-empty + ArrayList blockList = blob.downloadBlockList( + BlockListingFilter.COMMITTED, + null,null, null); + assertEquals(10, blockList.size()); + } + + // Close must write data to storage for Block Blobs with compaction + @Test + public void testBlockBlobCompactionClose() throws IOException { + Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR); + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isBlockBlobAppendStreamWrapper(stream)); + byte[] buffer = getRandomBytes(); + stream.write(buffer); + stream.close(); + validate(path, buffer, true); + } + } + + // A small write does not write data to storage for Page Blobs + @Test + public void testPageBlobSmallWrite() throws IOException { + Path path = getBlobPathWithTestName(PAGE_BLOB_DIR); + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isPageBlobStreamWrapper(stream)); + byte[] buffer = getRandomBytes(); + stream.write(buffer); + validate(path, buffer, false); + } + } + + // A small write does not write data to storage for Block Blobs + @Test + public void testBlockBlobSmallWrite() throws IOException { + Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR); + try (FSDataOutputStream stream = fs.create(path)) { + byte[] buffer = getRandomBytes(); + stream.write(buffer); + validate(path, buffer, false); + } + } + + // A small write does not write data to storage for Block Blobs + // with Compaction + @Test + public void testBlockBlobCompactionSmallWrite() throws IOException { + Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR); + try (FSDataOutputStream stream = fs.create(path)) { + assertTrue(isBlockBlobAppendStreamWrapper(stream)); + byte[] buffer = getRandomBytes(); + stream.write(buffer); + validate(path, buffer, false); + } + } +}