diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3e798787a34..fb66b9f59a5 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -99,6 +99,8 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12691. Add CSRF Filter for REST APIs to Hadoop Common.
     (Larry McCay via cnauroth)
 
+    HADOOP-12635. Adding Append API support for WASB. (Dushyanth via cnauroth)
+
   IMPROVEMENTS
 
     HADOOP-12458. Retries is typoed to spell Retires in parts of
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index a936cd64480..00979123d79 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -33,13 +33,11 @@ import java.net.URLEncoder;
 import java.security.InvalidKeyException;
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -64,6 +62,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.mortbay.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.OperationContext;
@@ -2681,4 +2680,24 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     close();
     super.finalize();
   }
+
+  @Override
+  public DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException {
+
+    try {
+
+      if (isPageBlobKey(key)) {
+        throw new UnsupportedOperationException("Append not supported for Page Blobs");
+      }
+
+      CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
+
+      BlockBlobAppendStream appendStream = new BlockBlobAppendStream(
+          (CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
+      appendStream.initialize();
+
+      return new DataOutputStream(appendStream);
+    } catch (Exception ex) {
+      throw new AzureException(ex);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
new file mode 100644
index 00000000000..d1ec8dff883
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+
+/**
+ * Stream object that implements append for Block Blobs in WASB.
+ */
+public class BlockBlobAppendStream extends OutputStream {
+
+  private final String key;
+  private final int bufferSize;
+  private ByteArrayOutputStream outBuffer;
+  private final CloudBlockBlobWrapper blob;
+  private final OperationContext opContext;
+
+  /**
+   * Variable to track if the stream has been closed.
+   */
+  private boolean closed = false;
+
+  /**
+   * Variable to track if the append lease is released.
+   */
+  private volatile boolean leaseFreed;
+
+  /**
+   * Variable to track if the append stream has been initialized.
+   */
+  private boolean initialized = false;
+
+  /**
+   * Last IOException encountered.
+   */
+  private volatile IOException lastError = null;
+
+  /**
+   * List to keep track of the uncommitted Azure storage block ids.
+   */
+  private final List<BlockEntry> uncommittedBlockEntries;
+
+  private static final int UNSET_BLOCKS_COUNT = -1;
+
+  /**
+   * Variable to hold the next block id to be used for Azure storage blocks.
+   */
+  private long nextBlockCount = UNSET_BLOCKS_COUNT;
+
+  private final Random sequenceGenerator = new Random();
+
+  /**
+   * Time to wait before renewing the lease, in milliseconds.
+   */
+  private static final int LEASE_RENEWAL_PERIOD = 10000;
+
+  /**
+   * Number of times to retry lease renewal.
+   */
+  private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
+
+  /**
+   * Time to wait before retrying to set the lease, in milliseconds.
+   */
+  private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
+
+  /**
+   * Metadata key used on the blob to indicate that an append lease is active.
+   */
+  public static final String APPEND_LEASE = "append_lease";
+
+  /**
+   * Timeout value for the append lease in milliseconds. If the lease is not
+   * renewed within 30 seconds then another thread can acquire the append lease
+   * on the blob.
+   */
+  public static final int APPEND_LEASE_TIMEOUT = 30000;
+
+  /**
+   * Metadata key used on the blob to indicate the last modified time of the append lease.
+   */
+  public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
+
+  /**
+   * Number of times a block upload is retried.
+   */
+  private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
+
+  /**
+   * Wait time between block upload retries, in milliseconds.
+   */
+  private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
+
+  private static final int MAX_BLOCK_COUNT = 100000;
+
+  private ThreadPoolExecutor ioThreadPool;
+
+  /**
+   * Atomic integer to provide thread ids for the names of uploader threads.
+   */
+  private final AtomicInteger threadSequenceNumber;
+
+  /**
+   * Prefix to be used for the names of uploader threads.
+   */
+  private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
+
+  private static final String UTC_STR = "UTC";
+
+  public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
+      final String aKey, final int bufferSize, final OperationContext opContext)
+      throws IOException {
+
+    if (null == aKey || 0 == aKey.length()) {
+      throw new IllegalArgumentException(
+          "Illegal argument: The key string is null or empty");
+    }
+
+    if (0 >= bufferSize) {
+      throw new IllegalArgumentException(
+          "Illegal argument: bufferSize cannot be zero or negative");
+    }
+
+    this.blob = blob;
+    this.opContext = opContext;
+    this.key = aKey;
+    this.bufferSize = bufferSize;
+    this.threadSequenceNumber = new AtomicInteger(0);
+    setBlocksCount();
+
+    this.outBuffer = new ByteArrayOutputStream(bufferSize);
+    this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
+
+    // Acquire append lease on the blob.
+    try {
+      // Set the append lease if the value of the append lease is false.
+      if (!updateBlobAppendMetadata(true, false)) {
+        LOG.error("Unable to set Append Lease on the Blob : {} "
+            + "Possibly because another client already has a create or append stream open on the Blob", key);
+        throw new IOException("Unable to set Append lease on the Blob. "
+            + "Possibly because another client already had an append stream open on the Blob.");
+      }
+    } catch (StorageException ex) {
+      LOG.error("Encountered Storage exception while acquiring append "
+          + "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
+          key, ex, ex.getErrorCode());
+
+      throw new IOException(ex);
+    }
+
+    leaseFreed = false;
+  }
+
+  /**
+   * Helper method that starts an append lease renewer thread and the
+   * thread pool.
+   */
+  public synchronized void initialize() {
+
+    if (initialized) {
+      return;
+    }
+
+    // Start the thread for the append lease renewer.
+    Thread appendLeaseRenewer = new Thread(new AppendRenewer());
+    appendLeaseRenewer.setDaemon(true);
+    appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
+    appendLeaseRenewer.start();
+
+    /*
+     * Parameters to ThreadPoolExecutor:
+     * corePoolSize: the number of threads to keep in the pool, even if they
+     *     are idle, unless allowCoreThreadTimeOut is set
+     * maximumPoolSize: the maximum number of threads to allow in the pool
+     * keepAliveTime: when the number of threads is greater than the core,
+     *     this is the maximum time that excess idle threads will wait for
+     *     new tasks before terminating
+     * unit: the time unit for the keepAliveTime argument
+     * workQueue: the queue to use for holding tasks before they are executed;
+     *     this queue will hold only the Runnable tasks submitted by the
+     *     execute method
+     */
+    this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
+
+    initialized = true;
+  }
+
+  /**
+   * Get the blob name.
+   *
+   * @return String Blob name.
+   */
+  public String getKey() {
+    return key;
+  }
+
+  /**
+   * Get the buffer size of the stream.
+   * @return buffer size of the stream.
+   */
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  /**
+   * Writes the specified byte to this output stream. The general contract for
+   * write is that one byte is written to the output stream. The byte to be
+   * written is the eight low-order bits of the argument b. The 24 high-order
+   * bits of b are ignored.
+   *
+   * @param byteVal
+   *          the byteValue to write.
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final int byteVal) throws IOException {
+    write(new byte[] { (byte) (byteVal & 0xFF) });
+  }
+
+  /**
+   * Writes b.length bytes from the specified byte array to this output stream.
+   *
+   * @param data
+   *          the byte array to write.
+   *
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final byte[] data) throws IOException {
+    write(data, 0, data.length);
+  }
+
+  /**
+   * Writes length bytes from the specified byte array starting at offset to
+   * this output stream.
+   *
+   * @param data
+   *          the byte array to write.
+   * @param offset
+   *          the start offset in the data.
+   * @param length
+   *          the number of bytes to write.
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final byte[] data, final int offset, final int length)
+      throws IOException {
+
+    if (offset < 0 || length < 0 || length > data.length - offset) {
+      throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
+    }
+
+    writeInternal(data, offset, length);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+
+    if (!initialized) {
+      throw new IOException("Trying to close an uninitialized Append stream");
+    }
+
+    if (closed) {
+      return;
+    }
+
+    if (leaseFreed) {
+      throw new IOException(String.format("Attempting to close an append stream on blob : %s "
+          + "that does not hold the lease on the Blob. Failing close", key));
+    }
+
+    if (outBuffer.size() > 0) {
+      uploadBlockToStorage(outBuffer.toByteArray());
+    }
+
+    ioThreadPool.shutdown();
+
+    try {
+      if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
+        LOG.error("Timeout occurred while waiting for IO requests to finish in append"
+            + " for blob : {}", key);
+        NativeAzureFileSystemHelper.logAllLiveStackTraces();
+        throw new IOException("Timed out waiting for IO requests to finish");
+      }
+    } catch (InterruptedException intrEx) {
+
+      // Restore the interrupted status.
+      Thread.currentThread().interrupt();
+      LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
+      throw new IOException("Append Commit interrupted.");
+    }
+
+    // Calling commit after all blocks are successfully uploaded.
+    if (lastError == null) {
+      commitAppendBlocks();
+    }
+
+    // Perform cleanup.
+    cleanup();
+
+    if (lastError != null) {
+      throw lastError;
+    }
+  }
+
+  /**
+   * Helper method that cleans up the append stream.
+   */
+  private synchronized void cleanup() {
+
+    closed = true;
+
+    try {
+      // Set the value of append lease to false if the value is set to true.
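+      // Passing holdLease=false with testCondition=true flips the
+      // append_lease metadata to false only if this writer still holds
+      // it (or the previous lease timed out), signalling release.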
+      updateBlobAppendMetadata(false, true);
+    } catch (StorageException ex) {
+      LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
+          + "Error Code : {}",
+          key, ex, ex.getErrorCode());
+      lastError = new IOException(ex);
+    }
+
+    leaseFreed = true;
+  }
+
+  /**
+   * Method to commit all the uncommitted blocks to Azure storage.
+   * If the commit fails then the blocks are automatically cleaned up
+   * by Azure storage.
+   * @throws IOException
+   */
+  private synchronized void commitAppendBlocks() throws IOException {
+
+    SelfRenewingLease lease = null;
+
+    try {
+      if (uncommittedBlockEntries.size() > 0) {
+
+        // Acquiring lease on the blob.
+        lease = new SelfRenewingLease(blob);
+
+        // Downloading existing blocks.
+        List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED,
+            new BlobRequestOptions(), opContext);
+
+        // Adding uncommitted blocks.
+        blockEntries.addAll(uncommittedBlockEntries);
+
+        AccessCondition accessCondition = new AccessCondition();
+        accessCondition.setLeaseID(lease.getLeaseID());
+        blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
+        uncommittedBlockEntries.clear();
+      }
+    } catch (StorageException ex) {
+      LOG.error("Storage exception encountered during block commit phase of append for blob"
+          + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
+      throw new IOException("Encountered Exception while committing append blocks", ex);
+    } finally {
+      if (lease != null) {
+        try {
+          lease.free();
+        } catch (StorageException ex) {
+          LOG.debug("Exception encountered while releasing lease for "
+              + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
+          // Swallowing the exception here as the lease is cleaned up by the SelfRenewingLease object.
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure
+   * storage SDK.
+   */
+  private void setBlocksCount() throws IOException {
+    try {
+
+      if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+
+        nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+            + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+
+        List<BlockEntry> blockEntries =
+            blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
+
+        nextBlockCount += blockEntries.size();
+      }
+    } catch (StorageException ex) {
+      LOG.debug("Encountered storage exception during setting next Block Count."
+          + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Helper method that generates the next block id for uploading a block to Azure storage.
+   * @return String representing the block ID generated.
+   * @throws IOException
+   */
+  private String generateBlockId() throws IOException {
+
+    if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+      throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
+    }
+
+    byte[] blockIdInBytes = getBytesFromLong(nextBlockCount);
+    return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Returns a byte array that represents the data of a long value. This
+   * utility method is copied from the com.microsoft.azure.storage.core.Utility class.
+   * That class is marked as internal, hence we clone the method here rather
+   * than taking a dependency on the Utility class.
+   *
+   * @param value
+   *          The value from which the byte array will be returned.
+   *
+   * @return A byte array that represents the data of the specified long value.
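+   *
+   *         For example, a value of 1L yields the big-endian array
+   *         {0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}.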
+   */
+  private static byte[] getBytesFromLong(final long value) {
+    final byte[] tempArray = new byte[8];
+
+    for (int m = 0; m < 8; m++) {
+      tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
+    }
+
+    return tempArray;
+  }
+
+  /**
+   * Helper method that creates a thread to upload a block to Azure storage.
+   * @param payload
+   * @throws IOException
+   */
+  private synchronized void uploadBlockToStorage(byte[] payload) throws IOException {
+
+    // Upload payload to Azure storage.
+    nextBlockCount++;
+    String blockId = generateBlockId();
+    // Since uploads to Azure storage are done in parallel threads, we go ahead
+    // and add the blockId to the uncommitted list. If the upload of the block
+    // fails, we don't commit the blockIds.
+    uncommittedBlockEntries.add(new BlockEntry(blockId));
+    ioThreadPool.execute(new WriteRequest(payload, blockId));
+  }
+
+  /**
+   * Helper method to update the Blob metadata during append lease operations.
+   * Blob metadata is updated to the holdLease value only if the current lease
+   * status is equal to testCondition or the last update of the blob metadata
+   * is more than 30 seconds old.
+   * @param holdLease
+   * @param testCondition
+   * @return true if the update lease operation was successful, false otherwise
+   * @throws StorageException
+   */
+  private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
+      throws StorageException {
+
+    SelfRenewingLease lease = null;
+    StorageException lastStorageException = null;
+    int leaseRenewalRetryCount = 0;
+
+    /*
+     * Updating the Blob metadata honours the following algorithm, based on:
+     *  1) whether the append lease metadata is present,
+     *  2) the last updated time of the append lease, and
+     *  3) the previous value of the append lease metadata.
+     *
+     * The algorithm:
+     *  1) If append lease metadata is not part of the Blob, this is the first
+     *     client to append, so we update the metadata.
+     *  2) If append lease metadata is present and the timeout has occurred,
+     *     we update the metadata irrespective of the value of the append lease.
+     *  3) If append lease metadata is present, is equal to the testCondition value
+     *     (passed as parameter), and the timeout has not occurred, we update the metadata.
+     *  4) If append lease metadata is present, is not equal to the testCondition value
+     *     (passed as parameter), and the timeout has not occurred, we do not update
+     *     the metadata and return false.
+     */
+    while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
+
+      lastStorageException = null;
+
+      synchronized (this) {
+        try {
+
+          final Calendar currentCalendar = Calendar
+              .getInstance(Locale.US);
+          currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
+          long currentTime = currentCalendar.getTime().getTime();
+
+          // Acquire lease on the blob.
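+          // SelfRenewingLease takes a real Azure blob lease, so the
+          // read-check-update of the append lease metadata below cannot
+          // race with another client attempting the same update.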
+          lease = new SelfRenewingLease(blob);
+
+          blob.downloadAttributes(opContext);
+          HashMap<String, String> metadata = blob.getMetadata();
+
+          if (metadata.containsKey(APPEND_LEASE)
+              && currentTime - Long.parseLong(
+                  metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
+              && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
+            return false;
+          }
+
+          metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
+          metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
+          blob.setMetadata(metadata);
+          AccessCondition accessCondition = new AccessCondition();
+          accessCondition.setLeaseID(lease.getLeaseID());
+          blob.uploadMetadata(accessCondition, null, opContext);
+          return true;
+
+        } catch (StorageException ex) {
+
+          lastStorageException = ex;
+          LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
+              + "Error Code : {}",
+              key, ex, ex.getErrorCode());
+          leaseRenewalRetryCount++;
+
+        } finally {
+
+          if (lease != null) {
+            try {
+              lease.free();
+            } catch (StorageException ex) {
+              LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
+                  + "during Append metadata operation. Storage Exception {} "
+                  + "Error Code : {} ", key, ex, ex.getErrorCode());
+            } finally {
+              lease = null;
+            }
+          }
+        }
+      }
+
+      if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
+        throw lastStorageException;
+      } else {
+        try {
+          Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
+        } catch (InterruptedException ex) {
+          LOG.debug("Blob append metadata update method interrupted");
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    // This should be unreachable: every iteration of the loop above either
+    // returns, throws, or retries until the retry-count check throws.
+    return false;
+  }
+
+  /**
+   * This is the only method that should be writing to outBuffer, to maintain
+   * consistency of the outBuffer.
+   * @param data
+   * @param offset
+   * @param length
+   * @throws IOException
+   */
+  private synchronized void writeInternal(final byte[] data, final int offset, final int length)
+      throws IOException {
+
+    if (!initialized) {
+      throw new IOException("Trying to write to an uninitialized Append stream");
+    }
+
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+
+    if (leaseFreed) {
+      throw new IOException("Write called on an append stream not holding lease. Failing Write");
+    }
+
+    byte[] currentData = new byte[length];
+    System.arraycopy(data, offset, currentData, 0, length);
+
+    // Check to see if the data to be appended exceeds the
+    // buffer size. If so we upload a block to Azure storage.
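+    // Each iteration fills one bufferSize-sized payload from the bytes
+    // already buffered plus the head of currentData, hands it to the
+    // uploader thread pool, and carries the remainder into the next pass.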
+    while ((outBuffer.size() + currentData.length) > bufferSize) {
+
+      byte[] payload = new byte[bufferSize];
+
+      // Add data from the existing buffer.
+      System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
+
+      // Updating the available size in the payload.
+      int availableSpaceInPayload = bufferSize - outBuffer.size();
+
+      // Adding data from the current call.
+      System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
+
+      uploadBlockToStorage(payload);
+
+      // Updating the currentData buffer.
+      byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
+      System.arraycopy(currentData, availableSpaceInPayload,
+          tempBuffer, 0, currentData.length - availableSpaceInPayload);
+      currentData = tempBuffer;
+      outBuffer = new ByteArrayOutputStream(bufferSize);
+    }
+
+    outBuffer.write(currentData);
+  }
+
+  /**
+   * Runnable instance that uploads a block of data to Azure storage.
+   */
+  private class WriteRequest implements Runnable {
+    private final byte[] dataPayload;
+    private final String blockId;
+
+    public WriteRequest(byte[] dataPayload, String blockId) {
+      this.dataPayload = dataPayload;
+      this.blockId = blockId;
+    }
+
+    @Override
+    public void run() {
+
+      int uploadRetryAttempts = 0;
+      IOException lastLocalException = null;
+      while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+        try {
+
+          blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
+              dataPayload.length, new BlobRequestOptions(), opContext);
+          break;
+        } catch (Exception ioe) {
+          LOG.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
+          uploadRetryAttempts++;
+          lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
+          try {
+            Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+
+      if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+        lastError = lastLocalException;
+      }
+    }
+  }
+
+  /**
+   * A ThreadFactory that creates uploader threads with
+   * meaningful names, helpful for debugging purposes.
+   */
+  class UploaderThreadFactory implements ThreadFactory {
+
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r);
+      t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
+          threadSequenceNumber.getAndIncrement()));
+      return t;
+    }
+  }
+
+  /**
+   * A daemon thread that renews the append lease on the blob.
+   * The thread sleeps for LEASE_RENEWAL_PERIOD before renewing
+   * the lease. If an error is encountered while renewing the lease,
+   * the lease is released by this thread, which fails all other
+   * operations.
+   */
+  private class AppendRenewer implements Runnable {
+
+    @Override
+    public void run() {
+
+      while (!leaseFreed) {
+
+        try {
+          Thread.sleep(LEASE_RENEWAL_PERIOD);
+        } catch (InterruptedException ie) {
+          LOG.debug("Append lease renewer thread interrupted");
+          Thread.currentThread().interrupt();
+        }
+
+        LOG.debug("Attempting to renew append lease on {}", key);
+
+        try {
+          if (!leaseFreed) {
+            // Update the blob metadata to renew the append lease.
+            if (!updateBlobAppendMetadata(true, true)) {
+              LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
+              leaseFreed = true;
+            }
+          }
+        } catch (StorageException ex) {
+
+          LOG.debug("Lease renewal for Blob : {} encountered "
+              + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
+
+          // We swallow the exception here because if the blob metadata is not updated for
+          // the APPEND_LEASE_TIMEOUT period, another thread will be able to detect this and
+          // continue forward if it needs to append.
+          leaseFreed = true;
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d2ff7057d45..ed651846252 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azure;
 
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -31,7 +32,6 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.TreeSet;
@@ -41,7 +41,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -60,8 +59,6 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.azure.AzureException;
-import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
@@ -73,12 +70,8 @@ import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.StorageErrorCode;
 import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.StorageErrorCodeStrings;
+
 import org.apache.hadoop.io.IOUtils;
 
@@ -288,7 +281,7 @@ public class NativeAzureFileSystem extends FileSystem {
         throw new IOException("Unable to write RenamePending file for folder rename from "
             + srcKey + " to " + dstKey, e);
       } finally {
-
NativeAzureFileSystem.cleanup(LOG, output); + NativeAzureFileSystemHelper.cleanup(LOG, output); } } @@ -663,6 +656,11 @@ public class NativeAzureFileSystem extends FileSystem { public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics"; + /* + * Property to enable Append API. + */ + public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support"; + private class NativeAzureFsInputStream extends FSInputStream { private InputStream in; private final String key; @@ -728,7 +726,7 @@ public class NativeAzureFileSystem extends FileSystem { return result; } catch(IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException) { @@ -736,7 +734,7 @@ public class NativeAzureFileSystem extends FileSystem { + " Exception details: {} Error Code : {}", key, e, ((StorageException) innerException).getErrorCode()); - if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", key)); } } @@ -782,7 +780,7 @@ public class NativeAzureFileSystem extends FileSystem { return result; } catch(IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException) { @@ -790,7 +788,7 @@ public class NativeAzureFileSystem extends FileSystem { + " Exception details: {} Error Code : {}", key, e, ((StorageException) innerException).getErrorCode()); - if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", key)); } } @@ -822,10 +820,10 @@ public class NativeAzureFileSystem extends FileSystem { this.pos); } catch(IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", key)); } @@ -1041,7 +1039,7 @@ public class NativeAzureFileSystem extends FileSystem { private static boolean suppressRetryPolicy = false; // A counter to create unique (within-process) names for my metrics sources. private static AtomicInteger metricsSourceNameCounter = new AtomicInteger(); - + private boolean appendSupportEnabled = false; public NativeAzureFileSystem() { // set store in initialize() @@ -1164,7 +1162,7 @@ public class NativeAzureFileSystem extends FileSystem { this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE); - + this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false); LOG.debug("NativeAzureFileSystem. 
Initializing."); LOG.debug(" blockSize = {}", conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); @@ -1294,7 +1292,61 @@ public class NativeAzureFileSystem extends FileSystem { @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { - throw new IOException("Not supported"); + + if (!appendSupportEnabled) { + throw new UnsupportedOperationException("Append Support not enabled"); + } + + LOG.debug("Opening file: {} for append", f); + + Path absolutePath = makeAbsolute(f); + String key = pathToKey(absolutePath); + FileMetadata meta = null; + try { + meta = store.retrieveMetadata(key); + } catch(Exception ex) { + + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); + + if (innerException instanceof StorageException + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { + + throw new FileNotFoundException(String.format("%s is not found", key)); + } else { + throw ex; + } + } + + if (meta == null) { + throw new FileNotFoundException(f.toString()); + } + + if (meta.isDir()) { + throw new FileNotFoundException(f.toString() + + " is a directory not a file."); + } + + if (store.isPageBlobKey(key)) { + throw new IOException("Append not supported for Page Blobs"); + } + + DataOutputStream appendStream = null; + + try { + appendStream = store.retrieveAppendStream(key, bufferSize); + } catch (Exception ex) { + + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); + + if (innerException instanceof StorageException + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { + throw new FileNotFoundException(String.format("%s is not found", key)); + } else { + throw ex; + } + } + + return new FSDataOutputStream(appendStream, statistics); } @Override @@ -1379,7 +1431,7 @@ public class NativeAzureFileSystem extends FileSystem { lease.free(); } } catch (Exception e) { - NativeAzureFileSystem.cleanup(LOG, out); + NativeAzureFileSystemHelper.cleanup(LOG, out); String msg = "Unable to free lease on " + parent.toUri(); LOG.error(msg); throw new IOException(msg, e); @@ -1577,10 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem { metaFile = store.retrieveMetadata(key); } catch (IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { return false; } @@ -1611,7 +1663,7 @@ public class NativeAzureFileSystem extends FileSystem { parentMetadata = store.retrieveMetadata(parentKey); } catch (IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException) { // Invalid State. @@ -1619,7 +1671,7 @@ public class NativeAzureFileSystem extends FileSystem { // if the file not present. 
But not retrieving metadata here is an // unrecoverable state and can only happen if there is a race condition // hence throwing a IOException - if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new IOException("File " + f + " has a parent directory " + parentPath + " whose metadata cannot be retrieved. Can't resolve"); } @@ -1662,10 +1714,10 @@ public class NativeAzureFileSystem extends FileSystem { instrumentation.fileDeleted(); } catch(IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { return false; } @@ -1684,7 +1736,7 @@ public class NativeAzureFileSystem extends FileSystem { parentMetadata = store.retrieveMetadata(parentKey); } catch (IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException) { // Invalid State. @@ -1692,7 +1744,7 @@ public class NativeAzureFileSystem extends FileSystem { // if the file not present. But not retrieving metadata here is an // unrecoverable state and can only happen if there is a race condition // hence throwing a IOException - if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new IOException("File " + f + " has a parent directory " + parentPath + " whose metadata cannot be retrieved. 
Can't resolve"); } @@ -1728,10 +1780,10 @@ public class NativeAzureFileSystem extends FileSystem { priorLastKey); } catch(IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { return false; } @@ -1763,10 +1815,10 @@ public class NativeAzureFileSystem extends FileSystem { instrumentation.fileDeleted(); } catch(IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { return false; } @@ -1785,10 +1837,10 @@ public class NativeAzureFileSystem extends FileSystem { store.delete(key); } catch(IOException e) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { return false; } @@ -1829,10 +1881,10 @@ public class NativeAzureFileSystem extends FileSystem { meta = store.retrieveMetadata(key); } catch(Exception ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", key)); } @@ -1922,10 +1974,10 @@ public class NativeAzureFileSystem extends FileSystem { meta = store.retrieveMetadata(key); } catch (IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", f)); } @@ -1948,10 +2000,10 @@ public class NativeAzureFileSystem extends FileSystem { listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); } catch (IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", 
key)); } @@ -1972,10 +2024,10 @@ public class NativeAzureFileSystem extends FileSystem { try { listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); } catch (IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", key)); } @@ -2196,10 +2248,10 @@ public class NativeAzureFileSystem extends FileSystem { meta = store.retrieveMetadata(key); } catch(Exception ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", key)); } @@ -2219,10 +2271,10 @@ public class NativeAzureFileSystem extends FileSystem { try { inputStream = store.retrieve(key); } catch(Exception ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", key)); } @@ -2261,14 +2313,14 @@ public class NativeAzureFileSystem extends FileSystem { dstMetadata = store.retrieveMetadata(dstKey); } catch (IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); // A BlobNotFound storage exception in only thrown from retrieveMetdata API when // there is a race condition. If there is another thread which deletes the destination // file or folder, then this thread calling rename should be able to continue with // rename gracefully. Hence the StorageException is swallowed here. if (innerException instanceof StorageException) { - if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { LOG.debug("BlobNotFound exception encountered for Destination key : {}. 
" + "Swallowin the exception to handle race condition gracefully", dstKey); } @@ -2294,10 +2346,10 @@ public class NativeAzureFileSystem extends FileSystem { parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent())); } catch (IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst); return false; @@ -2320,10 +2372,10 @@ public class NativeAzureFileSystem extends FileSystem { try { srcMetadata = store.retrieveMetadata(srcKey); } catch (IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { LOG.debug("Source {} doesn't exists. Failing rename", src); return false; @@ -2342,10 +2394,10 @@ public class NativeAzureFileSystem extends FileSystem { store.rename(srcKey, dstKey); } catch(IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { LOG.debug("BlobNotFoundException encountered. Failing rename", src); return false; @@ -2552,10 +2604,10 @@ public class NativeAzureFileSystem extends FileSystem { try { metadata = store.retrieveMetadata(key); } catch (IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("File %s doesn't exists.", p)); } @@ -2591,10 +2643,10 @@ public class NativeAzureFileSystem extends FileSystem { try { metadata = store.retrieveMetadata(key); } catch (IOException ex) { - Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex); + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); if (innerException instanceof StorageException - && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) { + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("File %s doesn't exists.", p)); } @@ -2817,52 +2869,4 @@ public class NativeAzureFileSystem extends FileSystem { // Return to the caller with the randomized key. 
     return randomizedKey;
   }
-
-  private static void cleanup(Logger log, java.io.Closeable closeable) {
-    if (closeable != null) {
-      try {
-        closeable.close();
-      } catch(IOException e) {
-        if (log != null) {
-          log.debug("Exception in closing {}", closeable, e);
-        }
-      }
-    }
-  }
-
-  /*
-   * Helper method to recursively check if the cause of the exception is
-   * a Azure storage exception.
-   */
-  private static Throwable checkForAzureStorageException(Exception e) {
-
-    Throwable innerException = e.getCause();
-
-    while (innerException != null
-        && !(innerException instanceof StorageException)) {
-      innerException = innerException.getCause();
-    }
-
-    return innerException;
-  }
-
-  /*
-   * Helper method to check if the AzureStorageException is
-   * because backing blob was not found.
-   */
-  private static boolean isFileNotFoundException(StorageException e) {
-
-    String errorCode = ((StorageException) e).getErrorCode();
-    if (errorCode != null
-        && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
-            || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
-            || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
-            || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
-
-      return true;
-    }
-
-    return false;
-  }
-
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
new file mode 100644
index 00000000000..40efdc60cdd
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.StorageErrorCode;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import com.microsoft.azure.storage.StorageException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Utility class that has helper methods.
+ */
+@InterfaceAudience.Private
+final class NativeAzureFileSystemHelper {
+
+  private NativeAzureFileSystemHelper() {
+    // Hiding the constructor as this is a utility class.
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystemHelper.class);
+
+  public static void cleanup(Logger log, java.io.Closeable closeable) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (IOException e) {
+        if (log != null) {
+          log.debug("Exception in closing {}", closeable, e);
+        }
+      }
+    }
+  }
+
+  /*
+   * Helper method to recursively check if the cause of the exception is
+   * an Azure storage exception.
+   */
+  public static Throwable checkForAzureStorageException(Exception e) {
+
+    Throwable innerException = e.getCause();
+
+    while (innerException != null
+        && !(innerException instanceof StorageException)) {
+
+      innerException = innerException.getCause();
+    }
+
+    return innerException;
+  }
+
+  /*
+   * Helper method to check if the AzureStorageException is
+   * because the backing blob was not found.
+   */
+  public static boolean isFileNotFoundException(StorageException e) {
+
+    String errorCode = e.getErrorCode();
+    if (errorCode != null
+        && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
+            || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
+            || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
+            || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
+
+      return true;
+    }
+
+    return false;
+  }
+
+  /*
+   * Helper method that logs stack traces from all live threads.
+   */
+  public static void logAllLiveStackTraces() {
+
+    for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
+      LOG.debug("Thread " + entry.getKey().getName());
+      StackTraceElement[] trace = entry.getValue();
+      for (int j = 0; j < trace.length; j++) {
+        LOG.debug("\tat " + trace[j]);
+      }
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0229cb72007..f052b7f0670 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -107,4 +107,6 @@ interface NativeFileSystemStore {
   void delete(String key, SelfRenewingLease lease) throws IOException;
 
   SelfRenewingLease acquireLease(String key) throws AzureException;
+
+  DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException;
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 868937541c8..b2b34f8a87e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -29,8 +29,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -216,7 +214,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     LOG.debug(ioThreadPool.toString());
     if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
       LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
-      logAllStackTraces();
+      NativeAzureFileSystemHelper.logAllLiveStackTraces();
LOG.debug(ioThreadPool.toString()); throw new IOException("Timed out waiting for IO requests to finish"); } @@ -230,18 +228,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable { closed = true; } - // Log the stacks of all threads. - private void logAllStackTraces() { - Map liveThreads = Thread.getAllStackTraces(); - for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) { - Thread key = (Thread) i.next(); - LOG.debug("Thread " + key.getName()); - StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key); - for (int j = 0; j < trace.length; j++) { - LOG.debug("\tat " + trace[j]); - } - } - } + /** * A single write request for data to write to Azure storage. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java index ce5f749a0d0..c2169a475b5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java @@ -24,11 +24,13 @@ import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.List; import java.util.EnumSet; import java.util.HashMap; import org.apache.hadoop.classification.InterfaceAudience; +import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.RetryPolicyFactory; @@ -36,6 +38,8 @@ import com.microsoft.azure.storage.StorageCredentials; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; +import com.microsoft.azure.storage.blob.BlockEntry; +import com.microsoft.azure.storage.blob.BlockListingFilter; import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CopyState; @@ -269,13 +273,13 @@ abstract class StorageInterface { /** * Uploads the container's metadata using the specified operation context. - * + * * @param opContext * An {@link OperationContext} object that represents the context * for the current operation. This object is used to track requests * to the storage service, and to provide additional runtime * information about the operation. - * + * * @throws StorageException * If a storage service error occurred. */ @@ -545,6 +549,30 @@ abstract class StorageInterface { void uploadMetadata(OperationContext opContext) throws StorageException; + /** + * Uploads the blob's metadata to the storage service using the specified + * lease ID, request options, and operation context. + * + * @param accessCondition + * A {@link AccessCondition} object that represents the access conditions for the blob. + * + * @param options + * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying + * null will use the default request options from the associated service client ( + * {@link CloudBlobClient}). + * + * @param opContext + * An {@link OperationContext} object that represents the context + * for the current operation. This object is used to track requests + * to the storage service, and to provide additional runtime + * information about the operation. 
+     *
+     * @throws StorageException
+     *           If a storage service error occurred.
+     */
+    void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException;
+
     void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
         throws StorageException;
@@ -602,6 +630,63 @@ abstract class StorageInterface {
     OutputStream openOutputStream(
         BlobRequestOptions options,
         OperationContext opContext) throws StorageException;
+
+    /**
+     * Downloads the block list for the block blob using the specified block
+     * listing filter, request options, and operation context.
+     *
+     * @param filter A {@link BlockListingFilter} value that specifies whether to download
+     *          committed blocks, uncommitted blocks, or all blocks.
+     * @param options A {@link BlobRequestOptions} object that specifies any additional options for
+     *          the request. Specifying null will use the default request options from
+     *          the associated service client (CloudBlobClient).
+     * @param opContext An {@link OperationContext} object that represents the context for the current
+     *          operation. This object is used to track requests to the storage service,
+     *          and to provide additional runtime information about the operation.
+     * @return An ArrayList object of {@link BlockEntry} objects that represent the list
+     *          of block items downloaded from the block blob.
+     * @throws IOException If an I/O error occurred.
+     * @throws StorageException If a storage service error occurred.
+     */
+    List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException;
+
+    /**
+     * Uploads a block to be committed as part of the block blob, using the
+     * specified request options and operation context.
+     *
+     * @param blockId A String that represents the Base64-encoded block ID. Note that for a given
+     *          blob, the length of all block IDs must be identical.
+     * @param sourceStream An {@link InputStream} object that represents the input stream to write to the
+     *          block blob.
+     * @param length A long that represents the length, in bytes, of the stream data,
+     *          or -1 if unknown.
+     * @param options A {@link BlobRequestOptions} object that specifies any additional options for the
+     *          request. Specifying null will use the default request options from the
+     *          associated service client (CloudBlobClient).
+     * @param opContext An {@link OperationContext} object that represents the context for the current operation.
+     *          This object is used to track requests to the storage service, and to provide
+     *          additional runtime information about the operation.
+     * @throws IOException If an I/O error occurred.
+     * @throws StorageException If a storage service error occurred.
+     */
+    void uploadBlock(String blockId, InputStream sourceStream,
+        long length, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException;
+
+    /**
+     * Commits a block list to the storage service, using the specified access
+     * condition, request options, and operation context.
+     *
+     * @param blockList An enumerable collection of {@link BlockEntry} objects that represents the list
+     *          of block items being committed. The size field is ignored.
+     * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
+     * @param options A {@link BlobRequestOptions} object that specifies any additional options for the
+     *          request. Specifying null will use the default request options from the associated
+     *          service client (CloudBlobClient).
+     * @param opContext An {@link OperationContext} object that represents the context for the current operation.
+     *          This object is used to track requests to the storage service, and to provide additional
+     *          runtime information about the operation.
+     * @throws IOException If an I/O error occurred.
+     * @throws StorageException If a storage service error occurred.
+     */
+    void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException;
+
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 382ff663456..298f3aaa425 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
-
+import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 
 import com.microsoft.azure.storage.AccessCondition;
@@ -40,6 +40,8 @@ import com.microsoft.azure.storage.StorageUri;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -362,7 +364,13 @@ class StorageInterfaceImpl extends StorageInterface {
     @Override
     public void uploadMetadata(OperationContext opContext)
         throws StorageException {
-      getBlob().uploadMetadata(null, null, opContext);
+      uploadMetadata(null, null, opContext);
+    }
+
+    @Override
+    public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      getBlob().uploadMetadata(accessCondition, options, opContext);
     }
 
     public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
@@ -396,7 +404,7 @@ class StorageInterfaceImpl extends StorageInterface {
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob,
         BlobRequestOptions options, OperationContext opContext)
         throws StorageException, URISyntaxException {
-      getBlob().startCopyFromBlob(((CloudBlobWrapperImpl)sourceBlob).blob,
+      getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob,
           null, null, options, opContext);
     }
 
@@ -440,6 +448,25 @@ class StorageInterfaceImpl extends StorageInterface {
       getBlob().uploadProperties(null, null, opContext);
     }
 
+    @Override
+    public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
+    }
+
+    @Override
+    public void uploadBlock(String blockId, InputStream sourceStream,
+        long length, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+    }
+
+    @Override
+    public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
+        BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
+      ((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
+    }
   }
 
   static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper {
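Taken together, the three new primitives are enough to express an append. The sketch below is illustrative only, not code from this patch: the helper class, block-numbering scheme, and padding width are invented for the example, and the committed BlockBlobAppendStream manages its own block IDs, buffering, and lease handling.

    // Sketch: one append expressed with the new StorageInterface primitives.
    // Null request options and operation contexts fall back to SDK defaults.
    package org.apache.hadoop.fs.azure;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.commons.codec.binary.Base64;
    import com.microsoft.azure.storage.StorageException;
    import com.microsoft.azure.storage.blob.BlockEntry;
    import com.microsoft.azure.storage.blob.BlockListingFilter;
    import com.microsoft.azure.storage.blob.BlockSearchMode;

    class BlockAppendSketch {
      static void appendBlock(StorageInterface.CloudBlockBlobWrapper blob,
          byte[] data, long blockNumber) throws IOException, StorageException {
        // 1. Read back the blocks that are already committed to the blob.
        List<BlockEntry> blocks =
            blob.downloadBlockList(BlockListingFilter.COMMITTED, null, null);
        // 2. Upload the new data as an uncommitted block. The numeric id is
        //    zero-padded so every block id on the blob encodes to the same length.
        String blockId = Base64.encodeBase64String(
            String.format("%019d", blockNumber).getBytes(StandardCharsets.UTF_8));
        blob.uploadBlock(blockId, new ByteArrayInputStream(data), data.length,
            null, null);
        // 3. Commit the old list plus the new block; only this step makes the
        //    appended bytes visible to readers.
        blocks.add(new BlockEntry(blockId, BlockSearchMode.UNCOMMITTED));
        blob.commitBlockList(blocks, null, null, null);
      }
    }

The read-modify-commit shape of this sequence is why the patch introduces an append lease: without it, two concurrent appenders could each commit a block list that silently drops the other's blocks.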
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 9d0115afe09..4402467b72a 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -23,6 +23,7 @@
 * [Page Blob Support and Configuration](#Page_Blob_Support_and_Configuration)
 * [Atomic Folder Rename](#Atomic_Folder_Rename)
 * [Accessing wasb URLs](#Accessing_wasb_URLs)
+* [Append API Support and Configuration](#Append_API_Support_and_Configuration)
 * [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
 
 ## Introduction
@@ -51,7 +52,6 @@ on the additional artifacts it requires, notably the
 
 ## Limitations
 
-* The append operation is not implemented.
 * File owner and group are persisted, but the permissions model is not enforced.
   Authorization occurs at the level of the entire Azure Blob Storage account.
 * File last access time is not tracked.
@@ -199,6 +199,24 @@ It's also possible to configure `fs.defaultFS` to use a `wasb` or `wasbs` URL.
 This causes all bare paths, such as `/testDir/testFile` to resolve automatically
 to that file system.
 
+### Append API Support and Configuration
+
+The Azure Blob Storage interface for Hadoop includes optional support for the Append API
+for a single writer, enabled by setting the configuration `fs.azure.enable.append.support` to true.
+
+For example:
+
+    <property>
+      <name>fs.azure.enable.append.support</name>
+      <value>true</value>
+    </property>
+
+Note that append support in the Azure Blob Storage interface DIFFERS FROM HDFS SEMANTICS. Append
+support does not enforce single-writer semantics internally; it requires applications to guarantee them.
+It is the application's responsibility either to ensure single-threaded handling for a particular
+file path, or to rely on an external locking mechanism of its own. Failure to do so will result in
+unexpected behavior.
+
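A minimal usage sketch may help here (illustrative only; the account, container, and path below are hypothetical placeholders): once the flag is set, a single writer appends through the standard FileSystem API.

    // Illustrative sketch: appending to a WASB file with the flag enabled.
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WasbAppendExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.azure.enable.append.support", true);
        FileSystem fs = FileSystem.get(
            URI.create("wasb://mycontainer@myaccount.blob.core.windows.net/"), conf);
        FSDataOutputStream out = fs.append(new Path("/testDir/testFile"));
        try {
          out.write("appended record\n".getBytes("UTF-8"));
        } finally {
          out.close();
        }
      }
    }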
 ## Testing the hadoop-azure Module
 
 The hadoop-azure module includes a full suite of unit tests.  Most of the tests
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index 9f84f4b39f0..2bb2a9ac75f 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -32,11 +32,12 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.TimeZone;
-
+import java.util.List;
 import org.apache.commons.httpclient.URIException;
 import org.apache.commons.httpclient.util.URIUtil;
 import org.apache.commons.lang.NotImplementedException;
 
+import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -46,6 +47,8 @@ import com.microsoft.azure.storage.StorageUri;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import com.microsoft.azure.storage.blob.CloudBlobDirectory;
@@ -524,6 +527,30 @@ public class MockStorageInterface extends StorageInterface {
     public CloudBlob getBlob() {
       return null;
     }
+
+    @Override
+    public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
+    }
+
+    @Override
+    public void uploadBlock(String blockId, InputStream sourceStream,
+        long length, BlobRequestOptions options,
+        OperationContext opContext) throws IOException, StorageException {
+      throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
+    }
+
+    @Override
+    public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
+        BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
+      throw new UnsupportedOperationException("commitBlockList not used in Mock Tests");
+    }
+
+    @Override
+    public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+    }
   }
 
   class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
@@ -580,5 +607,10 @@ public class MockStorageInterface extends StorageInterface {
     public CloudBlob getBlob() {
       return null;
     }
+
+    @Override
+    public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java
new file mode 100644
index 00000000000..de5199083a0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNativeAzureFileSystemAppend extends NativeAzureFileSystemBaseTest {
+
+  private static final String TEST_FILE = "test.dat";
+  private static final Path TEST_PATH = new Path(TEST_FILE);
+
+  private AzureBlobStorageTestAccount testAccount = null;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    testAccount = createTestAccount();
+    fs = testAccount.getFileSystem();
+    Configuration conf = fs.getConf();
+    conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+  }
+
+  /*
+   * Helper method that creates test data of the size given by the
+   * "size" parameter.
+   */
+  private static byte[] getTestData(int size) {
+    byte[] testData = new byte[size];
+    System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
+    return testData;
+  }
+
+  // Helper method to create a file and write fileSize bytes of data to it.
+  private byte[] createBaseFileWithData(int fileSize, Path testPath) throws Throwable {
+
+    FSDataOutputStream createStream = null;
+    try {
+      createStream = fs.create(testPath);
+      byte[] fileData = null;
+
+      if (fileSize != 0) {
+        fileData = getTestData(fileSize);
+        createStream.write(fileData);
+      }
+      return fileData;
+    } finally {
+      if (createStream != null) {
+        createStream.close();
+      }
+    }
+  }
+
+  /*
+   * Helper method to verify that the next "dataLength" bytes read from
+   * srcStream match the expected test data.
+   */
+  private boolean verifyFileData(int dataLength, byte[] testData, int testDataIndex,
+      FSDataInputStream srcStream) {
+
+    try {
+
+      byte[] fileBuffer = new byte[dataLength];
+      byte[] testDataBuffer = new byte[dataLength];
+
+      int fileBytesRead = srcStream.read(fileBuffer);
+
+      if (fileBytesRead < dataLength) {
+        return false;
+      }
+
+      System.arraycopy(testData, testDataIndex, testDataBuffer, 0, dataLength);
+
+      if (!Arrays.equals(fileBuffer, testDataBuffer)) {
+        return false;
+      }
+
+      return true;
+
+    } catch (Exception ex) {
+      return false;
+    }
+  }
+
+  /*
+   * Helper method to verify an append against a test file.
+   */
+  private boolean verifyAppend(byte[] testData, Path testFile) {
+
+    FSDataInputStream srcStream = null;
+    try {
+
+      srcStream = fs.open(testFile);
+      int baseBufferSize = 2048;
+      int testDataSize = testData.length;
+      int testDataIndex = 0;
+
+      while (testDataSize > baseBufferSize) {
+
+        if (!verifyFileData(baseBufferSize, testData, testDataIndex, srcStream)) {
+          return false;
+        }
+        testDataIndex += baseBufferSize;
+        testDataSize -= baseBufferSize;
+      }
+
+      if (!verifyFileData(testDataSize, testData, testDataIndex, srcStream)) {
+        return false;
+      }
+
+      return true;
+    } catch (Exception ex) {
+      return false;
+    } finally {
+      if (srcStream != null) {
+        try {
+          srcStream.close();
+        } catch (IOException ioe) {
+          // Ignore close failures during verification.
+        }
+      }
+    }
+  }
+
+  /*
+   * Test case to verify that appending a small amount of data works.
+   * This tests append end-to-end.
+   */
+  @Test
+  public void testSingleAppend() throws Throwable {
+
+    FSDataOutputStream appendStream = null;
+    try {
+      int baseDataSize = 50;
+      byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+
+      int appendDataSize = 20;
+      byte[] appendDataBuffer = getTestData(appendDataSize);
+      appendStream = fs.append(TEST_PATH, 10);
+      appendStream.write(appendDataBuffer);
+      appendStream.close();
+      byte[] testData = new byte[baseDataSize + appendDataSize];
+      System.arraycopy(baseDataBuffer, 0, testData, 0, baseDataSize);
+      System.arraycopy(appendDataBuffer, 0, testData, baseDataSize, appendDataSize);
+
+      Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  /*
+   * Test case to verify an append to an empty file.
+   */
+  @Test
+  public void testSingleAppendOnEmptyFile() throws Throwable {
+
+    FSDataOutputStream appendStream = null;
+
+    try {
+      createBaseFileWithData(0, TEST_PATH);
+
+      int appendDataSize = 20;
+      byte[] appendDataBuffer = getTestData(appendDataSize);
+      appendStream = fs.append(TEST_PATH, 10);
+      appendStream.write(appendDataBuffer);
+      appendStream.close();
+
+      Assert.assertTrue(verifyAppend(appendDataBuffer, TEST_PATH));
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  /*
+   * Test to verify that only one append stream can be open on a file at a time.
+   */
+  @Test
+  public void testSingleAppenderScenario() throws Throwable {
+
+    FSDataOutputStream appendStream1 = null;
+    FSDataOutputStream appendStream2 = null;
+    IOException ioe = null;
+    try {
+      createBaseFileWithData(0, TEST_PATH);
+      appendStream1 = fs.append(TEST_PATH, 10);
+      boolean encounteredException = false;
+      try {
+        appendStream2 = fs.append(TEST_PATH, 10);
+      } catch (IOException ex) {
+        encounteredException = true;
+        ioe = ex;
+      }
+
+      appendStream1.close();
+
+      Assert.assertTrue(encounteredException);
+      GenericTestUtils.assertExceptionContains("Unable to set Append lease on the Blob", ioe);
+    } finally {
+      if (appendStream1 != null) {
+        appendStream1.close();
+      }
+
+      if (appendStream2 != null) {
+        appendStream2.close();
+      }
+    }
+  }
+
+  /*
+   * Test to verify multiple appends to a blob.
+   */
+  @Test
+  public void testMultipleAppends() throws Throwable {
+
+    int baseDataSize = 50;
+    byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+
+    int appendDataSize = 100;
+    int targetAppendCount = 50;
+    byte[] testData = new byte[baseDataSize + (appendDataSize * targetAppendCount)];
+    int testDataIndex = 0;
+    System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+    testDataIndex += baseDataSize;
+
+    int appendCount = 0;
+
+    FSDataOutputStream appendStream = null;
+
+    try {
+      while (appendCount < targetAppendCount) {
+
+        byte[] appendDataBuffer = getTestData(appendDataSize);
+        appendStream = fs.append(TEST_PATH, 30);
+        appendStream.write(appendDataBuffer);
+        appendStream.close();
+
+        System.arraycopy(appendDataBuffer, 0, testData, testDataIndex, appendDataSize);
+        testDataIndex += appendDataSize;
+        appendCount++;
+      }
+
+      Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  /*
+   * Test to verify multiple appends on the same stream.
+   */
+  @Test
+  public void testMultipleAppendsOnSameStream() throws Throwable {
+
+    int baseDataSize = 50;
+    byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+    int appendDataSize = 100;
+    int targetAppendCount = 50;
+    byte[] testData = new byte[baseDataSize + (appendDataSize * targetAppendCount)];
+    int testDataIndex = 0;
+    System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+    testDataIndex += baseDataSize;
+    int appendCount = 0;
+
+    FSDataOutputStream appendStream = null;
+
+    try {
+
+      while (appendCount < targetAppendCount) {
+
+        appendStream = fs.append(TEST_PATH, 50);
+
+        int singleAppendChunkSize = 20;
+        int appendRunSize = 0;
+        while (appendRunSize < appendDataSize) {
+
+          byte[] appendDataBuffer = getTestData(singleAppendChunkSize);
+          appendStream.write(appendDataBuffer);
+          System.arraycopy(appendDataBuffer, 0, testData,
+              testDataIndex + appendRunSize, singleAppendChunkSize);
+
+          appendRunSize += singleAppendChunkSize;
+        }
+
+        appendStream.close();
+        testDataIndex += appendDataSize;
+        appendCount++;
+      }
+
+      Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  /*
+   * Test to verify the behavior when the append support configuration flag
+   * is set to false.
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testFalseConfigurationFlagBehavior() throws Throwable {
+
+    fs = testAccount.getFileSystem();
+    Configuration conf = fs.getConf();
+    conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    FSDataOutputStream appendStream = null;
+
+    try {
+      createBaseFileWithData(0, TEST_PATH);
+      appendStream = fs.append(TEST_PATH, 10);
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+}