diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index bd8ac686e3d..639862fa859 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -202,6 +202,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { */ private Set pageBlobDirs; + /** + * Configuration key to indicate the set of directories in WASB where we + * should store files as block blobs with block compaction enabled. + * + * Entries can be directory paths relative to the container (e.g. "/path") or + * fully qualified wasb:// URIs (e.g. + * wasb://container@example.blob.core.windows.net/path). + */ + public static final String KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES = + "fs.azure.block.blob.with.compaction.dir"; + + /** + * The set of directories where we should store files as block blobs with + * block compaction enabled. + */ + private Set blockBlobWithCompactionDirs; + /** * Configuration key to indicate the set of directories in WASB where * we should do atomic folder rename synchronized with createNonRecursive. @@ -527,6 +544,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // User-agent userAgentId = conf.get(USER_AGENT_ID_KEY, USER_AGENT_ID_DEFAULT); + // Extract the directories that should contain block blobs with compaction + blockBlobWithCompactionDirs = getDirectorySet( + KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES); + LOG.debug("Block blobs with compaction directories: {}", + setToString(blockBlobWithCompactionDirs)); + // Extract directories that should have atomic rename applied. atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); String hbaseRoot; @@ -1164,6 +1187,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { return isKeyForDirectorySet(key, pageBlobDirs); } + /** + * Checks if the blob with the given key in Azure Storage should be stored as + * a block blob with compaction enabled instead of a normal block blob. + * + * @param key blob name + * @return true if the file is in a directory with block compaction enabled. + */ + public boolean isBlockBlobWithCompactionKey(String key) { + return isKeyForDirectorySet(key, blockBlobWithCompactionDirs); + } + /** * Checks if the given key in Azure storage should have synchronized * atomic folder rename createNonRecursive implemented. @@ -1356,7 +1390,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } @Override - public DataOutputStream storefile(String key, PermissionStatus permissionStatus) + public DataOutputStream storefile(String keyEncoded, + PermissionStatus permissionStatus, + String key) throws AzureException { try { @@ -1417,12 +1453,26 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // Get the blob reference from the store's container and // return it. - CloudBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(keyEncoded); storePermissionStatus(blob, permissionStatus); // Create the output stream for the Azure blob. 
// - OutputStream outputStream = openOutputStream(blob); + OutputStream outputStream; + + if (isBlockBlobWithCompactionKey(key)) { + BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream( + (CloudBlockBlobWrapper) blob, + keyEncoded, + this.uploadBlockSizeBytes, + true, + getInstrumentedContext()); + + outputStream = blockBlobOutputStream; + } else { + outputStream = openOutputStream(blob); + } + DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream); return dataOutStream; } catch (Exception e) { @@ -2869,10 +2919,21 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { CloudBlobWrapper blob = this.container.getBlockBlobReference(key); - BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext()); - appendStream.initialize(); + OutputStream outputStream; - return new DataOutputStream(appendStream); + BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream( + (CloudBlockBlobWrapper) blob, + key, + bufferSize, + isBlockBlobWithCompactionKey(key), + getInstrumentedContext()); + + outputStream = blockBlobOutputStream; + + DataOutputStream dataOutStream = new SyncableDataOutputStream( + outputStream); + + return dataOutStream; } catch(Exception ex) { throw new AzureException(ex); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java index afb9379c3ca..84342cdab1e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java @@ -22,122 +22,256 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Calendar; -import java.util.HashMap; -import java.util.Locale; +import java.util.Iterator; import java.util.List; import java.util.UUID; import java.util.Random; -import java.util.TimeZone; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper; -import org.eclipse.jetty.util.log.Log; +import org.apache.hadoop.io.ElasticByteBufferPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.StorageErrorCodeStrings; import 
com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.BlockEntry; import com.microsoft.azure.storage.blob.BlockListingFilter; +import com.microsoft.azure.storage.blob.BlockSearchMode; + +import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH; +import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC; /** - * Stream object that implememnts append for Block Blobs in WASB. + * Stream object that implements append for Block Blobs in WASB. + * + * The stream object implements hflush/hsync and block compaction. Block + * compaction is the process of replacing a sequence of small blocks with one + * big block. Azure Block Blobs support up to 50000 blocks, and every + * hflush/hsync generates one block. When the number of blocks is above 32000, + * the compaction process decreases the total number of blocks, if possible. + * If compaction is disabled, hflush/hsync are empty functions. + * + * The stream object uses background threads for uploading the blocks and the + * block blob list. Blocks can be uploaded concurrently. However, while the + * block list is being uploaded, block uploads must stop. If a block is + * uploaded before the block list and the block id is not in the list, the + * block will be lost. If the block is uploaded after the block list and the + * block id is in the list, the block list upload will fail. Exclusive access + * for the block list upload is managed by uploadingSemaphore. */ -public class BlockBlobAppendStream extends OutputStream { +public class BlockBlobAppendStream extends OutputStream implements Syncable, + StreamCapabilities { + + /** + * The name of the blob/file. + */ private final String key; - private final int bufferSize; - private ByteArrayOutputStream outBuffer; + + /** + * Tracks whether this is a new blob or an existing one. + */ + private boolean blobExist; + + /** + * When the blob exists, we take a lease to prevent concurrent writes. + * Taking a lease is not necessary for new blobs. + */ + private SelfRenewingLease lease = null; + + /** + * Support for the compaction process is optional. + */ + private final boolean compactionEnabled; + + /** + * The number of blocks above which block compaction is triggered. + */ + private static final int DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT = 32000; + + /** + * The number of blocks above which block compaction is triggered. + */ + private int activateCompactionBlockCount + = DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT; + + /** + * The size of the output buffer. Writes store the data in outBuffer until + * either the size is above maxBlockSize or hflush/hsync is called. + */ + private final AtomicInteger maxBlockSize; + + /** + * The current buffer where writes are stored. + */ + private ByteBuffer outBuffer; + + /** + * The size of the blob that has been successfully stored in the Azure Blob + * service. + */ + private final AtomicLong committedBlobLength = new AtomicLong(0); + + /** + * Position of the last block in the blob. + */ + private volatile long blobLength = 0; + + /** + * Minutes to wait before the close operation times out. + */ + private static final int CLOSE_UPLOAD_DELAY = 10; + + /** + * Keep-alive time for the thread pool, in seconds. + */ + private static final int THREADPOOL_KEEP_ALIVE = 30; + /** + * Azure Block Blob used for the stream. + */ private final CloudBlockBlobWrapper blob; + + /** + * Azure Storage operation context. + */ private final OperationContext opContext;
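
The compaction behaviour described in the class javadoc above is opt-in per directory, via the fs.azure.block.blob.with.compaction.dir key introduced in AzureNativeFileSystemStore. A minimal usage sketch, assuming a hypothetical container URI and directory layout:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CompactionUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical directory: files created under it are stored as block
        // blobs with compaction enabled.
        conf.set("fs.azure.block.blob.with.compaction.dir", "/hbase/WALs");

        // Hypothetical account and container.
        FileSystem fs = FileSystem.get(
            URI.create("wasb://container@example.blob.core.windows.net"), conf);

        try (FSDataOutputStream out = fs.create(new Path("/hbase/WALs/log.1"))) {
          out.write(new byte[] {1, 2, 3});
          // With compaction enabled, hflush uploads the pending block and
          // commits the block list; otherwise it is an empty function.
          out.hflush();
        }
      }
    }
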
+ /** + * Commands sent from client calls to the background thread pool. + */ + private abstract class UploadCommand { + + // the blob offset for the command + private final long commandBlobOffset; + + // command completion latch + private final CountDownLatch completed = new CountDownLatch(1); + + UploadCommand(long offset) { + this.commandBlobOffset = offset; + } + + long getCommandBlobOffset() { + return commandBlobOffset; + } + + void await() throws InterruptedException { + completed.await(); + } + + void awaitAsDependent() throws InterruptedException { + await(); + } + + void setCompleted() { + completed.countDown(); + } + + void execute() throws InterruptedException, IOException {} + + void dump() {} + } + + /** + * The list of recent commands. Before the block list is committed, all the + * blocks in the list must be uploaded. activeBlockCommands is used for + * enumerating the blocks and waiting on the latch until each block is + * uploaded. + */ + private final ConcurrentLinkedQueue activeBlockCommands + = new ConcurrentLinkedQueue<>(); + /** * Variable to track if the stream has been closed. */ - private boolean closed = false; + private volatile boolean closed = false; /** - * Variable to track if the append lease is released. + * First IOException encountered. */ - - private volatile boolean leaseFreed; + private final AtomicReference firstError + = new AtomicReference<>(); /** - * Variable to track if the append stream has been - * initialized. + * Flag set when the first error has been thrown. */ - - private boolean initialized = false; + private boolean firstErrorThrown = false; /** - * Last IOException encountered + * Semaphore for serializing block uploads with NativeAzureFileSystem. + * + * The semaphore starts with a number of permits equal to the number of block + * upload threads. Each block upload thread needs one permit to start the + * upload. The put-block-list operation acquires all the permits before the + * block list is committed. */ - private volatile IOException lastError = null; + private final Semaphore uploadingSemaphore = new Semaphore( + MAX_NUMBER_THREADS_IN_THREAD_POOL, + true); /** - * List to keep track of the uncommitted azure storage - * block ids + * Queue storing buffers of the Azure block size, ready for reuse. The pool + * allows reusing buffers instead of allocating new ones. After the data is + * sent to the service, the buffer is returned to the queue. */ - private final List uncommittedBlockEntries; + private final ElasticByteBufferPool poolReadyByteBuffers + = new ElasticByteBufferPool(); + /** + * The blob's block list. + */ + private final List blockEntries = new ArrayList<>( + DEFAULT_CAPACITY_BLOCK_ENTRIES); + + private static final int DEFAULT_CAPACITY_BLOCK_ENTRIES = 1024; + + /** + * The blob's list of uncommitted blocks. + */ + private final ConcurrentLinkedDeque uncommittedBlockEntries + = new ConcurrentLinkedDeque<>(); + + /** + * Variable to hold the next block id to be used for azure storage blocks. + */ private static final int UNSET_BLOCKS_COUNT = -1; - - /** - * Variable to hold the next block id to be used for azure - * storage blocks. - */ private long nextBlockCount = UNSET_BLOCKS_COUNT; /** - * Variable to hold the block id prefix to be used for azure - * storage blocks from azure-storage-java sdk version 4.2.0 onwards + * Variable to hold the block id prefix to be used for azure storage blocks. 
*/ private String blockIdPrefix = null; - private final Random sequenceGenerator = new Random(); - /** - * Time to wait to renew lease in milliseconds + * Maximum number of threads in block upload thread pool. */ - private static final int LEASE_RENEWAL_PERIOD = 10000; - - /** - * Number of times to retry for lease renewal - */ - private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3; - - /** - * Time to wait before retrying to set the lease - */ - private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500; - - /** - * Metadata key used on the blob to indicate append lease is active - */ - public static final String APPEND_LEASE = "append_lease"; - - /** - * Timeout value for the append lease in millisecs. If the lease is not - * renewed within 30 seconds then another thread can acquire the append lease - * on the blob - */ - public static final int APPEND_LEASE_TIMEOUT = 30000; - - /** - * Metdata key used on the blob to indicate last modified time of append lease - */ - public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified"; + private static final int MAX_NUMBER_THREADS_IN_THREAD_POOL = 4; /** * Number of times block upload needs is retried. @@ -145,16 +279,32 @@ public class BlockBlobAppendStream extends OutputStream { private static final int MAX_BLOCK_UPLOAD_RETRIES = 3; /** - * Wait time between block upload retries in millisecs. + * Wait time between block upload retries in milliseconds. */ private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000; - private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class); + /** + * Logger. + */ + private static final Logger LOG = + LoggerFactory.getLogger(BlockBlobAppendStream.class); + /** + * The absolute maximum of blocks for a blob. It includes committed and + * temporary blocks. + */ private static final int MAX_BLOCK_COUNT = 100000; + /** + * The upload thread pool executor. + */ private ThreadPoolExecutor ioThreadPool; + /** + * Azure Storage access conditions for the blob. + */ + private final AccessCondition accessCondition = new AccessCondition(); + /** * Atomic integer to provide thread id for thread names for uploader threads. */ @@ -163,106 +313,123 @@ public class BlockBlobAppendStream extends OutputStream { /** * Prefix to be used for thread names for uploader threads. */ - private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream"; - - private static final String UTC_STR = "UTC"; + private static final String THREAD_ID_PREFIX = "append-blockblob"; + /** + * BlockBlobAppendStream constructor. + * + * @param blob + * Azure Block Blob + * @param aKey + * blob's name + * @param bufferSize + * the maximum size of a blob block. + * @param compactionEnabled + * is the compaction process enabled for this blob + * @param opContext + * Azure Store operation context for the blob + * @throws IOException + * if an I/O error occurs. 
In particular, an IOException may be + * thrown if the output stream cannot be used for append operations + */ public BlockBlobAppendStream(final CloudBlockBlobWrapper blob, - final String aKey, final int bufferSize, final OperationContext opContext) + final String aKey, + final int bufferSize, + final boolean compactionEnabled, + final OperationContext opContext) throws IOException { - if (null == aKey || 0 == aKey.length()) { - throw new IllegalArgumentException( - "Illegal argument: The key string is null or empty"); - } - - if (0 >= bufferSize) { - throw new IllegalArgumentException( - "Illegal argument bufferSize cannot be zero or negative"); - } - + Preconditions.checkArgument(StringUtils.isNotEmpty(aKey)); + Preconditions.checkArgument(bufferSize >= 0); this.blob = blob; this.opContext = opContext; this.key = aKey; - this.bufferSize = bufferSize; + this.maxBlockSize = new AtomicInteger(bufferSize); this.threadSequenceNumber = new AtomicInteger(0); this.blockIdPrefix = null; - setBlocksCountAndBlockIdPrefix(); + this.compactionEnabled = compactionEnabled; + this.blobExist = true; + this.outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get()); - this.outBuffer = new ByteArrayOutputStream(bufferSize); - this.uncommittedBlockEntries = new ArrayList(); - - // Acquire append lease on the blob. try { - //Set the append lease if the value of the append lease is false - if (!updateBlobAppendMetadata(true, false)) { - LOG.error("Unable to set Append Lease on the Blob : {} " - + "Possibly because another client already has a create or append stream open on the Blob", key); - throw new IOException("Unable to set Append lease on the Blob. " - + "Possibly because another client already had an append stream open on the Blob."); - } + // download the block list + blockEntries.addAll( + blob.downloadBlockList( + BlockListingFilter.COMMITTED, + new BlobRequestOptions(), + opContext)); + + blobLength = blob.getProperties().getLength(); + + committedBlobLength.set(blobLength); + + // Acquiring lease on the blob. + lease = new SelfRenewingLease(blob, true); + accessCondition.setLeaseID(lease.getLeaseID()); + } catch (StorageException ex) { - LOG.error("Encountered Storage exception while acquiring append " - + "lease on blob : {}. Storage Exception : {} ErrorCode : {}", - key, ex, ex.getErrorCode()); - - throw new IOException(ex); + if (ex.getErrorCode().equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)) { + blobExist = false; + } + else if (ex.getErrorCode().equals( + StorageErrorCodeStrings.LEASE_ALREADY_PRESENT)) { + throw new AzureException( + "Unable to set Append lease on the Blob: " + ex, ex); + } + else { + LOG.debug( + "Encountered storage exception." + + " StorageException : {} ErrorCode : {}", + ex, + ex.getErrorCode()); + throw new AzureException(ex); + } } - leaseFreed = false; + setBlocksCountAndBlockIdPrefix(blockEntries); + + this.ioThreadPool = new ThreadPoolExecutor( + MAX_NUMBER_THREADS_IN_THREAD_POOL, + MAX_NUMBER_THREADS_IN_THREAD_POOL, + THREADPOOL_KEEP_ALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new UploaderThreadFactory()); } /** - * Helper method that starts an Append Lease renewer thread and the - * thread pool. + * Set payload size of the stream. + * It is intended to be used for unit testing purposes only. */ - public synchronized void initialize() { + @VisibleForTesting + synchronized void setMaxBlockSize(int size) { + maxBlockSize.set(size); - if (initialized) { - return; - } - /* - * Start the thread for Append lease renewer. 
- */ - Thread appendLeaseRenewer = new Thread(new AppendRenewer()); - appendLeaseRenewer.setDaemon(true); - appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key)); - appendLeaseRenewer.start(); - - /* - * Parameters to ThreadPoolExecutor: - * corePoolSize : the number of threads to keep in the pool, even if they are idle, - * unless allowCoreThreadTimeOut is set - * maximumPoolSize : the maximum number of threads to allow in the pool - * keepAliveTime - when the number of threads is greater than the core, - * this is the maximum time that excess idle threads will - * wait for new tasks before terminating. - * unit - the time unit for the keepAliveTime argument - * workQueue - the queue to use for holding tasks before they are executed - * This queue will hold only the Runnable tasks submitted by the execute method. - */ - this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS, - new LinkedBlockingQueue(), new UploaderThreadFactory()); - - initialized = true; + // it is for testing only so we can abandon the previously allocated + // payload + this.outBuffer = ByteBuffer.allocate(maxBlockSize.get()); } /** - * Get the blob name. - * - * @return String Blob name. + * Set compaction parameters. + * It is intended to be used for unit testing purposes only. */ - public String getKey() { - return key; + @VisibleForTesting + void setCompactionBlockCount(int activationCount) { + activateCompactionBlockCount = activationCount; } /** - * Get the backing blob. - * @return buffer size of the stream. + * Get the list of block entries. It is used for testing purposes only. + * @return List of block entries. */ - public int getBufferSize() { - return bufferSize; + @VisibleForTesting + List getBlockList() throws StorageException, IOException { + return blob.downloadBlockList( + BlockListingFilter.COMMITTED, + new BlobRequestOptions(), + opContext); } /** @@ -282,21 +449,6 @@ public class BlockBlobAppendStream extends OutputStream { write(new byte[] { (byte) (byteVal & 0xFF) }); } - /** - * Writes b.length bytes from the specified byte array to this output stream. - * - * @param data - * the byte array to write. - * - * @throws IOException - * if an I/O error occurs. In particular, an IOException may be - * thrown if the output stream has been closed. - */ - @Override - public void write(final byte[] data) throws IOException { - write(data, 0, data.length); - } - /** * Writes length bytes from the specified byte array starting at offset to * this output stream. @@ -312,470 +464,354 @@ public class BlockBlobAppendStream extends OutputStream { * thrown if the output stream has been closed. */ @Override - public void write(final byte[] data, final int offset, final int length) + public synchronized void write(final byte[] data, int offset, int length) throws IOException { + Preconditions.checkArgument(data != null, "null data"); if (offset < 0 || length < 0 || length > data.length - offset) { - throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments"); - } - - writeInternal(data, offset, length); - } - - @Override - public synchronized void close() throws IOException { - - if (!initialized) { - throw new IOException("Trying to close an uninitialized Append stream"); - } - - if (closed) { - return; - } - - if (leaseFreed) { - throw new IOException(String.format("Attempting to close an append stream on blob : %s " - + " that does not have lease on the Blob. 
Failing close", key)); - } - - if (outBuffer.size() > 0) { - uploadBlockToStorage(outBuffer.toByteArray()); - } - - ioThreadPool.shutdown(); - - try { - if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) { - LOG.error("Time out occurred while waiting for IO request to finish in append" - + " for blob : {}", key); - NativeAzureFileSystemHelper.logAllLiveStackTraces(); - throw new IOException("Timed out waiting for IO requests to finish"); - } - } catch(InterruptedException intrEx) { - - // Restore the interrupted status - Thread.currentThread().interrupt(); - LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key); - throw new IOException("Append Commit interrupted."); - } - - // Calling commit after all blocks are succesfully uploaded. - if (lastError == null) { - commitAppendBlocks(); - } - - // Perform cleanup. - cleanup(); - - if (lastError != null) { - throw lastError; - } - } - - /** - * Helper method that cleans up the append stream. - */ - private synchronized void cleanup() { - - closed = true; - - try { - // Set the value of append lease to false if the value is set to true. - updateBlobAppendMetadata(false, true); - } catch(StorageException ex) { - LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} " - + "Error Code : {}", - key, ex, ex.getErrorCode()); - lastError = new IOException(ex); - } - - leaseFreed = true; - } - - /** - * Method to commit all the uncommited blocks to azure storage. - * If the commit fails then blocks are automatically cleaned up - * by Azure storage. - * @throws IOException - */ - private synchronized void commitAppendBlocks() throws IOException { - - SelfRenewingLease lease = null; - - try { - if (uncommittedBlockEntries.size() > 0) { - - //Acquiring lease on the blob. - lease = new SelfRenewingLease(blob); - - // Downloading existing blocks - List blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED, - new BlobRequestOptions(), opContext); - - // Adding uncommitted blocks. - blockEntries.addAll(uncommittedBlockEntries); - - AccessCondition accessCondition = new AccessCondition(); - accessCondition.setLeaseID(lease.getLeaseID()); - blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext); - uncommittedBlockEntries.clear(); - } - } catch(StorageException ex) { - LOG.error("Storage exception encountered during block commit phase of append for blob" - + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode()); - throw new IOException("Encountered Exception while committing append blocks", ex); - } finally { - if (lease != null) { - try { - lease.free(); - } catch(StorageException ex) { - LOG.debug("Exception encountered while releasing lease for " - + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode()); - // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object. - } - } - } - } - - /** - * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure - * storage SDK. - */ - private void setBlocksCountAndBlockIdPrefix() throws IOException { - - try { - - if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix==null) { - - List blockEntries = - blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext); - - String blockZeroBlockId = (blockEntries.size() > 0) ? 
blockEntries.get(0).getId() : ""; - String prefix = UUID.randomUUID().toString() + "-"; - String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, 0); - - if (blockEntries.size() > 0 && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) { - - // If blob has already been created with 2.2.0, append subsequent blocks with older version (2.2.0) blockId - // compute nextBlockCount, the way it was done before; and don't use blockIdPrefix - this.blockIdPrefix = ""; - nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE)) - + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT); - nextBlockCount += blockEntries.size(); - - } else { - - // If there are no existing blocks, create the first block with newer version (4.2.0) blockId - // If blob has already been created with 4.2.0, append subsequent blocks with newer version (4.2.0) blockId - this.blockIdPrefix = prefix; - nextBlockCount = blockEntries.size(); - - } - - } - - } catch (StorageException ex) { - LOG.debug("Encountered storage exception during setting next Block Count and BlockId prefix." - + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode()); - throw new IOException(ex); - } - } - - /** - * Helper method that generates the next block id for uploading a block to azure storage. - * @return String representing the block ID generated. - * @throws IOException - */ - private String generateBlockId() throws IOException { - - if (nextBlockCount == UNSET_BLOCKS_COUNT) { - throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly"); - } - - if (this.blockIdPrefix == null) { - throw new IOException("Append Stream in invalid state. blockIdPrefix not set correctly"); - } - - if (!this.blockIdPrefix.equals("")) { - - return generateNewerVersionBlockId(this.blockIdPrefix, nextBlockCount++); - - } else { - - return generateOlderVersionBlockId(nextBlockCount++); - - } - - } - - /** - * Helper method that generates an older (2.2.0) version blockId - * @return String representing the block ID generated. - */ - private String generateOlderVersionBlockId(long id) { - - byte[] blockIdInBytes = getBytesFromLong(id); - return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8); - } - - /** - * Helper method that generates an newer (4.2.0) version blockId - * @return String representing the block ID generated. - */ - private String generateNewerVersionBlockId(String prefix, long id) { - - String blockIdSuffix = String.format("%06d", id); - byte[] blockIdInBytes = (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8); - return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8); - } - - /** - * Returns a byte array that represents the data of a long value. This - * utility method is copied from com.microsoft.azure.storage.core.Utility class. - * This class is marked as internal, hence we clone the method here and not express - * dependency on the Utility Class - * - * @param value - * The value from which the byte array will be returned. - * - * @return A byte array that represents the data of the specified long value. - */ - private static byte[] getBytesFromLong(final long value) { - - final byte[] tempArray = new byte[8]; - - for (int m = 0; m < 8; m++) { - tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF); - } - - return tempArray; - } - - /** - * Helper method that creates a thread to upload a block to azure storage. 
- * @param payload - * @throws IOException - */ - private synchronized void uploadBlockToStorage(byte[] payload) - throws IOException { - - // upload payload to azure storage - String blockId = generateBlockId(); - - // Since uploads of the Azure storage are done in parallel threads, we go ahead - // add the blockId in the uncommitted list. If the upload of the block fails - // we don't commit the blockIds. - BlockEntry blockEntry = new BlockEntry(blockId); - blockEntry.setSize(payload.length); - uncommittedBlockEntries.add(blockEntry); - ioThreadPool.execute(new WriteRequest(payload, blockId)); - } - - - /** - * Helper method to updated the Blob metadata during Append lease operations. - * Blob metadata is updated to holdLease value only if the current lease - * status is equal to testCondition and the last update on the blob metadata - * is less that 30 secs old. - * @param holdLease - * @param testCondition - * @return true if the updated lease operation was successful or false otherwise - * @throws StorageException - */ - private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition) - throws StorageException { - - SelfRenewingLease lease = null; - StorageException lastStorageException = null; - int leaseRenewalRetryCount = 0; - - /* - * Updating the Blob metadata honours following algorithm based on - * 1) If the append lease metadata is present - * 2) Last updated time of the append lease - * 3) Previous value of the Append lease metadata. - * - * The algorithm: - * 1) If append lease metadata is not part of the Blob. In this case - * this is the first client to Append so we update the metadata. - * 2) If append lease metadata is present and timeout has occurred. - * In this case irrespective of what the value of the append lease is we update the metadata. - * 3) If append lease metadata is present and is equal to testCondition value (passed as parameter) - * and timeout has not occurred, we update the metadata. - * 4) If append lease metadata is present and is not equal to testCondition value (passed as parameter) - * and timeout has not occurred, we do not update metadata and return false. - * - */ - while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) { - - lastStorageException = null; - - synchronized(this) { - try { - - final Calendar currentCalendar = Calendar - .getInstance(Locale.US); - currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR)); - long currentTime = currentCalendar.getTime().getTime(); - - // Acquire lease on the blob. 
- lease = new SelfRenewingLease(blob); - - blob.downloadAttributes(opContext); - HashMap metadata = blob.getMetadata(); - - if (metadata.containsKey(APPEND_LEASE) - && currentTime - Long.parseLong( - metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT - && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) { - return false; - } - - metadata.put(APPEND_LEASE, Boolean.toString(holdLease)); - metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime)); - blob.setMetadata(metadata); - AccessCondition accessCondition = new AccessCondition(); - accessCondition.setLeaseID(lease.getLeaseID()); - blob.uploadMetadata(accessCondition, null, opContext); - return true; - - } catch (StorageException ex) { - - lastStorageException = ex; - LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} " - + "Error Code : {}", - key, ex, ex.getErrorCode()); - leaseRenewalRetryCount++; - - } finally { - - if (lease != null) { - try { - lease.free(); - } catch(StorageException ex) { - LOG.debug("Encountered Storage exception while releasing lease for Blob {} " - + "during Append metadata operation. Storage Exception {} " - + "Error Code : {} ", key, ex, ex.getErrorCode()); - } finally { - lease = null; - } - } - } - } - - if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) { - throw lastStorageException; - } else { - try { - Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD); - } catch(InterruptedException ex) { - LOG.debug("Blob append metadata updated method interrupted"); - Thread.currentThread().interrupt(); - } - } - } - - // The code should not enter here because the while loop will - // always be executed and if the while loop is executed we - // would returning from the while loop. - return false; - } - - /** - * This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer. - * @param data - * @param offset - * @param length - * @throws IOException - */ - private synchronized void writeInternal(final byte[] data, final int offset, final int length) - throws IOException { - - if (!initialized) { - throw new IOException("Trying to write to an un-initialized Append stream"); + throw new IndexOutOfBoundsException(); } if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - if (leaseFreed) { - throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write")); + while (outBuffer.remaining() < length) { + + int remaining = outBuffer.remaining(); + outBuffer.put(data, offset, remaining); + + // upload payload to azure storage + addBlockUploadCommand(); + + offset += remaining; + length -= remaining; } - byte[] currentData = new byte[length]; - System.arraycopy(data, offset, currentData, 0, length); + outBuffer.put(data, offset, length); + } - // check to see if the data to be appended exceeds the - // buffer size. If so we upload a block to azure storage. - while ((outBuffer.size() + currentData.length) > bufferSize) { - byte[] payload = new byte[bufferSize]; + /** + * Flushes this output stream and forces any buffered output bytes to be + * written out. If any data remains in the payload it is committed to the + * service. Data is queued for writing and forced out to the service + * before the call returns. 
+ */ + @Override + public void flush() throws IOException { - // Add data from the existing buffer - System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size()); - - // Updating the available size in the payload - int availableSpaceInPayload = bufferSize - outBuffer.size(); - - // Adding data from the current call - System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload); - - uploadBlockToStorage(payload); - - // updating the currentData buffer - byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload]; - System.arraycopy(currentData, availableSpaceInPayload, - tempBuffer, 0, currentData.length - availableSpaceInPayload); - currentData = tempBuffer; - outBuffer = new ByteArrayOutputStream(bufferSize); + if (closed) { + // close() on an already-closed stream begins with a call to flush() + return; } - outBuffer.write(currentData); + addBlockUploadCommand(); + + if (committedBlobLength.get() < blobLength) { + try { + // wait until the block list is committed + addFlushCommand().await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } } /** - * Runnable instance that uploads the block of data to azure storage. - * - * + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. */ - private class WriteRequest implements Runnable { - private final byte[] dataPayload; - private final String blockId; + @Override + public void hsync() throws IOException { + // when block compaction is disabled, hsync is an empty function + if (compactionEnabled) { + flush(); + } + } - public WriteRequest(byte[] dataPayload, String blockId) { - this.dataPayload = dataPayload; - this.blockId = blockId; + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. + */ + @Override + public void hflush() throws IOException { + // when block compaction is disabled, hflush is an empty function + if (compactionEnabled) { + flush(); + } + } + + /** + * The synchronization capabilities of this stream depend upon the compaction + * policy. + * @param capability string to query the stream support for. + * @return true for hsync and hflush when compaction is enabled. + */ + @Override + public boolean hasCapability(String capability) { + return compactionEnabled + && (capability.equalsIgnoreCase(HSYNC.getValue()) + || capability.equalsIgnoreCase(HFLUSH.getValue())); + } + + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. Close access to the stream and + * shut down the upload thread pool. + * If the blob was created, its lease will be released. + * Any error caught and stored by the upload threads will be rethrown here + * after cleanup. + */ + @Override + public synchronized void close() throws IOException { + + LOG.debug("close {} ", key); + + if (closed) { + return; } - @Override - public void run() { + // Upload the last block regardless of compactionEnabled flag + flush(); - int uploadRetryAttempts = 0; - IOException lastLocalException = null; - while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) { + // Initiates an orderly shutdown in which previously submitted tasks are + // executed. 
+ ioThreadPool.shutdown(); + + try { + // wait up to CLOSE_UPLOAD_DELAY minutes to upload all the blocks + if (!ioThreadPool.awaitTermination(CLOSE_UPLOAD_DELAY, TimeUnit.MINUTES)) { + LOG.error("Timeout occurred while close() was waiting for IO requests to" + " finish in append" + " for blob : {}", + key); + NativeAzureFileSystemHelper.logAllLiveStackTraces(); + throw new AzureException("Timed out waiting for IO requests to finish"); + } + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + // release the lease + if (firstError.get() == null && blobExist) { + try { + lease.free(); + } catch (StorageException ex) { + LOG.debug("Freeing lease on blob {} encountered Storage Exception:" + " {} Error Code : {}", + key, + ex, + ex.getErrorCode()); + maybeSetFirstError(new AzureException(ex)); + } + } + + closed = true; + + // finally, throw the first exception raised if it has not + // been thrown elsewhere. + if (firstError.get() != null && !firstErrorThrown) { + throw firstError.get(); + } + } + + /** + * Helper method used to generate the blockIDs. The algorithm used is similar + * to the one in the Azure storage SDK. + */ + private void setBlocksCountAndBlockIdPrefix(List blockEntries) { + + if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix == null) { + + Random sequenceGenerator = new Random(); + + String blockZeroBlockId = (!blockEntries.isEmpty()) + ? blockEntries.get(0).getId() + : ""; + String prefix = UUID.randomUUID().toString() + "-"; + String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, + 0); + + if (!blockEntries.isEmpty() + && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) { + + // If the blob has already been created with 2.2.0, append subsequent + // blocks with the older version (2.2.0) blockId; compute nextBlockCount + // the way it was done before, and don't use blockIdPrefix + this.blockIdPrefix = ""; + nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE)) + + sequenceGenerator.nextInt( + Integer.MAX_VALUE - MAX_BLOCK_COUNT); + nextBlockCount += blockEntries.size(); + + } else { + + // If there are no existing blocks, create the first block with the + // newer version (4.2.0) blockId. If the blob has already been created + // with 4.2.0, append subsequent blocks with the newer (4.2.0) blockId + this.blockIdPrefix = prefix; + nextBlockCount = blockEntries.size(); + } + } + } + + /** + * Helper method that generates the next block id for uploading a block to + * azure storage. + * @return String representing the block ID generated. + * @throws IOException if the stream is in an invalid state + */ + private String generateBlockId() throws IOException { + + if (nextBlockCount == UNSET_BLOCKS_COUNT || blockIdPrefix == null) { + throw new AzureException( + "Append Stream in invalid state. nextBlockCount not set correctly"); + } + + return (!blockIdPrefix.isEmpty()) + ? generateNewerVersionBlockId(blockIdPrefix, nextBlockCount++) + : generateOlderVersionBlockId(nextBlockCount++); + }
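
The two block id formats produced by generateBlockId() and the helpers that follow can be seen in isolation; a self-contained sketch using the same commons-codec Base64 calls (the prefix and counter values are illustrative):

    import java.nio.charset.StandardCharsets;
    import org.apache.commons.codec.binary.Base64;

    public class BlockIdFormatSketch {
      // Older (2.2.0) format: Base64 of the big-endian bytes of the counter.
      static String olderVersionId(long id) {
        byte[] bytes = new byte[8];
        for (int m = 0; m < 8; m++) {
          bytes[7 - m] = (byte) ((id >> (8 * m)) & 0xFF);
        }
        return new String(Base64.encodeBase64(bytes), StandardCharsets.UTF_8);
      }

      // Newer (4.2.0) format: Base64 of a UUID-derived prefix plus a
      // zero-padded sequence number.
      static String newerVersionId(String prefix, long id) {
        byte[] bytes =
            (prefix + String.format("%06d", id)).getBytes(StandardCharsets.UTF_8);
        return new String(Base64.encodeBase64(bytes), StandardCharsets.UTF_8);
      }

      public static void main(String[] args) {
        System.out.println(olderVersionId(42));
        System.out.println(newerVersionId("0c6c3f9f-example-prefix-", 0));
      }
    }
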
+ /** + * Helper method that generates an older (2.2.0) version blockId. + * @return String representing the block ID generated. + */ + private String generateOlderVersionBlockId(long id) { + + byte[] blockIdInBytes = new byte[8]; + for (int m = 0; m < 8; m++) { + blockIdInBytes[7 - m] = (byte) ((id >> (8 * m)) & 0xFF); + } + + return new String( + Base64.encodeBase64(blockIdInBytes), + StandardCharsets.UTF_8); + } + + /** + * Helper method that generates a newer (4.2.0) version blockId. + * @return String representing the block ID generated. + */ + private String generateNewerVersionBlockId(String prefix, long id) { + + String blockIdSuffix = String.format("%06d", id); + byte[] blockIdInBytes = + (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8); + return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8); + } + + /** + * This is shared between the upload-block Runnable and the block list + * commit. The method captures the retry logic. + * @param blockId block name + * @param dataPayload block content + */ + private void writeBlockRequestInternal(String blockId, + ByteBuffer dataPayload, + boolean bufferPoolBuffer) { IOException lastLocalException = null; + + int uploadRetryAttempts = 0; while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) { try { + long startTime = System.nanoTime(); + + blob.uploadBlock(blockId, accessCondition, new ByteArrayInputStream( + dataPayload.array()), dataPayload.position(), + new BlobRequestOptions(), opContext); + + LOG.debug("upload block finished in {} ms. block {} ", + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTime), blockId); break; + } catch(Exception ioe) { + LOG.debug("Encountered exception during uploading block for Blob {}" + " Exception : {}", key, ioe); + uploadRetryAttempts++; + lastLocalException = new AzureException( + "Encountered Exception while uploading block: " + ioe, ioe); + try { + Thread.sleep( + BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1)); + } catch(InterruptedException ie) { + Thread.currentThread().interrupt(); break; - } catch(Exception ioe) { - Log.getLog().debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe); - uploadRetryAttempts++; - lastLocalException = new IOException("Encountered Exception while uploading block", ioe); - try { - Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL); - } catch(InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } } } + } - if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) { - lastError = lastLocalException; + if (bufferPoolBuffer) { + poolReadyByteBuffers.putBuffer(dataPayload); + } + + if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) { + maybeSetFirstError(lastLocalException); } } + /** + * Set {@link #firstError} to the exception if it is not already set. + * @param exception exception to save + */ + private void maybeSetFirstError(IOException exception) { + firstError.compareAndSet(null, exception); + } + + + /** + * Throw the first error caught if it has not been raised already. + * @throws IOException if one is caught and needs to be thrown. + */ + private void maybeThrowFirstError() throws IOException { + if (firstError.get() != null) { + firstErrorThrown = true; + throw firstError.get(); + } + }
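
writeBlockRequestInternal above and writeBlockListRequestInternal below share the same retry shape: up to MAX_BLOCK_UPLOAD_RETRIES attempts with a linearly growing sleep between them. A standalone sketch of that policy (the Action interface and constant names are illustrative):

    public class LinearBackoffRetrySketch {
      interface Action {
        void run() throws Exception;
      }

      static final int MAX_RETRIES = 3;          // MAX_BLOCK_UPLOAD_RETRIES
      static final int RETRY_INTERVAL_MS = 1000; // BLOCK_UPLOAD_RETRY_INTERVAL

      // Returns null on success, or the exception from the last failed attempt.
      static Exception retry(Action action) {
        Exception lastException = null;
        int attempts = 0;
        while (attempts < MAX_RETRIES) {
          try {
            action.run();
            return null;
          } catch (Exception e) {
            lastException = e;
            attempts++;
            try {
              // linear backoff: 2x, 3x, 4x the base interval
              Thread.sleep(RETRY_INTERVAL_MS * (attempts + 1));
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              break;
            }
          }
        }
        return lastException; // the caller records this as the first error
      }
    }
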
+ /** + * Write block list. The method captures the retry logic. + */ + private void writeBlockListRequestInternal() { + + IOException lastLocalException = null; + + int uploadRetryAttempts = 0; + while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) { + try { + + long startTime = System.nanoTime(); + + blob.commitBlockList(blockEntries, accessCondition, + new BlobRequestOptions(), opContext); + + LOG.debug("Upload block list took {} ms for blob {} ", + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTime), key); + break; + + } catch(Exception ioe) { + LOG.debug("Encountered exception during upload of block list for Blob {}" + " Exception : {}", key, ioe); + uploadRetryAttempts++; + lastLocalException = new AzureException( + "Encountered Exception while uploading block list: " + ioe, ioe); + try { + Thread.sleep( + BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1)); + } catch(InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } } } + + if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) { + maybeSetFirstError(lastLocalException); + } } /** @@ -787,54 +823,319 @@ public class BlockBlobAppendStream extends OutputStream { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); - t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key, + t.setName(String.format("%s-%d", THREAD_ID_PREFIX, threadSequenceNumber.getAndIncrement())); return t; } } /** - * A deamon thread that renews the Append lease on the blob. - * The thread sleeps for LEASE_RENEWAL_PERIOD time before renewing - * the lease. If an error is encountered while renewing the lease - * then an lease is released by this thread, which fails all other - * operations. + * Upload block command. */ - private class AppendRenewer implements Runnable { + private class UploadBlockCommand extends UploadCommand { + + // the block content for upload + private final ByteBuffer payload; + + // description of the block + private final BlockEntry entry; + + UploadBlockCommand(String blockId, ByteBuffer payload) { + + super(blobLength); + + BlockEntry blockEntry = new BlockEntry(blockId); + blockEntry.setSize(payload.position()); + blockEntry.setSearchMode(BlockSearchMode.LATEST); + + this.payload = payload; + this.entry = blockEntry; + + uncommittedBlockEntries.add(blockEntry); + } + + /** + * Execute command. + */ + void execute() throws InterruptedException { + + uploadingSemaphore.acquire(1); + writeBlockRequestInternal(entry.getId(), payload, true); + uploadingSemaphore.release(1); + + } + + void dump() { + LOG.debug("upload block {} size: {} for blob {}", + entry.getId(), + entry.getSize(), + key); + } + } + + /** + * Upload blob block list command. + */ + private class UploadBlockListCommand extends UploadCommand { + + private BlockEntry lastBlock = null; + + UploadBlockListCommand() { + super(blobLength); + + if (!uncommittedBlockEntries.isEmpty()) { + lastBlock = uncommittedBlockEntries.getLast(); + } + } + + void awaitAsDependent() throws InterruptedException { + // Empty. A later block list commit does not need to wait for previous + // block list commits. + } + + void dump() { + LOG.debug("commit block list with {} blocks for blob {}", + uncommittedBlockEntries.size(), key); + }
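
The execute() method that follows must not run while block uploads are in flight; it uses the all-permits pattern described earlier for uploadingSemaphore. A standalone sketch of the pattern (permit count and method names are illustrative):

    import java.util.concurrent.Semaphore;

    public class AllPermitsExclusionSketch {
      static final int UPLOAD_THREADS = 4; // MAX_NUMBER_THREADS_IN_THREAD_POOL
      static final Semaphore gate = new Semaphore(UPLOAD_THREADS, true);

      // A block upload holds a single permit, so uploads run concurrently.
      static void uploadBlock(Runnable upload) throws InterruptedException {
        gate.acquire(1);
        try {
          upload.run();
        } finally {
          gate.release(1);
        }
      }

      // The block list commit takes every permit, so it waits for running
      // uploads to finish and blocks new ones until the permits are released.
      static void commitBlockList(Runnable commit) throws InterruptedException {
        gate.acquire(UPLOAD_THREADS);
        try {
          commit.run();
        } finally {
          gate.release(UPLOAD_THREADS);
        }
      }
    }
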
+ /** + * Execute command. + */ + public void execute() throws InterruptedException, IOException { + + if (committedBlobLength.get() >= getCommandBlobOffset()) { + LOG.debug("commit already applied for {}", key); + return; + } + + if (lastBlock == null) { + LOG.debug("nothing to commit for {}", key); + return; + } + + LOG.debug("active commands: {} for {}", activeBlockCommands.size(), key); + + for (UploadCommand activeCommand : activeBlockCommands) { + if (activeCommand.getCommandBlobOffset() < getCommandBlobOffset()) { + activeCommand.dump(); + activeCommand.awaitAsDependent(); + } else { + break; + } + } + + // stop all uploads until the block list is committed + uploadingSemaphore.acquire(MAX_NUMBER_THREADS_IN_THREAD_POOL); + + BlockEntry uncommittedBlock; + do { + uncommittedBlock = uncommittedBlockEntries.poll(); + blockEntries.add(uncommittedBlock); + } while (uncommittedBlock != lastBlock); + + if (blockEntries.size() > activateCompactionBlockCount) { + LOG.debug("Block compaction: activated with {} blocks for {}", + blockEntries.size(), key); + + // Block compaction + long startCompaction = System.nanoTime(); + blockCompaction(); + LOG.debug("Block compaction finished in {} ms with {} blocks for {}", + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startCompaction), + blockEntries.size(), key); + } + + writeBlockListRequestInternal(); + + uploadingSemaphore.release(MAX_NUMBER_THREADS_IN_THREAD_POOL); + + // remove the commands for blocks covered by this commit + for (Iterator it = activeBlockCommands.iterator(); + it.hasNext();) { + UploadCommand activeCommand = it.next(); + if (activeCommand.getCommandBlobOffset() <= getCommandBlobOffset()) { + it.remove(); + } else { + break; + } + } + + committedBlobLength.set(getCommandBlobOffset()); + } + + /** + * Internal output stream with read access to the internal buffer. + */ + private class ByteArrayOutputStreamInternal extends ByteArrayOutputStream { + + ByteArrayOutputStreamInternal(int size) { + super(size); + } + + byte[] getByteArray() { + return buf; + } + }
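
The compaction routine that follows first scans the block list for the longest run of small blocks that can be merged into one block. The scan in miniature, over plain sizes (the size limit and block sizes are illustrative, and offset bookkeeping is omitted):

    public class CompactionSegmentScanSketch {
      // Mirrors the loop in blockCompaction(): returns {begin, end} of the
      // longest candidate run of more than two blocks whose combined size
      // stays within maxBlockSize.
      static int[] longestSegment(long[] blockSizes, long maxBlockSize) {
        int segBegin = 0, segEnd = 0, maxBegin = 0, maxEnd = 0;
        long offBegin = 0, offEnd = 0;
        for (long size : blockSizes) {
          segEnd++;
          offEnd += size;
          if (offEnd - offBegin > maxBlockSize) {
            if (segEnd - segBegin > 2 && maxEnd - maxBegin < segEnd - segBegin) {
              maxBegin = segBegin;
              maxEnd = segEnd;
            }
            segBegin = segEnd - 1;
            offBegin = offEnd - size;
          }
        }
        return new int[] {maxBegin, maxEnd};
      }

      public static void main(String[] args) {
        // Prints "0..6"; as in blockCompaction(), the run actually merged is
        // [begin, end - 1), i.e. the five 1-byte blocks.
        long[] sizes = {1, 1, 1, 1, 1, 10};
        int[] seg = longestSegment(sizes, 8);
        System.out.println(seg[0] + ".." + seg[1]);
      }
    }
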
+ /** + * Block compaction process. + * + * Block compaction is only enabled when the number of blocks exceeds + * activateCompactionBlockCount. The algorithm searches for the longest + * sequence of more than two consecutive blocks [b..e) whose total size, + * size(b) + size(b+1) + ... + size(e-1), stays below maxBlockSize. + * It then downloads the blocks in the sequence, concatenates the data to + * form a single block, uploads this new block, and updates the block + * list to replace the sequence of blocks with the new block. + */ + private void blockCompaction() throws IOException { + // current segment [segmentBegin, segmentEnd) and file offset/size of the + // current segment + int segmentBegin = 0, segmentEnd = 0; + long segmentOffsetBegin = 0, segmentOffsetEnd = 0; + + // longest segment [maxSegmentBegin, maxSegmentEnd) and file offset/size of + // the longest segment + int maxSegmentBegin = 0, maxSegmentEnd = 0; + long maxSegmentOffsetBegin = 0, maxSegmentOffsetEnd = 0; + + for (BlockEntry block : blockEntries) { + segmentEnd++; + segmentOffsetEnd += block.getSize(); + if (segmentOffsetEnd - segmentOffsetBegin > maxBlockSize.get()) { + if (segmentEnd - segmentBegin > 2) { + if (maxSegmentEnd - maxSegmentBegin < segmentEnd - segmentBegin) { + maxSegmentBegin = segmentBegin; + maxSegmentEnd = segmentEnd; + maxSegmentOffsetBegin = segmentOffsetBegin; + maxSegmentOffsetEnd = segmentOffsetEnd - block.getSize(); + } + } + segmentBegin = segmentEnd - 1; + segmentOffsetBegin = segmentOffsetEnd - block.getSize(); + } + } + + if (maxSegmentEnd - maxSegmentBegin > 1) { + + LOG.debug("Block compaction: {} blocks for {}", + maxSegmentEnd - maxSegmentBegin, key); + + // synchronously download all the blocks of the segment from Azure storage + ByteArrayOutputStreamInternal blockOutputStream + = new ByteArrayOutputStreamInternal(maxBlockSize.get()); + + try { + long length = maxSegmentOffsetEnd - maxSegmentOffsetBegin; + blob.downloadRange(maxSegmentOffsetBegin, length, blockOutputStream, + new BlobRequestOptions(), opContext); + } catch(StorageException ex) { + LOG.error( + "Storage exception encountered during block compaction phase" + " : {} Storage Exception : {} Error Code: {}", + key, ex, ex.getErrorCode()); + throw new AzureException( + "Encountered Exception while committing append blocks " + ex, ex); + } + + // synchronously upload the new block to Azure storage + String blockId = generateBlockId(); + + ByteBuffer byteBuffer = ByteBuffer.wrap( + blockOutputStream.getByteArray()); + byteBuffer.position(blockOutputStream.size()); + + writeBlockRequestInternal(blockId, byteBuffer, false); + + // replace the blocks of the longest segment with the new block id + blockEntries.subList(maxSegmentBegin + 1, maxSegmentEnd - 1).clear(); + BlockEntry newBlock = blockEntries.get(maxSegmentBegin); + newBlock.setId(blockId); + newBlock.setSearchMode(BlockSearchMode.LATEST); + newBlock.setSize(maxSegmentOffsetEnd - maxSegmentOffsetBegin); + } + } + } + + /** + * Prepare block upload command and queue the command in thread pool executor. + */ + private synchronized void addBlockUploadCommand() throws IOException { + + maybeThrowFirstError(); + + if (blobExist && lease.isFreed()) { + throw new AzureException(String.format( + "Attempting to upload a block on blob : %s " + + " that does not have lease on the Blob. Failing upload", key)); + } + + int blockSize = outBuffer.position(); + if (blockSize > 0) { + UploadCommand command = new UploadBlockCommand(generateBlockId(), + outBuffer); + activeBlockCommands.add(command); + + blobLength += blockSize; + outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get()); + + ioThreadPool.execute(new WriteRequest(command)); + + } + } + + /** + * Prepare block list commit command and queue the command in thread pool + * executor. 
+ */ + private synchronized UploadCommand addFlushCommand() throws IOException { + + maybeThrowFirstError(); + + if (blobExist && lease.isFreed()) { + throw new AzureException( + String.format("Attempting to upload block list on blob : %s" + + " that does not have lease on the Blob. Failing upload", key)); + } + + UploadCommand command = new UploadBlockListCommand(); + activeBlockCommands.add(command); + + ioThreadPool.execute(new WriteRequest(command)); + + return command; + } + + /** + * Runnable instance that uploads the block of data to azure storage. + */ + private class WriteRequest implements Runnable { + private final UploadCommand command; + + WriteRequest(UploadCommand command) { + this.command = command; + } @Override public void run() { - while (!leaseFreed) { - - try { - Thread.sleep(LEASE_RENEWAL_PERIOD); - } catch (InterruptedException ie) { - LOG.debug("Appender Renewer thread interrupted"); - Thread.currentThread().interrupt(); - } - - Log.getLog().debug("Attempting to renew append lease on {}", key); - - try { - if (!leaseFreed) { - // Update the blob metadata to renew the append lease - if (!updateBlobAppendMetadata(true, true)) { - LOG.error("Unable to re-acquire append lease on the Blob {} ", key); - leaseFreed = true; - } - } - } catch (StorageException ex) { - - LOG.debug("Lease renewal for Blob : {} encountered " - + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode()); - - // We swallow the exception here because if the blob metadata is not updated for - // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and - // continue forward if it needs to append. - leaseFreed = true; - } + try { + command.dump(); + long startTime = System.nanoTime(); + command.execute(); + command.setCompleted(); + LOG.debug("command finished for {} ms", + TimeUnit.NANOSECONDS.toMillis( + System.nanoTime() - startTime)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (Exception ex) { + LOG.debug( + "Encountered exception during execution of command for Blob :" + + " {} Exception : {}", key, ex); + firstError.compareAndSet(null, new AzureException(ex)); } } } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index 0bde124a899..280c0e0fe8d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -62,6 +62,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; import org.apache.hadoop.fs.azure.security.Constants; @@ -352,9 +354,9 @@ public class NativeAzureFileSystem extends FileSystem { } /** - * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote + * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote * method. - * + * * Produce a string in double quotes with backslash sequences in all the * right places. 
A backslash will be inserted within len from the specified byte array starting at offset * off to the output stream. The general contract for write(b, - * off, len) is that some of the bytes in the array - * b are written to the output stream in order; element - * b[off] is the first byte written and - * b[off+len-1] is the last byte written by this operation. + * off, len) is that some of the bytes in the array b + * are written to the output stream in order; element b[off] + * is the first byte written and b[off+len-1] is the last + * byte written by this operation. * * @param b * Byte array to be written. @@ -1749,7 +1796,7 @@ public class NativeAzureFileSystem extends FileSystem { OutputStream bufOutStream; if (store.isPageBlobKey(key)) { // Store page blobs directly in-place without renames. - bufOutStream = store.storefile(key, permissionStatus); + bufOutStream = store.storefile(key, permissionStatus, key); } else { // This is a block blob, so open the output blob stream based on the // encoded key. @@ -1777,7 +1824,7 @@ public class NativeAzureFileSystem extends FileSystem { // these // blocks. bufOutStream = new NativeAzureFsOutputStream(store.storefile( - keyEncoded, permissionStatus), key, keyEncoded); + keyEncoded, permissionStatus, key), key, keyEncoded); } // Construct the data output stream from the buffered output stream. FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java index 1c7309fa62c..57a729dc155 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -50,8 +50,9 @@ interface NativeFileSystemStore { InputStream retrieve(String key, long byteRangeStart) throws IOException; - DataOutputStream storefile(String key, PermissionStatus permissionStatus) - throws AzureException; + DataOutputStream storefile(String keyEncoded, + PermissionStatus permissionStatus, + String key) throws AzureException; boolean isPageBlobKey(String key); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java index 5dbb6bc8c02..7c2722ed13b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java @@ -519,7 +519,7 @@ public class SecureStorageInterfaceImpl extends StorageInterface { @Override public SelfRenewingLease acquireLease() throws StorageException { - return new SelfRenewingLease(this); + return new SelfRenewingLease(this, false); } } @@ -557,10 +557,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface { } @Override - public void uploadBlock(String blockId, InputStream sourceStream, + public void uploadBlock(String blockId, AccessCondition accessCondition, + InputStream sourceStream, long length, BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException { - ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext); + ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, + accessCondition, options, 
opContext); } @Override @@ -593,4 +595,4 @@ public class SecureStorageInterfaceImpl extends StorageInterface { null, options, opContext); } } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java index 00d5e99c20f..10956f73f72 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java @@ -30,6 +30,8 @@ import com.microsoft.azure.storage.blob.CloudBlob; import java.util.concurrent.atomic.AtomicInteger; +import static com.microsoft.azure.storage.StorageErrorCodeStrings.LEASE_ALREADY_PRESENT; + /** * An Azure blob lease that automatically renews itself indefinitely * using a background thread. Use it to synchronize distributed processes, @@ -66,7 +68,7 @@ public class SelfRenewingLease { @VisibleForTesting static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000; - public SelfRenewingLease(CloudBlobWrapper blobWrapper) + public SelfRenewingLease(CloudBlobWrapper blobWrapper, boolean throwIfPresent) throws StorageException { this.leaseFreed = false; @@ -79,10 +81,14 @@ public class SelfRenewingLease { leaseID = blob.acquireLease(LEASE_TIMEOUT, null); } catch (StorageException e) { + if (throwIfPresent && e.getErrorCode().equals(LEASE_ALREADY_PRESENT)) { + throw e; + } + // Throw again if we don't want to keep waiting. // We expect it to be that the lease is already present, // or in some cases that the blob does not exist. - if (!"LeaseAlreadyPresent".equals(e.getErrorCode())) { + if (!LEASE_ALREADY_PRESENT.equals(e.getErrorCode())) { LOG.info( "Caught exception when trying to get lease on blob " + blobWrapper.getUri().toString() + ". " + e.getMessage()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java index 8b6b082ee43..e03d7311e23 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java @@ -665,6 +665,7 @@ abstract class StorageInterface { * * @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob * the length of all Block IDs must be identical. + * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob. * @param sourceStream An {@link InputStream} object that represents the input stream to write to the * block blob. * @param length A long which represents the length, in bytes, of the stream data, @@ -678,7 +679,7 @@ abstract class StorageInterface { * @throws IOException If an I/O error occurred. * @throws StorageException If a storage service error occurred. 
*/ - void uploadBlock(String blockId, InputStream sourceStream, + void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream, long length, BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index d3d03706599..41a4dbb159d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -277,7 +277,7 @@ class StorageInterfaceImpl extends StorageInterface { return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath)); } - + @Override public CloudBlobWrapper getPageBlobReference(String relativePath) throws URISyntaxException, StorageException { @@ -286,7 +286,7 @@ class StorageInterfaceImpl extends StorageInterface { } } - + abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper { private final CloudBlob blob; @@ -441,10 +441,10 @@ class StorageInterfaceImpl extends StorageInterface { @Override public SelfRenewingLease acquireLease() throws StorageException { - return new SelfRenewingLease(this); + return new SelfRenewingLease(this, false); } } - + // // CloudBlockBlobWrapperImpl @@ -479,10 +479,10 @@ class StorageInterfaceImpl extends StorageInterface { } @Override - public void uploadBlock(String blockId, InputStream sourceStream, + public void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream, long length, BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException { - ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext); + ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, accessCondition, options, opContext); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java index a52fdb719ab..fc8796bcbfa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.classification.InterfaceAudience; /** * Support the Syncable interface on top of a DataOutputStream. @@ -38,6 +39,16 @@ public class SyncableDataOutputStream extends DataOutputStream super(out); } + /** + * Get a reference to the wrapped output stream. 
+   *
+   * @return the underlying output stream
+   */
+  @InterfaceAudience.LimitedPrivate({"HDFS"})
+  public OutputStream getOutStream() {
+    return out;
+  }
+
   @Override
   public boolean hasCapability(String capability) {
     if (out instanceof StreamCapabilities) {
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 758650daa96..466bf0b7895 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -153,6 +153,40 @@ line argument:
 
 ```
 
+### Block Blob with Compaction Support and Configuration
+
+Block blobs are the default kind of blob and are good for most big-data use
+cases. However, block blobs have a strict limit of 50,000 blocks per blob.
+To prevent reaching this limit, WASB does not, by default, upload a new
+block to the service after every `hflush()` or `hsync()`.
+
+For most use cases, combining data from multiple `write()` calls into
+blocks of 4 MB is a good optimization. In other cases, such as HBase log
+files, every call to `hflush()` or `hsync()` must upload the data to the
+service.
+
+Block blobs with compaction upload the data to the cloud service after
+every `hflush()`/`hsync()`. To mitigate the limit of 50,000 blocks,
+`hflush()`/`hsync()` runs a compaction process once the number of blocks
+in the blob exceeds 32,000.
+
+Block compaction searches for and replaces a sequence of small blocks with
+one big block. This has an associated cost: the small blocks are read back
+to the client and written out again as one big block.
+
+In order for the files you create to be block blobs with block compaction
+enabled, the client must set the configuration variable
+`fs.azure.block.blob.with.compaction.dir` to a comma-separated list of
+folder names.
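+
+Once a directory is configured (see the `core-site.xml` example below),
+client code uses the ordinary `FileSystem` API; no WASB-specific calls are
+needed. The following sketch is illustrative only and is not part of this
+patch; the path `/hbase/WALs/example.log` assumes `/hbase/WALs` is listed
+in `fs.azure.block.blob.with.compaction.dir`:
+
+```java
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class CompactionWriteExample {
+  public static void main(String[] args) throws Exception {
+    // Assumes fs.defaultFS (or the path URI) points at a wasb:// filesystem.
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    byte[] record = "example log entry".getBytes(StandardCharsets.UTF_8);
+    try (FSDataOutputStream out = fs.create(new Path("/hbase/WALs/example.log"))) {
+      out.write(record);
+      // Under a compaction-enabled directory, every hflush()/hsync() uploads
+      // the buffered data to the service as a new block.
+      out.hflush();
+    }
+  }
+}
+```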
+
+For example:
+
+```xml
+<property>
+  <name>fs.azure.block.blob.with.compaction.dir</name>
+  <value>/hbase/WALs,/data/myblobfiles</value>
+</property>
+```
+
 ### Page Blob Support and Configuration
 
 The Azure Blob Storage interface for Hadoop supports two kinds of blobs,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index 4f26d9f3352..e0ae7b4f195 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -551,7 +551,8 @@ public class MockStorageInterface extends StorageInterface {
       throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
     }
     @Override
-    public void uploadBlock(String blockId, InputStream sourceStream,
+    public void uploadBlock(String blockId, AccessCondition accessCondition,
+        InputStream sourceStream,
         long length, BlobRequestOptions options, OperationContext opContext)
         throws IOException, StorageException {
       throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
index 7ea753486e6..a10a3666303 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
@@ -107,7 +107,8 @@ public class TestAzureConcurrentOutOfBandIo {
    //
    outputStream = writerStorageAccount.getStore().storefile(
        key,
-       new PermissionStatus("", "", FsPermission.getDefault()));
+       new PermissionStatus("", "", FsPermission.getDefault()),
+       key);
 
    Arrays.fill(dataBlockWrite, (byte) (i % 256));
    for (int j = 0; j < NUMBER_OF_BLOCKS; j++) {
@@ -141,7 +142,8 @@ public class TestAzureConcurrentOutOfBandIo {
    // reading. This eliminates the race between the reader and writer threads.
    OutputStream outputStream = testAccount.getStore().storefile(
        "WASB_String.txt",
-       new PermissionStatus("", "", FsPermission.getDefault()));
+       new PermissionStatus("", "", FsPermission.getDefault()),
+       "WASB_String.txt");
    Arrays.fill(dataBlockWrite, (byte) 255);
    for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
      outputStream.write(dataBlockWrite);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemBlockCompaction.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemBlockCompaction.java
new file mode 100644
index 00000000000..820ce4f2400
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemBlockCompaction.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.microsoft.azure.storage.blob.BlockEntry;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * Test class that runs the WASB block compaction process for block blobs.
+ */
+public class TestNativeAzureFileSystemBlockCompaction extends AbstractWasbTestBase {
+
+  private static final String TEST_FILE = "/user/active/test.dat";
+  private static final Path TEST_PATH = new Path(TEST_FILE);
+
+  private static final String TEST_FILE_NORMAL = "/user/normal/test.dat";
+  private static final Path TEST_PATH_NORMAL = new Path(TEST_FILE_NORMAL);
+
+  private AzureBlobStorageTestAccount testAccount = null;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    testAccount = createTestAccount();
+    fs = testAccount.getFileSystem();
+    Configuration conf = fs.getConf();
+    conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
+    conf.set(AzureNativeFileSystemStore.KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES, "/user/active");
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+  }
+
+  /*
+   * Helper method that creates test data of the size provided by the
+   * "size" parameter.
+   */
+  private static byte[] getTestData(int size) {
+    byte[] testData = new byte[size];
+    System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
+    return testData;
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  private BlockBlobAppendStream getBlockBlobAppendStream(FSDataOutputStream appendStream) {
+    SyncableDataOutputStream dataOutputStream = null;
+
+    if (appendStream.getWrappedStream() instanceof NativeAzureFileSystem.NativeAzureFsOutputStream) {
+      NativeAzureFileSystem.NativeAzureFsOutputStream fsOutputStream =
+          (NativeAzureFileSystem.NativeAzureFsOutputStream) appendStream.getWrappedStream();
+
+      dataOutputStream = (SyncableDataOutputStream) fsOutputStream.getOutStream();
+    }
+
+    if (appendStream.getWrappedStream() instanceof SyncableDataOutputStream) {
+      dataOutputStream = (SyncableDataOutputStream) appendStream.getWrappedStream();
+    }
+
+    Assert.assertNotNull("Did not recognize " + dataOutputStream,
+        dataOutputStream);
+
+    return (BlockBlobAppendStream) dataOutputStream.getOutStream();
+  }
+
+  private void verifyBlockList(BlockBlobAppendStream blockBlobStream,
+      int[] testData) throws Throwable {
+    List<BlockEntry> blockList = blockBlobStream.getBlockList();
+    Assert.assertEquals("Block list length", testData.length, blockList.size());
+
+    int i = 0;
+    for (BlockEntry block: blockList) {
+      Assert.assertEquals("Unexpected block size", testData[i++], block.getSize());
+    }
+  }
+
+  private void appendBlockList(FSDataOutputStream fsStream,
+      ByteArrayOutputStream memStream,
+      int[] testData) throws Throwable {
+
+    for (int d: testData) {
+      byte[] data = getTestData(d);
+      memStream.write(data);
+      fsStream.write(data);
+    }
+    fsStream.hflush();
+  }
+
+  @Test
+  public void testCompactionDisabled() throws Throwable {
+
+    try (FSDataOutputStream appendStream = fs.create(TEST_PATH_NORMAL)) {
+
+      // testing new file
+
+      SyncableDataOutputStream dataOutputStream = null;
+
+      OutputStream wrappedStream = appendStream.getWrappedStream();
+      if (wrappedStream instanceof NativeAzureFileSystem.NativeAzureFsOutputStream) {
+        NativeAzureFileSystem.NativeAzureFsOutputStream fsOutputStream =
+            (NativeAzureFileSystem.NativeAzureFsOutputStream) wrappedStream;
+
+        dataOutputStream = (SyncableDataOutputStream) fsOutputStream.getOutStream();
+      } else if (wrappedStream instanceof SyncableDataOutputStream) {
+        dataOutputStream = (SyncableDataOutputStream) wrappedStream;
+      } else {
+        Assert.fail("Unable to determine type of " + wrappedStream
+            + " class of " + wrappedStream.getClass());
+      }
+
+      Assert.assertFalse("Data output stream is a BlockBlobAppendStream: "
+          + dataOutputStream,
+          dataOutputStream.getOutStream() instanceof BlockBlobAppendStream);
+
+    }
+  }
+
+  @Test
+  public void testCompaction() throws Throwable {
+
+    final int n2 = 2;
+    final int n4 = 4;
+    final int n10 = 10;
+    final int n12 = 12;
+    final int n14 = 14;
+    final int n16 = 16;
+
+    final int maxBlockSize = 16;
+    final int compactionBlockCount = 4;
+
+    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+
+    try (FSDataOutputStream appendStream = fs.create(TEST_PATH)) {
+
+      // test new file
+
+      BlockBlobAppendStream blockBlobStream = getBlockBlobAppendStream(appendStream);
+      blockBlobStream.setMaxBlockSize(maxBlockSize);
+      blockBlobStream.setCompactionBlockCount(compactionBlockCount);
+
+      appendBlockList(appendStream, memStream, new int[]{n2});
+      verifyBlockList(blockBlobStream, new int[]{n2});
+
+      appendStream.hflush();
+
verifyBlockList(blockBlobStream, new int[]{n2}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, new int[]{n2, n4}); + + appendStream.hsync(); + verifyBlockList(blockBlobStream, new int[]{n2, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, new int[]{n2, n4, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, new int[]{n2, n4, n4, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, new int[]{n14, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, new int[]{n14, n4, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, new int[]{n14, n4, n4, n4}); + + appendBlockList(appendStream, memStream, new int[]{n2, n4, n4}); + verifyBlockList(blockBlobStream, new int[]{n14, n12, n10}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, new int[]{n14, n12, n10, n4}); + + appendBlockList(appendStream, memStream, + new int[]{n4, n4, n4, n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16}); + + appendBlockList(appendStream, memStream, + new int[]{n4, n4, n4, n4, n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16, n16, n4}); + + appendBlockList(appendStream, memStream, + new int[]{n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16, n16, n4, n4}); + + appendBlockList(appendStream, memStream, + new int[]{n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16, n16, n4, n4, n4}); + + appendBlockList(appendStream, memStream, + new int[]{n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16, n16, n4, n4, n4, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + + appendStream.close(); + + ContractTestUtils.verifyFileContents(fs, TEST_PATH, memStream.toByteArray()); + } + + try (FSDataOutputStream appendStream = fs.append(TEST_PATH)) { + + // test existing file + + BlockBlobAppendStream blockBlobStream = getBlockBlobAppendStream(appendStream); + blockBlobStream.setMaxBlockSize(maxBlockSize); + blockBlobStream.setCompactionBlockCount(compactionBlockCount); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16, n16, n16, n4, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16, n16, n16, n4, n4, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16, n16, n16, n4, n4, n4, n4}); + + appendBlockList(appendStream, memStream, new int[]{n4}); + verifyBlockList(blockBlobStream, + new int[]{n14, n12, n14, n16, n16, n16, n16, n4}); + + appendStream.close(); + + ContractTestUtils.verifyFileContents(fs, TEST_PATH, memStream.toByteArray()); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties index 73ee3f9c6ce..a5e0c4f94e8 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties @@ -23,3 +23,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n 
log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG +log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
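
The `StreamCapabilities` wiring above also lets client code probe a stream
for `hflush()`/`hsync()` support before depending on it. A minimal sketch,
assuming a Hadoop release in which `FSDataOutputStream` implements
`StreamCapabilities` (the path and class name are illustrative, not part of
this patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class ProbeSyncSupport {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataOutputStream out = fs.create(new Path("/user/active/test.dat"))) {
      // hasCapability() is forwarded to the wrapped stream; with this patch,
      // a compaction-enabled WASB stream is expected to report hflush support.
      if (out.hasCapability(StreamCapabilities.HFLUSH)) {
        out.hflush(); // flushed bytes are durable in the service
      } else {
        out.flush();  // best-effort flush only
      }
    }
  }
}
```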