HADOOP-12635. Adding Append API support for WASB. Contributed by Dushyanth.

(cherry picked from commit 8bc93db2e7)
This commit is contained in:
cnauroth 2016-01-18 09:08:53 -08:00
parent da887e4252
commit 62d6166211
12 changed files with 1550 additions and 130 deletions

View File

@ -99,6 +99,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12691. Add CSRF Filter for REST APIs to Hadoop Common.
(Larry McCay via cnauroth)
HADOOP-12635. Adding Append API support for WASB. (Dushyanth via cnauroth)
IMPROVEMENTS
HADOOP-12458. Retries is typoed to spell Retires in parts of

View File

@ -33,13 +33,11 @@
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -64,6 +62,7 @@
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
@ -2681,4 +2680,24 @@ protected void finalize() throws Throwable {
close();
super.finalize();
}
@Override
public DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException {
try {
if (isPageBlobKey(key)) {
throw new UnsupportedOperationException("Append not supported for Page Blobs");
}
CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
appendStream.initialize();
return new DataOutputStream(appendStream);
} catch(Exception ex) {
throw new AzureException(ex);
}
}
}

View File

@ -0,0 +1,775 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Locale;
import java.util.List;
import java.util.Random;
import java.util.TimeZone;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
/**
* Stream object that implememnts append for Block Blobs in WASB.
*/
public class BlockBlobAppendStream extends OutputStream {
private final String key;
private final int bufferSize;
private ByteArrayOutputStream outBuffer;
private final CloudBlockBlobWrapper blob;
private final OperationContext opContext;
/**
* Variable to track if the stream has been closed.
*/
private boolean closed = false;
/**
* Variable to track if the append lease is released.
*/
private volatile boolean leaseFreed;
/**
* Variable to track if the append stream has been
* initialized.
*/
private boolean initialized = false;
/**
* Last IOException encountered
*/
private volatile IOException lastError = null;
/**
* List to keep track of the uncommitted azure storage
* block ids
*/
private final List<BlockEntry> uncommittedBlockEntries;
private static final int UNSET_BLOCKS_COUNT = -1;
/**
* Variable to hold the next block id to be used for azure
* storage blocks.
*/
private long nextBlockCount = UNSET_BLOCKS_COUNT;
private final Random sequenceGenerator = new Random();
/**
* Time to wait to renew lease in milliseconds
*/
private static final int LEASE_RENEWAL_PERIOD = 10000;
/**
* Number of times to retry for lease renewal
*/
private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
/**
* Time to wait before retrying to set the lease
*/
private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
/**
* Metadata key used on the blob to indicate append lease is active
*/
public static final String APPEND_LEASE = "append_lease";
/**
* Timeout value for the append lease in millisecs. If the lease is not
* renewed within 30 seconds then another thread can acquire the append lease
* on the blob
*/
public static final int APPEND_LEASE_TIMEOUT = 30000;
/**
* Metdata key used on the blob to indicate last modified time of append lease
*/
public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
/**
* Number of times block upload needs is retried.
*/
private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
/**
* Wait time between block upload retries in millisecs.
*/
private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
private static final int MAX_BLOCK_COUNT = 100000;
private ThreadPoolExecutor ioThreadPool;
/**
* Atomic integer to provide thread id for thread names for uploader threads.
*/
private final AtomicInteger threadSequenceNumber;
/**
* Prefix to be used for thread names for uploader threads.
*/
private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
private static final String UTC_STR = "UTC";
public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
final String aKey, final int bufferSize, final OperationContext opContext)
throws IOException {
if (null == aKey || 0 == aKey.length()) {
throw new IllegalArgumentException(
"Illegal argument: The key string is null or empty");
}
if (0 >= bufferSize) {
throw new IllegalArgumentException(
"Illegal argument bufferSize cannot be zero or negative");
}
this.blob = blob;
this.opContext = opContext;
this.key = aKey;
this.bufferSize = bufferSize;
this.threadSequenceNumber = new AtomicInteger(0);
setBlocksCount();
this.outBuffer = new ByteArrayOutputStream(bufferSize);
this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
// Acquire append lease on the blob.
try {
//Set the append lease if the value of the append lease is false
if (!updateBlobAppendMetadata(true, false)) {
LOG.error("Unable to set Append Lease on the Blob : {} "
+ "Possibly because another client already has a create or append stream open on the Blob", key);
throw new IOException("Unable to set Append lease on the Blob. "
+ "Possibly because another client already had an append stream open on the Blob.");
}
} catch (StorageException ex) {
LOG.error("Encountered Storage exception while acquiring append "
+ "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
key, ex, ex.getErrorCode());
throw new IOException(ex);
}
leaseFreed = false;
}
/**
* Helper method that starts an Append Lease renewer thread and the
* thread pool.
*/
public synchronized void initialize() {
if (initialized) {
return;
}
/*
* Start the thread for Append lease renewer.
*/
Thread appendLeaseRenewer = new Thread(new AppendRenewer());
appendLeaseRenewer.setDaemon(true);
appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
appendLeaseRenewer.start();
/*
* Parameters to ThreadPoolExecutor:
* corePoolSize : the number of threads to keep in the pool, even if they are idle,
* unless allowCoreThreadTimeOut is set
* maximumPoolSize : the maximum number of threads to allow in the pool
* keepAliveTime - when the number of threads is greater than the core,
* this is the maximum time that excess idle threads will
* wait for new tasks before terminating.
* unit - the time unit for the keepAliveTime argument
* workQueue - the queue to use for holding tasks before they are executed
* This queue will hold only the Runnable tasks submitted by the execute method.
*/
this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
initialized = true;
}
/**
* Get the blob name.
*
* @return String Blob name.
*/
public String getKey() {
return key;
}
/**
* Get the backing blob.
* @return buffer size of the stream.
*/
public int getBufferSize() {
return bufferSize;
}
/**
* Writes the specified byte to this output stream. The general contract for
* write is that one byte is written to the output stream. The byte to be
* written is the eight low-order bits of the argument b. The 24 high-order
* bits of b are ignored.
*
* @param byteVal
* the byteValue to write.
* @throws IOException
* if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public void write(final int byteVal) throws IOException {
write(new byte[] { (byte) (byteVal & 0xFF) });
}
/**
* Writes b.length bytes from the specified byte array to this output stream.
*
* @param data
* the byte array to write.
*
* @throws IOException
* if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public void write(final byte[] data) throws IOException {
write(data, 0, data.length);
}
/**
* Writes length bytes from the specified byte array starting at offset to
* this output stream.
*
* @param data
* the byte array to write.
* @param offset
* the start offset in the data.
* @param length
* the number of bytes to write.
* @throws IOException
* if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public void write(final byte[] data, final int offset, final int length)
throws IOException {
if (offset < 0 || length < 0 || length > data.length - offset) {
throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
}
writeInternal(data, offset, length);
}
@Override
public synchronized void close() throws IOException {
if (!initialized) {
throw new IOException("Trying to close an uninitialized Append stream");
}
if (closed) {
return;
}
if (leaseFreed) {
throw new IOException(String.format("Attempting to close an append stream on blob : %s "
+ " that does not have lease on the Blob. Failing close", key));
}
if (outBuffer.size() > 0) {
uploadBlockToStorage(outBuffer.toByteArray());
}
ioThreadPool.shutdown();
try {
if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
LOG.error("Time out occured while waiting for IO request to finish in append"
+ " for blob : {}", key);
NativeAzureFileSystemHelper.logAllLiveStackTraces();
throw new IOException("Timed out waiting for IO requests to finish");
}
} catch(InterruptedException intrEx) {
// Restore the interrupted status
Thread.currentThread().interrupt();
LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
throw new IOException("Append Commit interrupted.");
}
// Calling commit after all blocks are succesfully uploaded.
if (lastError == null) {
commitAppendBlocks();
}
// Perform cleanup.
cleanup();
if (lastError != null) {
throw lastError;
}
}
/**
* Helper method that cleans up the append stream.
*/
private synchronized void cleanup() {
closed = true;
try {
// Set the value of append lease to false if the value is set to true.
updateBlobAppendMetadata(false, true);
} catch(StorageException ex) {
LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
+ "Error Code : {}",
key, ex, ex.getErrorCode());
lastError = new IOException(ex);
}
leaseFreed = true;
}
/**
* Method to commit all the uncommited blocks to azure storage.
* If the commit fails then blocks are automatically cleaned up
* by Azure storage.
* @throws IOException
*/
private synchronized void commitAppendBlocks() throws IOException {
SelfRenewingLease lease = null;
try {
if (uncommittedBlockEntries.size() > 0) {
//Acquiring lease on the blob.
lease = new SelfRenewingLease(blob);
// Downloading existing blocks
List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED,
new BlobRequestOptions(), opContext);
// Adding uncommitted blocks.
blockEntries.addAll(uncommittedBlockEntries);
AccessCondition accessCondition = new AccessCondition();
accessCondition.setLeaseID(lease.getLeaseID());
blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
uncommittedBlockEntries.clear();
}
} catch(StorageException ex) {
LOG.error("Storage exception encountered during block commit phase of append for blob"
+ " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
throw new IOException("Encountered Exception while committing append blocks", ex);
} finally {
if (lease != null) {
try {
lease.free();
} catch(StorageException ex) {
LOG.debug("Exception encountered while releasing lease for "
+ "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
// Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
}
}
}
}
/**
* Helper method used to generate the blockIDs. The algorithm used is similar to the Azure
* storage SDK.
*/
private void setBlocksCount() throws IOException {
try {
if (nextBlockCount == UNSET_BLOCKS_COUNT) {
nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+ sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
List<BlockEntry> blockEntries =
blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
nextBlockCount += blockEntries.size();
}
} catch (StorageException ex) {
LOG.debug("Encountered storage exception during setting next Block Count."
+ " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
throw new IOException(ex);
}
}
/**
* Helper method that generates the next block id for uploading a block to azure storage.
* @return String representing the block ID generated.
* @throws IOException
*/
private String generateBlockId() throws IOException {
if (nextBlockCount == UNSET_BLOCKS_COUNT) {
throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
}
byte[] blockIdInBytes = getBytesFromLong(nextBlockCount);
return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
}
/**
* Returns a byte array that represents the data of a <code>long</code> value. This
* utility method is copied from com.microsoft.azure.storage.core.Utility class.
* This class is marked as internal, hence we clone the method here and not express
* dependency on the Utility Class
*
* @param value
* The value from which the byte array will be returned.
*
* @return A byte array that represents the data of the specified <code>long</code> value.
*/
private static byte[] getBytesFromLong(final long value) {
final byte[] tempArray = new byte[8];
for (int m = 0; m < 8; m++) {
tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
}
return tempArray;
}
/**
* Helper method that creates a thread to upload a block to azure storage.
* @param payload
* @throws IOException
*/
private synchronized void uploadBlockToStorage(byte[] payload) throws IOException {
// upload payload to azure storage
nextBlockCount++;
String blockId = generateBlockId();
// Since uploads of the Azure storage are done in parallel threads, we go ahead
// add the blockId in the uncommitted list. If the upload of the block fails
// we don't commit the blockIds.
uncommittedBlockEntries.add(new BlockEntry(blockId));
ioThreadPool.execute(new WriteRequest(payload, blockId));
}
/**
* Helper method to updated the Blob metadata during Append lease operations.
* Blob metadata is updated to holdLease value only if the current lease
* status is equal to testCondition and the last update on the blob metadata
* is less that 30 secs old.
* @param holdLease
* @param testCondition
* @return true if the updated lease operation was successful or false otherwise
* @throws StorageException
*/
private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
throws StorageException {
SelfRenewingLease lease = null;
StorageException lastStorageException = null;
int leaseRenewalRetryCount = 0;
/*
* Updating the Blob metadata honours following algorithm based on
* 1) If the append lease metadata is present
* 2) Last updated time of the append lease
* 3) Previous value of the Append lease metadata.
*
* The algorithm:
* 1) If append lease metadata is not part of the Blob. In this case
* this is the first client to Append so we update the metadata.
* 2) If append lease metadata is present and timeout has occurred.
* In this case irrespective of what the value of the append lease is we update the metadata.
* 3) If append lease metadata is present and is equal to testCondition value (passed as parameter)
* and timeout has not occurred, we update the metadata.
* 4) If append lease metadata is present and is not equal to testCondition value (passed as parameter)
* and timeout has not occurred, we do not update metadata and return false.
*
*/
while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
lastStorageException = null;
synchronized(this) {
try {
final Calendar currentCalendar = Calendar
.getInstance(Locale.US);
currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
long currentTime = currentCalendar.getTime().getTime();
// Acquire lease on the blob.
lease = new SelfRenewingLease(blob);
blob.downloadAttributes(opContext);
HashMap<String, String> metadata = blob.getMetadata();
if (metadata.containsKey(APPEND_LEASE)
&& currentTime - Long.parseLong(
metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
&& !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
return false;
}
metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
blob.setMetadata(metadata);
AccessCondition accessCondition = new AccessCondition();
accessCondition.setLeaseID(lease.getLeaseID());
blob.uploadMetadata(accessCondition, null, opContext);
return true;
} catch (StorageException ex) {
lastStorageException = ex;
LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
+ "Error Code : {}",
key, ex, ex.getErrorCode());
leaseRenewalRetryCount++;
} finally {
if (lease != null) {
try {
lease.free();
} catch(StorageException ex) {
LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
+ "during Append metadata operation. Storage Exception {} "
+ "Error Code : {} ", key, ex, ex.getErrorCode());
} finally {
lease = null;
}
}
}
}
if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
throw lastStorageException;
} else {
try {
Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
} catch(InterruptedException ex) {
LOG.debug("Blob append metadata updated method interrupted");
Thread.currentThread().interrupt();
}
}
}
// The code should not enter here because the while loop will
// always be executed and if the while loop is executed we
// would returning from the while loop.
return false;
}
/**
* This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer.
* @param data
* @param offset
* @param length
* @throws IOException
*/
private synchronized void writeInternal(final byte[] data, final int offset, final int length)
throws IOException {
if (!initialized) {
throw new IOException("Trying to write to an un-initialized Append stream");
}
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
if (leaseFreed) {
throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
}
byte[] currentData = new byte[length];
System.arraycopy(data, offset, currentData, 0, length);
// check to see if the data to be appended exceeds the
// buffer size. If so we upload a block to azure storage.
while ((outBuffer.size() + currentData.length) > bufferSize) {
byte[] payload = new byte[bufferSize];
// Add data from the existing buffer
System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
// Updating the available size in the payload
int availableSpaceInPayload = bufferSize - outBuffer.size();
// Adding data from the current call
System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
uploadBlockToStorage(payload);
// updating the currentData buffer
byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
System.arraycopy(currentData, availableSpaceInPayload,
tempBuffer, 0, currentData.length - availableSpaceInPayload);
currentData = tempBuffer;
outBuffer = new ByteArrayOutputStream(bufferSize);
}
outBuffer.write(currentData);
}
/**
* Runnable instance that uploads the block of data to azure storage.
*
*
*/
private class WriteRequest implements Runnable {
private final byte[] dataPayload;
private final String blockId;
public WriteRequest(byte[] dataPayload, String blockId) {
this.dataPayload = dataPayload;
this.blockId = blockId;
}
@Override
public void run() {
int uploadRetryAttempts = 0;
IOException lastLocalException = null;
while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
try {
blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
dataPayload.length, new BlobRequestOptions(), opContext);
break;
} catch(Exception ioe) {
Log.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
uploadRetryAttempts++;
lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
try {
Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
lastError = lastLocalException;
}
}
}
/**
* A ThreadFactory that creates uploader thread with
* meaningful names helpful for debugging purposes.
*/
class UploaderThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
threadSequenceNumber.getAndIncrement()));
return t;
}
}
/**
* A deamon thread that renews the Append lease on the blob.
* The thread sleeps for LEASE_RENEWAL_PERIOD time before renewing
* the lease. If an error is encountered while renewing the lease
* then an lease is released by this thread, which fails all other
* operations.
*/
private class AppendRenewer implements Runnable {
@Override
public void run() {
while (!leaseFreed) {
try {
Thread.sleep(LEASE_RENEWAL_PERIOD);
} catch (InterruptedException ie) {
LOG.debug("Appender Renewer thread interrupted");
Thread.currentThread().interrupt();
}
Log.debug("Attempting to renew append lease on {}", key);
try {
if (!leaseFreed) {
// Update the blob metadata to renew the append lease
if (!updateBlobAppendMetadata(true, true)) {
LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
leaseFreed = true;
}
}
} catch (StorageException ex) {
LOG.debug("Lease renewal for Blob : {} encountered "
+ "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
// We swallow the exception here because if the blob metadata is not updated for
// APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
// continue forward if it needs to append.
leaseFreed = true;
}
}
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azure;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -31,7 +32,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeSet;
@ -41,7 +41,6 @@
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -60,8 +59,6 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.azure.AzureException;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@ -73,12 +70,8 @@
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageErrorCode;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import org.apache.hadoop.io.IOUtils;
@ -288,7 +281,7 @@ public void writeFile(FileSystem fs) throws IOException {
throw new IOException("Unable to write RenamePending file for folder rename from "
+ srcKey + " to " + dstKey, e);
} finally {
NativeAzureFileSystem.cleanup(LOG, output);
NativeAzureFileSystemHelper.cleanup(LOG, output);
}
}
@ -663,6 +656,11 @@ public String getScheme() {
public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";
/*
* Property to enable Append API.
*/
public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
private class NativeAzureFsInputStream extends FSInputStream {
private InputStream in;
private final String key;
@ -728,7 +726,7 @@ public synchronized int read() throws FileNotFoundException, IOException {
return result;
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
@ -736,7 +734,7 @@ public synchronized int read() throws FileNotFoundException, IOException {
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
@ -782,7 +780,7 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundExce
return result;
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
@ -790,7 +788,7 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundExce
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
@ -822,10 +820,10 @@ public synchronized void seek(long pos) throws FileNotFoundException, EOFExcepti
this.pos);
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@ -1041,7 +1039,7 @@ private void restoreKey() throws IOException {
private static boolean suppressRetryPolicy = false;
// A counter to create unique (within-process) names for my metrics sources.
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
private boolean appendSupportEnabled = false;
public NativeAzureFileSystem() {
// set store in initialize()
@ -1164,7 +1162,7 @@ public void initialize(URI uri, Configuration conf)
this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
MAX_AZURE_BLOCK_SIZE);
this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
LOG.debug("NativeAzureFileSystem. Initializing.");
LOG.debug(" blockSize = {}",
conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
@ -1294,7 +1292,61 @@ public AzureFileSystemInstrumentation getInstrumentation() {
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
throw new IOException("Not supported");
if (!appendSupportEnabled) {
throw new UnsupportedOperationException("Append Support not enabled");
}
LOG.debug("Opening file: {} for append", f);
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
FileMetadata meta = null;
try {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
} else {
throw ex;
}
}
if (meta == null) {
throw new FileNotFoundException(f.toString());
}
if (meta.isDir()) {
throw new FileNotFoundException(f.toString()
+ " is a directory not a file.");
}
if (store.isPageBlobKey(key)) {
throw new IOException("Append not supported for Page Blobs");
}
DataOutputStream appendStream = null;
try {
appendStream = store.retrieveAppendStream(key, bufferSize);
} catch (Exception ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
} else {
throw ex;
}
}
return new FSDataOutputStream(appendStream, statistics);
}
@Override
@ -1379,7 +1431,7 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
lease.free();
}
} catch (Exception e) {
NativeAzureFileSystem.cleanup(LOG, out);
NativeAzureFileSystemHelper.cleanup(LOG, out);
String msg = "Unable to free lease on " + parent.toUri();
LOG.error(msg);
throw new IOException(msg, e);
@ -1577,10 +1629,10 @@ public boolean delete(Path f, boolean recursive,
metaFile = store.retrieveMetadata(key);
} catch (IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@ -1611,7 +1663,7 @@ public boolean delete(Path f, boolean recursive,
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
@ -1619,7 +1671,7 @@ public boolean delete(Path f, boolean recursive,
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
@ -1662,10 +1714,10 @@ public boolean delete(Path f, boolean recursive,
instrumentation.fileDeleted();
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@ -1684,7 +1736,7 @@ public boolean delete(Path f, boolean recursive,
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
@ -1692,7 +1744,7 @@ public boolean delete(Path f, boolean recursive,
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
@ -1728,10 +1780,10 @@ public boolean delete(Path f, boolean recursive,
priorLastKey);
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@ -1763,10 +1815,10 @@ public boolean delete(Path f, boolean recursive,
instrumentation.fileDeleted();
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@ -1785,10 +1837,10 @@ public boolean delete(Path f, boolean recursive,
store.delete(key);
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@ -1829,10 +1881,10 @@ public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOExceptio
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@ -1922,10 +1974,10 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
meta = store.retrieveMetadata(key);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", f));
}
@ -1948,10 +2000,10 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@ -1972,10 +2024,10 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
try {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@ -2196,10 +2248,10 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundExcepti
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@ -2219,10 +2271,10 @@ public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundExcepti
try {
inputStream = store.retrieve(key);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@ -2261,14 +2313,14 @@ public boolean rename(Path src, Path dst) throws FileNotFoundException, IOExcept
dstMetadata = store.retrieveMetadata(dstKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
// A BlobNotFound storage exception in only thrown from retrieveMetdata API when
// there is a race condition. If there is another thread which deletes the destination
// file or folder, then this thread calling rename should be able to continue with
// rename gracefully. Hence the StorageException is swallowed here.
if (innerException instanceof StorageException) {
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
+ "Swallowin the exception to handle race condition gracefully", dstKey);
}
@ -2294,10 +2346,10 @@ public boolean rename(Path src, Path dst) throws FileNotFoundException, IOExcept
parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
return false;
@ -2320,10 +2372,10 @@ public boolean rename(Path src, Path dst) throws FileNotFoundException, IOExcept
try {
srcMetadata = store.retrieveMetadata(srcKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Source {} doesn't exists. Failing rename", src);
return false;
@ -2342,10 +2394,10 @@ public boolean rename(Path src, Path dst) throws FileNotFoundException, IOExcept
store.rename(srcKey, dstKey);
} catch(IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
return false;
@ -2552,10 +2604,10 @@ public void setPermission(Path p, FsPermission permission) throws FileNotFoundEx
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
@ -2591,10 +2643,10 @@ public void setOwner(Path p, String username, String groupname)
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
@ -2817,52 +2869,4 @@ private static String encodeKey(String aKey) {
// Return to the caller with the randomized key.
return randomizedKey;
}
private static void cleanup(Logger log, java.io.Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch(IOException e) {
if (log != null) {
log.debug("Exception in closing {}", closeable, e);
}
}
}
}
/*
* Helper method to recursively check if the cause of the exception is
* a Azure storage exception.
*/
private static Throwable checkForAzureStorageException(Exception e) {
Throwable innerException = e.getCause();
while (innerException != null
&& !(innerException instanceof StorageException)) {
innerException = innerException.getCause();
}
return innerException;
}
/*
* Helper method to check if the AzureStorageException is
* because backing blob was not found.
*/
private static boolean isFileNotFoundException(StorageException e) {
String errorCode = ((StorageException) e).getErrorCode();
if (errorCode != null
&& (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
|| errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
|| errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
|| errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
return true;
}
return false;
}
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.IOException;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.microsoft.azure.storage.StorageErrorCode;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Utility class that has helper methods.
*
*/
@InterfaceAudience.Private
final class NativeAzureFileSystemHelper {
private NativeAzureFileSystemHelper() {
// Hiding the cosnstructor as this is a utility class.
}
private static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystemHelper.class);
public static void cleanup(Logger log, java.io.Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch(IOException e) {
if (log != null) {
log.debug("Exception in closing {}", closeable, e);
}
}
}
}
/*
* Helper method to recursively check if the cause of the exception is
* a Azure storage exception.
*/
public static Throwable checkForAzureStorageException(Exception e) {
Throwable innerException = e.getCause();
while (innerException != null
&& !(innerException instanceof StorageException)) {
innerException = innerException.getCause();
}
return innerException;
}
/*
* Helper method to check if the AzureStorageException is
* because backing blob was not found.
*/
public static boolean isFileNotFoundException(StorageException e) {
String errorCode = e.getErrorCode();
if (errorCode != null
&& (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
|| errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
|| errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
|| errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
return true;
}
return false;
}
/*
* Helper method that logs stack traces from all live threads.
*/
public static void logAllLiveStackTraces() {
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
LOG.debug("Thread " + entry.getKey().getName());
StackTraceElement[] trace = entry.getValue();
for (int j = 0; j < trace.length; j++) {
LOG.debug("\tat " + trace[j]);
}
}
}
}

View File

@ -107,4 +107,6 @@ void updateFolderLastModifiedTime(String key, Date lastModified,
void delete(String key, SelfRenewingLease lease) throws IOException;
SelfRenewingLease acquireLease(String key) throws AzureException;
DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException;
}

View File

@ -29,8 +29,6 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@ -216,7 +214,7 @@ public synchronized void close() throws IOException {
LOG.debug(ioThreadPool.toString());
if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
logAllStackTraces();
NativeAzureFileSystemHelper.logAllLiveStackTraces();
LOG.debug(ioThreadPool.toString());
throw new IOException("Timed out waiting for IO requests to finish");
}
@ -230,18 +228,7 @@ public synchronized void close() throws IOException {
closed = true;
}
// Log the stacks of all threads.
private void logAllStackTraces() {
Map liveThreads = Thread.getAllStackTraces();
for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) {
Thread key = (Thread) i.next();
LOG.debug("Thread " + key.getName());
StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key);
for (int j = 0; j < trace.length; j++) {
LOG.debug("\tat " + trace[j]);
}
}
}
/**
* A single write request for data to write to Azure storage.

View File

@ -24,11 +24,13 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.EnumSet;
import java.util.HashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
@ -36,6 +38,8 @@
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CopyState;
@ -269,13 +273,13 @@ public abstract void downloadAttributes(OperationContext opContext)
/**
* Uploads the container's metadata using the specified operation context.
*
*
* @param opContext
* An {@link OperationContext} object that represents the context
* for the current operation. This object is used to track requests
* to the storage service, and to provide additional runtime
* information about the operation.
*
*
* @throws StorageException
* If a storage service error occurred.
*/
@ -545,6 +549,30 @@ InputStream openInputStream(BlobRequestOptions options,
void uploadMetadata(OperationContext opContext)
throws StorageException;
/**
* Uploads the blob's metadata to the storage service using the specified
* lease ID, request options, and operation context.
*
* @param accessCondition
* A {@link AccessCondition} object that represents the access conditions for the blob.
*
* @param options
* A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
* <code>null</code> will use the default request options from the associated service client (
* {@link CloudBlobClient}).
*
* @param opContext
* An {@link OperationContext} object that represents the context
* for the current operation. This object is used to track requests
* to the storage service, and to provide additional runtime
* information about the operation.
*
* @throws StorageException
* If a storage service error occurred.
*/
void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
OperationContext opContext) throws StorageException;
void uploadProperties(OperationContext opContext,
SelfRenewingLease lease)
throws StorageException;
@ -602,6 +630,63 @@ public abstract interface CloudBlockBlobWrapper
OutputStream openOutputStream(
BlobRequestOptions options,
OperationContext opContext) throws StorageException;
/**
*
* @param filter A {@link BlockListingFilter} value that specifies whether to download
* committed blocks, uncommitted blocks, or all blocks.
* @param options A {@link BlobRequestOptions} object that specifies any additional options for
* the request. Specifying null will use the default request options from
* the associated service client ( CloudBlobClient).
* @param opContext An {@link OperationContext} object that represents the context for the current
* operation. This object is used to track requests to the storage service,
* and to provide additional runtime information about the operation.
* @return An ArrayList object of {@link BlockEntry} objects that represent the list
* block items downloaded from the block blob.
* @throws IOException If an I/O error occurred.
* @throws StorageException If a storage service error occurred.
*/
List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException;
/**
*
* @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob
* the length of all Block IDs must be identical.
* @param sourceStream An {@link InputStream} object that represents the input stream to write to the
* block blob.
* @param length A long which represents the length, in bytes, of the stream data,
* or -1 if unknown.
* @param options A {@link BlobRequestOptions} object that specifies any additional options for the
* request. Specifying null will use the default request options from the
* associated service client ( CloudBlobClient).
* @param opContext An {@link OperationContext} object that represents the context for the current operation.
* This object is used to track requests to the storage service, and to provide
* additional runtime information about the operation.
* @throws IOException If an I/O error occurred.
* @throws StorageException If a storage service error occurred.
*/
void uploadBlock(String blockId, InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException;
/**
*
* @param blockList An enumerable collection of {@link BlockEntry} objects that represents the list
* block items being committed. The size field is ignored.
* @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
* @param options A {@link BlobRequestOptions} object that specifies any additional options for the
* request. Specifying null will use the default request options from the associated
* service client ( CloudBlobClient).
* @param opContext An {@link OperationContext} object that represents the context for the current operation.
* This object is used to track requests to the storage service, and to provide additional
* runtime information about the operation.
* @throws IOException If an I/O error occurred.
* @throws StorageException If a storage service error occurred.
*/
void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException;
}
/**

View File

@ -27,7 +27,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import com.microsoft.azure.storage.AccessCondition;
@ -40,6 +40,8 @@
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@ -362,7 +364,13 @@ public CloudBlobDirectory getParent() throws URISyntaxException,
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
getBlob().uploadMetadata(null, null, opContext);
uploadMetadata(null, null, opContext);
}
@Override
public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
OperationContext opContext) throws StorageException{
getBlob().uploadMetadata(accessConditions, options, opContext);
}
public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
@ -396,7 +404,7 @@ public CopyState getCopyState() {
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext)
throws StorageException, URISyntaxException {
getBlob().startCopyFromBlob(((CloudBlobWrapperImpl)sourceBlob).blob,
getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob,
null, null, options, opContext);
}
@ -440,6 +448,25 @@ public void uploadProperties(OperationContext opContext)
getBlob().uploadProperties(null, null, opContext);
}
@Override
public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
}
@Override
public void uploadBlock(String blockId, InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
}
@Override
public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
}
}
static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper {

View File

@ -23,6 +23,7 @@
* [Page Blob Support and Configuration](#Page_Blob_Support_and_Configuration)
* [Atomic Folder Rename](#Atomic_Folder_Rename)
* [Accessing wasb URLs](#Accessing_wasb_URLs)
* [Append API Support and Configuration](#Append_API_Support_and_Configuration)
* [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
## <a name="Introduction" />Introduction
@ -51,7 +52,6 @@ on the additional artifacts it requires, notably the
## <a name="Limitations" />Limitations
* The append operation is not implemented.
* File owner and group are persisted, but the permissions model is not enforced.
Authorization occurs at the level of the entire Azure Blob Storage account.
* File last access time is not tracked.
@ -199,6 +199,24 @@ It's also possible to configure `fs.defaultFS` to use a `wasb` or `wasbs` URL.
This causes all bare paths, such as `/testDir/testFile` to resolve automatically
to that file system.
### <a name="Append_API_Support_and_Configuration" />Append API Support and Configuration
The Azure Blob Storage interface for Hadoop has optional support for Append API for
single writer by setting the configuration `fs.azure.enable.append.support` to true.
For Example:
<property>
<name>fs.azure.enable.append.support</name>
<value>true</value>
</property>
It must be noted Append support in Azure Blob Storage interface DIFFERS FROM HDFS SEMANTICS. Append
support does not enforce single writer internally but requires applications to guarantee this semantic.
It becomes a responsibility of the application either to ensure single-threaded handling for a particular
file path, or rely on some external locking mechanism of its own. Failure to do so will result in
unexpected behavior.
## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
The hadoop-azure module includes a full suite of unit tests. Most of the tests

View File

@ -32,11 +32,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.TimeZone;
import java.util.List;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.util.URIUtil;
import org.apache.commons.lang.NotImplementedException;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
@ -46,6 +47,8 @@
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
@ -524,6 +527,30 @@ public SelfRenewingLease acquireLease() {
public CloudBlob getBlob() {
return null;
}
@Override
public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
}
@Override
public void uploadBlock(String blockId, InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
}
@Override
public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
throw new UnsupportedOperationException("commitBlockList not used in Mock Tests");
}
public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
OperationContext opContext) throws StorageException {
throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
}
}
class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
@ -580,5 +607,10 @@ public SelfRenewingLease acquireLease() {
public CloudBlob getBlob() {
return null;
}
public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
OperationContext opContext) throws StorageException {
throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
}
}
}

View File

@ -0,0 +1,362 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestNativeAzureFileSystemAppend extends NativeAzureFileSystemBaseTest {
private static final String TEST_FILE = "test.dat";
private static final Path TEST_PATH = new Path(TEST_FILE);
private AzureBlobStorageTestAccount testAccount = null;
@Before
public void setUp() throws Exception {
super.setUp();
testAccount = createTestAccount();
fs = testAccount.getFileSystem();
Configuration conf = fs.getConf();
conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
URI uri = fs.getUri();
fs.initialize(uri, conf);
}
/*
* Helper method that creates test data of size provided by the
* "size" parameter.
*/
private static byte[] getTestData(int size) {
byte[] testData = new byte[size];
System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
return testData;
}
// Helper method to create file and write fileSize bytes of data on it.
private byte[] createBaseFileWithData(int fileSize, Path testPath) throws Throwable {
FSDataOutputStream createStream = null;
try {
createStream = fs.create(testPath);
byte[] fileData = null;
if (fileSize != 0) {
fileData = getTestData(fileSize);
createStream.write(fileData);
}
return fileData;
} finally {
if (createStream != null) {
createStream.close();
}
}
}
/*
* Helper method to verify a file data equal to "dataLength" parameter
*/
private boolean verifyFileData(int dataLength, byte[] testData, int testDataIndex,
FSDataInputStream srcStream) {
try {
byte[] fileBuffer = new byte[dataLength];
byte[] testDataBuffer = new byte[dataLength];
int fileBytesRead = srcStream.read(fileBuffer);
if (fileBytesRead < dataLength) {
return false;
}
System.arraycopy(testData, testDataIndex, testDataBuffer, 0, dataLength);
if (!Arrays.equals(fileBuffer, testDataBuffer)) {
return false;
}
return true;
} catch (Exception ex) {
return false;
}
}
/*
* Helper method to verify Append on a testFile.
*/
private boolean verifyAppend(byte[] testData, Path testFile) {
FSDataInputStream srcStream = null;
try {
srcStream = fs.open(testFile);
int baseBufferSize = 2048;
int testDataSize = testData.length;
int testDataIndex = 0;
while (testDataSize > baseBufferSize) {
if (!verifyFileData(baseBufferSize, testData, testDataIndex, srcStream)) {
return false;
}
testDataIndex += baseBufferSize;
testDataSize -= baseBufferSize;
}
if (!verifyFileData(testDataSize, testData, testDataIndex, srcStream)) {
return false;
}
return true;
} catch(Exception ex) {
return false;
} finally {
if (srcStream != null) {
try {
srcStream.close();
} catch(IOException ioe) {
// Swallowing
}
}
}
}
/*
* Test case to verify if an append on small size data works. This tests
* append E2E
*/
@Test
public void testSingleAppend() throws Throwable{
FSDataOutputStream appendStream = null;
try {
int baseDataSize = 50;
byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
int appendDataSize = 20;
byte[] appendDataBuffer = getTestData(appendDataSize);
appendStream = fs.append(TEST_PATH, 10);
appendStream.write(appendDataBuffer);
appendStream.close();
byte[] testData = new byte[baseDataSize + appendDataSize];
System.arraycopy(baseDataBuffer, 0, testData, 0, baseDataSize);
System.arraycopy(appendDataBuffer, 0, testData, baseDataSize, appendDataSize);
Assert.assertTrue(verifyAppend(testData, TEST_PATH));
} finally {
if (appendStream != null) {
appendStream.close();
}
}
}
/*
* Test case to verify append to an empty file.
*/
@Test
public void testSingleAppendOnEmptyFile() throws Throwable {
FSDataOutputStream appendStream = null;
try {
createBaseFileWithData(0, TEST_PATH);
int appendDataSize = 20;
byte[] appendDataBuffer = getTestData(appendDataSize);
appendStream = fs.append(TEST_PATH, 10);
appendStream.write(appendDataBuffer);
appendStream.close();
Assert.assertTrue(verifyAppend(appendDataBuffer, TEST_PATH));
} finally {
if (appendStream != null) {
appendStream.close();
}
}
}
/*
* Test to verify that we can open only one Append stream on a File.
*/
@Test
public void testSingleAppenderScenario() throws Throwable {
FSDataOutputStream appendStream1 = null;
FSDataOutputStream appendStream2 = null;
IOException ioe = null;
try {
createBaseFileWithData(0, TEST_PATH);
appendStream1 = fs.append(TEST_PATH, 10);
boolean encounteredException = false;
try {
appendStream2 = fs.append(TEST_PATH, 10);
} catch(IOException ex) {
encounteredException = true;
ioe = ex;
}
appendStream1.close();
Assert.assertTrue(encounteredException);
GenericTestUtils.assertExceptionContains("Unable to set Append lease on the Blob", ioe);
} finally {
if (appendStream1 != null) {
appendStream1.close();
}
if (appendStream2 != null) {
appendStream2.close();
}
}
}
/*
* Tests to verify multiple appends on a Blob.
*/
@Test
public void testMultipleAppends() throws Throwable {
int baseDataSize = 50;
byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
int appendDataSize = 100;
int targetAppendCount = 50;
byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
int testDataIndex = 0;
System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
testDataIndex += baseDataSize;
int appendCount = 0;
FSDataOutputStream appendStream = null;
try {
while (appendCount < targetAppendCount) {
byte[] appendDataBuffer = getTestData(appendDataSize);
appendStream = fs.append(TEST_PATH, 30);
appendStream.write(appendDataBuffer);
appendStream.close();
System.arraycopy(appendDataBuffer, 0, testData, testDataIndex, appendDataSize);
testDataIndex += appendDataSize;
appendCount++;
}
Assert.assertTrue(verifyAppend(testData, TEST_PATH));
} finally {
if (appendStream != null) {
appendStream.close();
}
}
}
/*
* Test to verify we multiple appends on the same stream.
*/
@Test
public void testMultipleAppendsOnSameStream() throws Throwable {
int baseDataSize = 50;
byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
int appendDataSize = 100;
int targetAppendCount = 50;
byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
int testDataIndex = 0;
System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
testDataIndex += baseDataSize;
int appendCount = 0;
FSDataOutputStream appendStream = null;
try {
while (appendCount < targetAppendCount) {
appendStream = fs.append(TEST_PATH, 50);
int singleAppendChunkSize = 20;
int appendRunSize = 0;
while (appendRunSize < appendDataSize) {
byte[] appendDataBuffer = getTestData(singleAppendChunkSize);
appendStream.write(appendDataBuffer);
System.arraycopy(appendDataBuffer, 0, testData,
testDataIndex + appendRunSize, singleAppendChunkSize);
appendRunSize += singleAppendChunkSize;
}
appendStream.close();
testDataIndex += appendDataSize;
appendCount++;
}
Assert.assertTrue(verifyAppend(testData, TEST_PATH));
} finally {
if (appendStream != null) {
appendStream.close();
}
}
}
@Test(expected=UnsupportedOperationException.class)
/*
* Test to verify the behavior when Append Support configuration flag is set to false
*/
public void testFalseConfigurationFlagBehavior() throws Throwable {
fs = testAccount.getFileSystem();
Configuration conf = fs.getConf();
conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
URI uri = fs.getUri();
fs.initialize(uri, conf);
FSDataOutputStream appendStream = null;
try {
createBaseFileWithData(0, TEST_PATH);
appendStream = fs.append(TEST_PATH, 10);
} finally {
if (appendStream != null) {
appendStream.close();
}
}
}
@Override
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
return AzureBlobStorageTestAccount.create();
}
}