HADOOP-12635. Adding Append API support for WASB. Contributed by Dushyanth.
This commit is contained in:
parent
d40859fab1
commit
8bc93db2e7
|
@ -741,6 +741,8 @@ Release 2.8.0 - UNRELEASED
|
|||
HADOOP-12691. Add CSRF Filter for REST APIs to Hadoop Common.
|
||||
(Larry McCay via cnauroth)
|
||||
|
||||
HADOOP-12635. Adding Append API support for WASB. (Dushyanth via cnauroth)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-12458. Retries is typoed to spell Retires in parts of
|
||||
|
|
|
@ -33,13 +33,11 @@ import java.net.URLEncoder;
|
|||
import java.security.InvalidKeyException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -64,6 +62,7 @@ import org.apache.hadoop.io.IOUtils;
|
|||
import org.mortbay.util.ajax.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
|
@ -2680,4 +2679,24 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
close();
|
||||
super.finalize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException {
|
||||
|
||||
try {
|
||||
|
||||
if (isPageBlobKey(key)) {
|
||||
throw new UnsupportedOperationException("Append not supported for Page Blobs");
|
||||
}
|
||||
|
||||
CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
|
||||
|
||||
BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
|
||||
appendStream.initialize();
|
||||
|
||||
return new DataOutputStream(appendStream);
|
||||
} catch(Exception ex) {
|
||||
throw new AzureException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,775 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.TimeZone;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
|
||||
import org.mortbay.log.Log;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.microsoft.azure.storage.AccessCondition;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.microsoft.azure.storage.blob.BlockEntry;
|
||||
import com.microsoft.azure.storage.blob.BlockListingFilter;
|
||||
|
||||
/**
|
||||
* Stream object that implememnts append for Block Blobs in WASB.
|
||||
*/
|
||||
public class BlockBlobAppendStream extends OutputStream {
|
||||
|
||||
private final String key;
|
||||
private final int bufferSize;
|
||||
private ByteArrayOutputStream outBuffer;
|
||||
private final CloudBlockBlobWrapper blob;
|
||||
private final OperationContext opContext;
|
||||
|
||||
/**
|
||||
* Variable to track if the stream has been closed.
|
||||
*/
|
||||
private boolean closed = false;
|
||||
|
||||
/**
|
||||
* Variable to track if the append lease is released.
|
||||
*/
|
||||
|
||||
private volatile boolean leaseFreed;
|
||||
|
||||
/**
|
||||
* Variable to track if the append stream has been
|
||||
* initialized.
|
||||
*/
|
||||
|
||||
private boolean initialized = false;
|
||||
|
||||
/**
|
||||
* Last IOException encountered
|
||||
*/
|
||||
private volatile IOException lastError = null;
|
||||
|
||||
/**
|
||||
* List to keep track of the uncommitted azure storage
|
||||
* block ids
|
||||
*/
|
||||
private final List<BlockEntry> uncommittedBlockEntries;
|
||||
|
||||
private static final int UNSET_BLOCKS_COUNT = -1;
|
||||
|
||||
/**
|
||||
* Variable to hold the next block id to be used for azure
|
||||
* storage blocks.
|
||||
*/
|
||||
private long nextBlockCount = UNSET_BLOCKS_COUNT;
|
||||
|
||||
private final Random sequenceGenerator = new Random();
|
||||
|
||||
/**
|
||||
* Time to wait to renew lease in milliseconds
|
||||
*/
|
||||
private static final int LEASE_RENEWAL_PERIOD = 10000;
|
||||
|
||||
/**
|
||||
* Number of times to retry for lease renewal
|
||||
*/
|
||||
private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
|
||||
|
||||
/**
|
||||
* Time to wait before retrying to set the lease
|
||||
*/
|
||||
private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
|
||||
|
||||
/**
|
||||
* Metadata key used on the blob to indicate append lease is active
|
||||
*/
|
||||
public static final String APPEND_LEASE = "append_lease";
|
||||
|
||||
/**
|
||||
* Timeout value for the append lease in millisecs. If the lease is not
|
||||
* renewed within 30 seconds then another thread can acquire the append lease
|
||||
* on the blob
|
||||
*/
|
||||
public static final int APPEND_LEASE_TIMEOUT = 30000;
|
||||
|
||||
/**
|
||||
* Metdata key used on the blob to indicate last modified time of append lease
|
||||
*/
|
||||
public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
|
||||
|
||||
/**
|
||||
* Number of times block upload needs is retried.
|
||||
*/
|
||||
private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
|
||||
|
||||
/**
|
||||
* Wait time between block upload retries in millisecs.
|
||||
*/
|
||||
private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
|
||||
|
||||
private static final int MAX_BLOCK_COUNT = 100000;
|
||||
|
||||
private ThreadPoolExecutor ioThreadPool;
|
||||
|
||||
/**
|
||||
* Atomic integer to provide thread id for thread names for uploader threads.
|
||||
*/
|
||||
private final AtomicInteger threadSequenceNumber;
|
||||
|
||||
/**
|
||||
* Prefix to be used for thread names for uploader threads.
|
||||
*/
|
||||
private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
|
||||
|
||||
private static final String UTC_STR = "UTC";
|
||||
|
||||
public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
|
||||
final String aKey, final int bufferSize, final OperationContext opContext)
|
||||
throws IOException {
|
||||
|
||||
if (null == aKey || 0 == aKey.length()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Illegal argument: The key string is null or empty");
|
||||
}
|
||||
|
||||
if (0 >= bufferSize) {
|
||||
throw new IllegalArgumentException(
|
||||
"Illegal argument bufferSize cannot be zero or negative");
|
||||
}
|
||||
|
||||
|
||||
this.blob = blob;
|
||||
this.opContext = opContext;
|
||||
this.key = aKey;
|
||||
this.bufferSize = bufferSize;
|
||||
this.threadSequenceNumber = new AtomicInteger(0);
|
||||
setBlocksCount();
|
||||
|
||||
this.outBuffer = new ByteArrayOutputStream(bufferSize);
|
||||
this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
|
||||
|
||||
// Acquire append lease on the blob.
|
||||
try {
|
||||
//Set the append lease if the value of the append lease is false
|
||||
if (!updateBlobAppendMetadata(true, false)) {
|
||||
LOG.error("Unable to set Append Lease on the Blob : {} "
|
||||
+ "Possibly because another client already has a create or append stream open on the Blob", key);
|
||||
throw new IOException("Unable to set Append lease on the Blob. "
|
||||
+ "Possibly because another client already had an append stream open on the Blob.");
|
||||
}
|
||||
} catch (StorageException ex) {
|
||||
LOG.error("Encountered Storage exception while acquiring append "
|
||||
+ "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
|
||||
key, ex, ex.getErrorCode());
|
||||
|
||||
throw new IOException(ex);
|
||||
}
|
||||
|
||||
leaseFreed = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method that starts an Append Lease renewer thread and the
|
||||
* thread pool.
|
||||
*/
|
||||
public synchronized void initialize() {
|
||||
|
||||
if (initialized) {
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* Start the thread for Append lease renewer.
|
||||
*/
|
||||
Thread appendLeaseRenewer = new Thread(new AppendRenewer());
|
||||
appendLeaseRenewer.setDaemon(true);
|
||||
appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
|
||||
appendLeaseRenewer.start();
|
||||
|
||||
/*
|
||||
* Parameters to ThreadPoolExecutor:
|
||||
* corePoolSize : the number of threads to keep in the pool, even if they are idle,
|
||||
* unless allowCoreThreadTimeOut is set
|
||||
* maximumPoolSize : the maximum number of threads to allow in the pool
|
||||
* keepAliveTime - when the number of threads is greater than the core,
|
||||
* this is the maximum time that excess idle threads will
|
||||
* wait for new tasks before terminating.
|
||||
* unit - the time unit for the keepAliveTime argument
|
||||
* workQueue - the queue to use for holding tasks before they are executed
|
||||
* This queue will hold only the Runnable tasks submitted by the execute method.
|
||||
*/
|
||||
this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
|
||||
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the blob name.
|
||||
*
|
||||
* @return String Blob name.
|
||||
*/
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the backing blob.
|
||||
* @return buffer size of the stream.
|
||||
*/
|
||||
public int getBufferSize() {
|
||||
return bufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the specified byte to this output stream. The general contract for
|
||||
* write is that one byte is written to the output stream. The byte to be
|
||||
* written is the eight low-order bits of the argument b. The 24 high-order
|
||||
* bits of b are ignored.
|
||||
*
|
||||
* @param byteVal
|
||||
* the byteValue to write.
|
||||
* @throws IOException
|
||||
* if an I/O error occurs. In particular, an IOException may be
|
||||
* thrown if the output stream has been closed.
|
||||
*/
|
||||
@Override
|
||||
public void write(final int byteVal) throws IOException {
|
||||
write(new byte[] { (byte) (byteVal & 0xFF) });
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes b.length bytes from the specified byte array to this output stream.
|
||||
*
|
||||
* @param data
|
||||
* the byte array to write.
|
||||
*
|
||||
* @throws IOException
|
||||
* if an I/O error occurs. In particular, an IOException may be
|
||||
* thrown if the output stream has been closed.
|
||||
*/
|
||||
@Override
|
||||
public void write(final byte[] data) throws IOException {
|
||||
write(data, 0, data.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes length bytes from the specified byte array starting at offset to
|
||||
* this output stream.
|
||||
*
|
||||
* @param data
|
||||
* the byte array to write.
|
||||
* @param offset
|
||||
* the start offset in the data.
|
||||
* @param length
|
||||
* the number of bytes to write.
|
||||
* @throws IOException
|
||||
* if an I/O error occurs. In particular, an IOException may be
|
||||
* thrown if the output stream has been closed.
|
||||
*/
|
||||
@Override
|
||||
public void write(final byte[] data, final int offset, final int length)
|
||||
throws IOException {
|
||||
|
||||
if (offset < 0 || length < 0 || length > data.length - offset) {
|
||||
throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
|
||||
}
|
||||
|
||||
writeInternal(data, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
|
||||
if (!initialized) {
|
||||
throw new IOException("Trying to close an uninitialized Append stream");
|
||||
}
|
||||
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (leaseFreed) {
|
||||
throw new IOException(String.format("Attempting to close an append stream on blob : %s "
|
||||
+ " that does not have lease on the Blob. Failing close", key));
|
||||
}
|
||||
|
||||
if (outBuffer.size() > 0) {
|
||||
uploadBlockToStorage(outBuffer.toByteArray());
|
||||
}
|
||||
|
||||
ioThreadPool.shutdown();
|
||||
|
||||
try {
|
||||
if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
|
||||
LOG.error("Time out occured while waiting for IO request to finish in append"
|
||||
+ " for blob : {}", key);
|
||||
NativeAzureFileSystemHelper.logAllLiveStackTraces();
|
||||
throw new IOException("Timed out waiting for IO requests to finish");
|
||||
}
|
||||
} catch(InterruptedException intrEx) {
|
||||
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
|
||||
throw new IOException("Append Commit interrupted.");
|
||||
}
|
||||
|
||||
// Calling commit after all blocks are succesfully uploaded.
|
||||
if (lastError == null) {
|
||||
commitAppendBlocks();
|
||||
}
|
||||
|
||||
// Perform cleanup.
|
||||
cleanup();
|
||||
|
||||
if (lastError != null) {
|
||||
throw lastError;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method that cleans up the append stream.
|
||||
*/
|
||||
private synchronized void cleanup() {
|
||||
|
||||
closed = true;
|
||||
|
||||
try {
|
||||
// Set the value of append lease to false if the value is set to true.
|
||||
updateBlobAppendMetadata(false, true);
|
||||
} catch(StorageException ex) {
|
||||
LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
|
||||
+ "Error Code : {}",
|
||||
key, ex, ex.getErrorCode());
|
||||
lastError = new IOException(ex);
|
||||
}
|
||||
|
||||
leaseFreed = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to commit all the uncommited blocks to azure storage.
|
||||
* If the commit fails then blocks are automatically cleaned up
|
||||
* by Azure storage.
|
||||
* @throws IOException
|
||||
*/
|
||||
private synchronized void commitAppendBlocks() throws IOException {
|
||||
|
||||
SelfRenewingLease lease = null;
|
||||
|
||||
try {
|
||||
if (uncommittedBlockEntries.size() > 0) {
|
||||
|
||||
//Acquiring lease on the blob.
|
||||
lease = new SelfRenewingLease(blob);
|
||||
|
||||
// Downloading existing blocks
|
||||
List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED,
|
||||
new BlobRequestOptions(), opContext);
|
||||
|
||||
// Adding uncommitted blocks.
|
||||
blockEntries.addAll(uncommittedBlockEntries);
|
||||
|
||||
AccessCondition accessCondition = new AccessCondition();
|
||||
accessCondition.setLeaseID(lease.getLeaseID());
|
||||
blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
|
||||
uncommittedBlockEntries.clear();
|
||||
}
|
||||
} catch(StorageException ex) {
|
||||
LOG.error("Storage exception encountered during block commit phase of append for blob"
|
||||
+ " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
|
||||
throw new IOException("Encountered Exception while committing append blocks", ex);
|
||||
} finally {
|
||||
if (lease != null) {
|
||||
try {
|
||||
lease.free();
|
||||
} catch(StorageException ex) {
|
||||
LOG.debug("Exception encountered while releasing lease for "
|
||||
+ "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
|
||||
// Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method used to generate the blockIDs. The algorithm used is similar to the Azure
|
||||
* storage SDK.
|
||||
*/
|
||||
private void setBlocksCount() throws IOException {
|
||||
try {
|
||||
|
||||
if (nextBlockCount == UNSET_BLOCKS_COUNT) {
|
||||
|
||||
nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
|
||||
+ sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
|
||||
|
||||
List<BlockEntry> blockEntries =
|
||||
blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
|
||||
|
||||
nextBlockCount += blockEntries.size();
|
||||
|
||||
}
|
||||
} catch (StorageException ex) {
|
||||
LOG.debug("Encountered storage exception during setting next Block Count."
|
||||
+ " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method that generates the next block id for uploading a block to azure storage.
|
||||
* @return String representing the block ID generated.
|
||||
* @throws IOException
|
||||
*/
|
||||
private String generateBlockId() throws IOException {
|
||||
|
||||
if (nextBlockCount == UNSET_BLOCKS_COUNT) {
|
||||
throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
|
||||
}
|
||||
|
||||
byte[] blockIdInBytes = getBytesFromLong(nextBlockCount);
|
||||
return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a byte array that represents the data of a <code>long</code> value. This
|
||||
* utility method is copied from com.microsoft.azure.storage.core.Utility class.
|
||||
* This class is marked as internal, hence we clone the method here and not express
|
||||
* dependency on the Utility Class
|
||||
*
|
||||
* @param value
|
||||
* The value from which the byte array will be returned.
|
||||
*
|
||||
* @return A byte array that represents the data of the specified <code>long</code> value.
|
||||
*/
|
||||
private static byte[] getBytesFromLong(final long value) {
|
||||
final byte[] tempArray = new byte[8];
|
||||
|
||||
for (int m = 0; m < 8; m++) {
|
||||
tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
|
||||
}
|
||||
|
||||
return tempArray;
|
||||
}
|
||||
/**
|
||||
* Helper method that creates a thread to upload a block to azure storage.
|
||||
* @param payload
|
||||
* @throws IOException
|
||||
*/
|
||||
private synchronized void uploadBlockToStorage(byte[] payload) throws IOException {
|
||||
|
||||
// upload payload to azure storage
|
||||
nextBlockCount++;
|
||||
String blockId = generateBlockId();
|
||||
// Since uploads of the Azure storage are done in parallel threads, we go ahead
|
||||
// add the blockId in the uncommitted list. If the upload of the block fails
|
||||
// we don't commit the blockIds.
|
||||
uncommittedBlockEntries.add(new BlockEntry(blockId));
|
||||
ioThreadPool.execute(new WriteRequest(payload, blockId));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Helper method to updated the Blob metadata during Append lease operations.
|
||||
* Blob metadata is updated to holdLease value only if the current lease
|
||||
* status is equal to testCondition and the last update on the blob metadata
|
||||
* is less that 30 secs old.
|
||||
* @param holdLease
|
||||
* @param testCondition
|
||||
* @return true if the updated lease operation was successful or false otherwise
|
||||
* @throws StorageException
|
||||
*/
|
||||
private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
|
||||
throws StorageException {
|
||||
|
||||
SelfRenewingLease lease = null;
|
||||
StorageException lastStorageException = null;
|
||||
int leaseRenewalRetryCount = 0;
|
||||
|
||||
/*
|
||||
* Updating the Blob metadata honours following algorithm based on
|
||||
* 1) If the append lease metadata is present
|
||||
* 2) Last updated time of the append lease
|
||||
* 3) Previous value of the Append lease metadata.
|
||||
*
|
||||
* The algorithm:
|
||||
* 1) If append lease metadata is not part of the Blob. In this case
|
||||
* this is the first client to Append so we update the metadata.
|
||||
* 2) If append lease metadata is present and timeout has occurred.
|
||||
* In this case irrespective of what the value of the append lease is we update the metadata.
|
||||
* 3) If append lease metadata is present and is equal to testCondition value (passed as parameter)
|
||||
* and timeout has not occurred, we update the metadata.
|
||||
* 4) If append lease metadata is present and is not equal to testCondition value (passed as parameter)
|
||||
* and timeout has not occurred, we do not update metadata and return false.
|
||||
*
|
||||
*/
|
||||
while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
|
||||
|
||||
lastStorageException = null;
|
||||
|
||||
synchronized(this) {
|
||||
try {
|
||||
|
||||
final Calendar currentCalendar = Calendar
|
||||
.getInstance(Locale.US);
|
||||
currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
|
||||
long currentTime = currentCalendar.getTime().getTime();
|
||||
|
||||
// Acquire lease on the blob.
|
||||
lease = new SelfRenewingLease(blob);
|
||||
|
||||
blob.downloadAttributes(opContext);
|
||||
HashMap<String, String> metadata = blob.getMetadata();
|
||||
|
||||
if (metadata.containsKey(APPEND_LEASE)
|
||||
&& currentTime - Long.parseLong(
|
||||
metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
|
||||
&& !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
|
||||
metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
|
||||
blob.setMetadata(metadata);
|
||||
AccessCondition accessCondition = new AccessCondition();
|
||||
accessCondition.setLeaseID(lease.getLeaseID());
|
||||
blob.uploadMetadata(accessCondition, null, opContext);
|
||||
return true;
|
||||
|
||||
} catch (StorageException ex) {
|
||||
|
||||
lastStorageException = ex;
|
||||
LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
|
||||
+ "Error Code : {}",
|
||||
key, ex, ex.getErrorCode());
|
||||
leaseRenewalRetryCount++;
|
||||
|
||||
} finally {
|
||||
|
||||
if (lease != null) {
|
||||
try {
|
||||
lease.free();
|
||||
} catch(StorageException ex) {
|
||||
LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
|
||||
+ "during Append metadata operation. Storage Exception {} "
|
||||
+ "Error Code : {} ", key, ex, ex.getErrorCode());
|
||||
} finally {
|
||||
lease = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
|
||||
throw lastStorageException;
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
|
||||
} catch(InterruptedException ex) {
|
||||
LOG.debug("Blob append metadata updated method interrupted");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The code should not enter here because the while loop will
|
||||
// always be executed and if the while loop is executed we
|
||||
// would returning from the while loop.
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer.
|
||||
* @param data
|
||||
* @param offset
|
||||
* @param length
|
||||
* @throws IOException
|
||||
*/
|
||||
private synchronized void writeInternal(final byte[] data, final int offset, final int length)
|
||||
throws IOException {
|
||||
|
||||
if (!initialized) {
|
||||
throw new IOException("Trying to write to an un-initialized Append stream");
|
||||
}
|
||||
|
||||
if (closed) {
|
||||
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
||||
}
|
||||
|
||||
if (leaseFreed) {
|
||||
throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
|
||||
}
|
||||
|
||||
byte[] currentData = new byte[length];
|
||||
System.arraycopy(data, offset, currentData, 0, length);
|
||||
|
||||
// check to see if the data to be appended exceeds the
|
||||
// buffer size. If so we upload a block to azure storage.
|
||||
while ((outBuffer.size() + currentData.length) > bufferSize) {
|
||||
|
||||
byte[] payload = new byte[bufferSize];
|
||||
|
||||
// Add data from the existing buffer
|
||||
System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
|
||||
|
||||
// Updating the available size in the payload
|
||||
int availableSpaceInPayload = bufferSize - outBuffer.size();
|
||||
|
||||
// Adding data from the current call
|
||||
System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
|
||||
|
||||
uploadBlockToStorage(payload);
|
||||
|
||||
// updating the currentData buffer
|
||||
byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
|
||||
System.arraycopy(currentData, availableSpaceInPayload,
|
||||
tempBuffer, 0, currentData.length - availableSpaceInPayload);
|
||||
currentData = tempBuffer;
|
||||
outBuffer = new ByteArrayOutputStream(bufferSize);
|
||||
}
|
||||
|
||||
outBuffer.write(currentData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Runnable instance that uploads the block of data to azure storage.
|
||||
*
|
||||
*
|
||||
*/
|
||||
private class WriteRequest implements Runnable {
|
||||
private final byte[] dataPayload;
|
||||
private final String blockId;
|
||||
|
||||
public WriteRequest(byte[] dataPayload, String blockId) {
|
||||
this.dataPayload = dataPayload;
|
||||
this.blockId = blockId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
int uploadRetryAttempts = 0;
|
||||
IOException lastLocalException = null;
|
||||
while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
|
||||
try {
|
||||
|
||||
blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
|
||||
dataPayload.length, new BlobRequestOptions(), opContext);
|
||||
break;
|
||||
} catch(Exception ioe) {
|
||||
Log.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
|
||||
uploadRetryAttempts++;
|
||||
lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
|
||||
try {
|
||||
Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
|
||||
} catch(InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
|
||||
lastError = lastLocalException;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A ThreadFactory that creates uploader thread with
|
||||
* meaningful names helpful for debugging purposes.
|
||||
*/
|
||||
class UploaderThreadFactory implements ThreadFactory {
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
|
||||
threadSequenceNumber.getAndIncrement()));
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A deamon thread that renews the Append lease on the blob.
|
||||
* The thread sleeps for LEASE_RENEWAL_PERIOD time before renewing
|
||||
* the lease. If an error is encountered while renewing the lease
|
||||
* then an lease is released by this thread, which fails all other
|
||||
* operations.
|
||||
*/
|
||||
private class AppendRenewer implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
while (!leaseFreed) {
|
||||
|
||||
try {
|
||||
Thread.sleep(LEASE_RENEWAL_PERIOD);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.debug("Appender Renewer thread interrupted");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
Log.debug("Attempting to renew append lease on {}", key);
|
||||
|
||||
try {
|
||||
if (!leaseFreed) {
|
||||
// Update the blob metadata to renew the append lease
|
||||
if (!updateBlobAppendMetadata(true, true)) {
|
||||
LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
|
||||
leaseFreed = true;
|
||||
}
|
||||
}
|
||||
} catch (StorageException ex) {
|
||||
|
||||
LOG.debug("Lease renewal for Blob : {} encountered "
|
||||
+ "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
|
||||
|
||||
// We swallow the exception here because if the blob metadata is not updated for
|
||||
// APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
|
||||
// continue forward if it needs to append.
|
||||
leaseFreed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -31,7 +32,6 @@ import java.text.SimpleDateFormat;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import java.util.TimeZone;
|
||||
import java.util.TreeSet;
|
||||
|
@ -41,7 +41,6 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -60,8 +59,6 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.fs.azure.AzureException;
|
||||
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -73,12 +70,8 @@ import org.codehaus.jackson.map.JsonMappingException;
|
|||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.microsoft.azure.storage.AccessCondition;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.StorageErrorCode;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
||||
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
|
@ -288,7 +281,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
throw new IOException("Unable to write RenamePending file for folder rename from "
|
||||
+ srcKey + " to " + dstKey, e);
|
||||
} finally {
|
||||
NativeAzureFileSystem.cleanup(LOG, output);
|
||||
NativeAzureFileSystemHelper.cleanup(LOG, output);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -663,6 +656,11 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";
|
||||
|
||||
/*
|
||||
* Property to enable Append API.
|
||||
*/
|
||||
public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
|
||||
|
||||
private class NativeAzureFsInputStream extends FSInputStream {
|
||||
private InputStream in;
|
||||
private final String key;
|
||||
|
@ -728,7 +726,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
return result;
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException) {
|
||||
|
||||
|
@ -736,7 +734,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
+ " Exception details: {} Error Code : {}",
|
||||
key, e, ((StorageException) innerException).getErrorCode());
|
||||
|
||||
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
}
|
||||
|
@ -782,7 +780,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
return result;
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException) {
|
||||
|
||||
|
@ -790,7 +788,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
+ " Exception details: {} Error Code : {}",
|
||||
key, e, ((StorageException) innerException).getErrorCode());
|
||||
|
||||
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
}
|
||||
|
@ -822,10 +820,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
this.pos);
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
|
||||
|
@ -1041,7 +1039,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
private static boolean suppressRetryPolicy = false;
|
||||
// A counter to create unique (within-process) names for my metrics sources.
|
||||
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
|
||||
|
||||
private boolean appendSupportEnabled = false;
|
||||
|
||||
public NativeAzureFileSystem() {
|
||||
// set store in initialize()
|
||||
|
@ -1164,7 +1162,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
|
||||
MAX_AZURE_BLOCK_SIZE);
|
||||
|
||||
|
||||
this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
|
||||
LOG.debug("NativeAzureFileSystem. Initializing.");
|
||||
LOG.debug(" blockSize = {}",
|
||||
conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
|
||||
|
@ -1294,7 +1292,61 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
@Override
|
||||
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
|
||||
throws IOException {
|
||||
throw new IOException("Not supported");
|
||||
|
||||
if (!appendSupportEnabled) {
|
||||
throw new UnsupportedOperationException("Append Support not enabled");
|
||||
}
|
||||
|
||||
LOG.debug("Opening file: {} for append", f);
|
||||
|
||||
Path absolutePath = makeAbsolute(f);
|
||||
String key = pathToKey(absolutePath);
|
||||
FileMetadata meta = null;
|
||||
try {
|
||||
meta = store.retrieveMetadata(key);
|
||||
} catch(Exception ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
if (meta == null) {
|
||||
throw new FileNotFoundException(f.toString());
|
||||
}
|
||||
|
||||
if (meta.isDir()) {
|
||||
throw new FileNotFoundException(f.toString()
|
||||
+ " is a directory not a file.");
|
||||
}
|
||||
|
||||
if (store.isPageBlobKey(key)) {
|
||||
throw new IOException("Append not supported for Page Blobs");
|
||||
}
|
||||
|
||||
DataOutputStream appendStream = null;
|
||||
|
||||
try {
|
||||
appendStream = store.retrieveAppendStream(key, bufferSize);
|
||||
} catch (Exception ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
return new FSDataOutputStream(appendStream, statistics);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1379,7 +1431,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
lease.free();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
NativeAzureFileSystem.cleanup(LOG, out);
|
||||
NativeAzureFileSystemHelper.cleanup(LOG, out);
|
||||
String msg = "Unable to free lease on " + parent.toUri();
|
||||
LOG.error(msg);
|
||||
throw new IOException(msg, e);
|
||||
|
@ -1577,10 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
metaFile = store.retrieveMetadata(key);
|
||||
} catch (IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
return false;
|
||||
}
|
||||
|
@ -1611,7 +1663,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
parentMetadata = store.retrieveMetadata(parentKey);
|
||||
} catch (IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException) {
|
||||
// Invalid State.
|
||||
|
@ -1619,7 +1671,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// if the file not present. But not retrieving metadata here is an
|
||||
// unrecoverable state and can only happen if there is a race condition
|
||||
// hence throwing a IOException
|
||||
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
throw new IOException("File " + f + " has a parent directory "
|
||||
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
|
||||
}
|
||||
|
@ -1662,10 +1714,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
instrumentation.fileDeleted();
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1684,7 +1736,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
parentMetadata = store.retrieveMetadata(parentKey);
|
||||
} catch (IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException) {
|
||||
// Invalid State.
|
||||
|
@ -1692,7 +1744,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// if the file not present. But not retrieving metadata here is an
|
||||
// unrecoverable state and can only happen if there is a race condition
|
||||
// hence throwing a IOException
|
||||
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
throw new IOException("File " + f + " has a parent directory "
|
||||
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
|
||||
}
|
||||
|
@ -1728,10 +1780,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
priorLastKey);
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1763,10 +1815,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
instrumentation.fileDeleted();
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1785,10 +1837,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
store.delete(key);
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1829,10 +1881,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
meta = store.retrieveMetadata(key);
|
||||
} catch(Exception ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
|
@ -1922,10 +1974,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
meta = store.retrieveMetadata(key);
|
||||
} catch (IOException ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", f));
|
||||
}
|
||||
|
@ -1948,10 +2000,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
|
||||
} catch (IOException ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
|
@ -1972,10 +2024,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
try {
|
||||
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
|
||||
} catch (IOException ex) {
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
|
@ -2196,10 +2248,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
meta = store.retrieveMetadata(key);
|
||||
} catch(Exception ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
|
@ -2219,10 +2271,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
try {
|
||||
inputStream = store.retrieve(key);
|
||||
} catch(Exception ex) {
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
|
@ -2261,14 +2313,14 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
dstMetadata = store.retrieveMetadata(dstKey);
|
||||
} catch (IOException ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
// A BlobNotFound storage exception in only thrown from retrieveMetdata API when
|
||||
// there is a race condition. If there is another thread which deletes the destination
|
||||
// file or folder, then this thread calling rename should be able to continue with
|
||||
// rename gracefully. Hence the StorageException is swallowed here.
|
||||
if (innerException instanceof StorageException) {
|
||||
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
|
||||
+ "Swallowin the exception to handle race condition gracefully", dstKey);
|
||||
}
|
||||
|
@ -2294,10 +2346,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
|
||||
} catch (IOException ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
|
||||
return false;
|
||||
|
@ -2320,10 +2372,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
try {
|
||||
srcMetadata = store.retrieveMetadata(srcKey);
|
||||
} catch (IOException ex) {
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
LOG.debug("Source {} doesn't exists. Failing rename", src);
|
||||
return false;
|
||||
|
@ -2342,10 +2394,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
store.rename(srcKey, dstKey);
|
||||
} catch(IOException ex) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
|
||||
return false;
|
||||
|
@ -2552,10 +2604,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
try {
|
||||
metadata = store.retrieveMetadata(key);
|
||||
} catch (IOException ex) {
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
|
||||
}
|
||||
|
@ -2591,10 +2643,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
try {
|
||||
metadata = store.retrieveMetadata(key);
|
||||
} catch (IOException ex) {
|
||||
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
|
||||
}
|
||||
|
@ -2817,52 +2869,4 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// Return to the caller with the randomized key.
|
||||
return randomizedKey;
|
||||
}
|
||||
|
||||
private static void cleanup(Logger log, java.io.Closeable closeable) {
|
||||
if (closeable != null) {
|
||||
try {
|
||||
closeable.close();
|
||||
} catch(IOException e) {
|
||||
if (log != null) {
|
||||
log.debug("Exception in closing {}", closeable, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method to recursively check if the cause of the exception is
|
||||
* a Azure storage exception.
|
||||
*/
|
||||
private static Throwable checkForAzureStorageException(Exception e) {
|
||||
|
||||
Throwable innerException = e.getCause();
|
||||
|
||||
while (innerException != null
|
||||
&& !(innerException instanceof StorageException)) {
|
||||
innerException = innerException.getCause();
|
||||
}
|
||||
|
||||
return innerException;
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method to check if the AzureStorageException is
|
||||
* because backing blob was not found.
|
||||
*/
|
||||
private static boolean isFileNotFoundException(StorageException e) {
|
||||
|
||||
String errorCode = ((StorageException) e).getErrorCode();
|
||||
if (errorCode != null
|
||||
&& (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
|
||||
|| errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
|
||||
|| errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
|
||||
|| errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.microsoft.azure.storage.StorageErrorCode;
|
||||
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
/**
|
||||
* Utility class that has helper methods.
|
||||
*
|
||||
*/
|
||||
|
||||
@InterfaceAudience.Private
|
||||
final class NativeAzureFileSystemHelper {
|
||||
|
||||
private NativeAzureFileSystemHelper() {
|
||||
// Hiding the cosnstructor as this is a utility class.
|
||||
}
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystemHelper.class);
|
||||
|
||||
public static void cleanup(Logger log, java.io.Closeable closeable) {
|
||||
if (closeable != null) {
|
||||
try {
|
||||
closeable.close();
|
||||
} catch(IOException e) {
|
||||
if (log != null) {
|
||||
log.debug("Exception in closing {}", closeable, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method to recursively check if the cause of the exception is
|
||||
* a Azure storage exception.
|
||||
*/
|
||||
public static Throwable checkForAzureStorageException(Exception e) {
|
||||
|
||||
Throwable innerException = e.getCause();
|
||||
|
||||
while (innerException != null
|
||||
&& !(innerException instanceof StorageException)) {
|
||||
|
||||
innerException = innerException.getCause();
|
||||
}
|
||||
|
||||
return innerException;
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method to check if the AzureStorageException is
|
||||
* because backing blob was not found.
|
||||
*/
|
||||
public static boolean isFileNotFoundException(StorageException e) {
|
||||
|
||||
String errorCode = e.getErrorCode();
|
||||
if (errorCode != null
|
||||
&& (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
|
||||
|| errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
|
||||
|| errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
|
||||
|| errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method that logs stack traces from all live threads.
|
||||
*/
|
||||
public static void logAllLiveStackTraces() {
|
||||
|
||||
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
|
||||
LOG.debug("Thread " + entry.getKey().getName());
|
||||
StackTraceElement[] trace = entry.getValue();
|
||||
for (int j = 0; j < trace.length; j++) {
|
||||
LOG.debug("\tat " + trace[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -107,4 +107,6 @@ interface NativeFileSystemStore {
|
|||
void delete(String key, SelfRenewingLease lease) throws IOException;
|
||||
|
||||
SelfRenewingLease acquireLease(String key) throws AzureException;
|
||||
|
||||
DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException;
|
||||
}
|
||||
|
|
|
@ -29,8 +29,6 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -216,7 +214,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
|
|||
LOG.debug(ioThreadPool.toString());
|
||||
if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
|
||||
LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
|
||||
logAllStackTraces();
|
||||
NativeAzureFileSystemHelper.logAllLiveStackTraces();
|
||||
LOG.debug(ioThreadPool.toString());
|
||||
throw new IOException("Timed out waiting for IO requests to finish");
|
||||
}
|
||||
|
@ -230,18 +228,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
|
|||
closed = true;
|
||||
}
|
||||
|
||||
// Log the stacks of all threads.
|
||||
private void logAllStackTraces() {
|
||||
Map liveThreads = Thread.getAllStackTraces();
|
||||
for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) {
|
||||
Thread key = (Thread) i.next();
|
||||
LOG.debug("Thread " + key.getName());
|
||||
StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key);
|
||||
for (int j = 0; j < trace.length; j++) {
|
||||
LOG.debug("\tat " + trace[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A single write request for data to write to Azure storage.
|
||||
|
|
|
@ -24,11 +24,13 @@ import java.io.OutputStream;
|
|||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import com.microsoft.azure.storage.AccessCondition;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.RetryPolicyFactory;
|
||||
|
@ -36,6 +38,8 @@ import com.microsoft.azure.storage.StorageCredentials;
|
|||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.BlockEntry;
|
||||
import com.microsoft.azure.storage.blob.BlockListingFilter;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CopyState;
|
||||
|
@ -545,6 +549,30 @@ abstract class StorageInterface {
|
|||
void uploadMetadata(OperationContext opContext)
|
||||
throws StorageException;
|
||||
|
||||
/**
|
||||
* Uploads the blob's metadata to the storage service using the specified
|
||||
* lease ID, request options, and operation context.
|
||||
*
|
||||
* @param accessCondition
|
||||
* A {@link AccessCondition} object that represents the access conditions for the blob.
|
||||
*
|
||||
* @param options
|
||||
* A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
|
||||
* <code>null</code> will use the default request options from the associated service client (
|
||||
* {@link CloudBlobClient}).
|
||||
*
|
||||
* @param opContext
|
||||
* An {@link OperationContext} object that represents the context
|
||||
* for the current operation. This object is used to track requests
|
||||
* to the storage service, and to provide additional runtime
|
||||
* information about the operation.
|
||||
*
|
||||
* @throws StorageException
|
||||
* If a storage service error occurred.
|
||||
*/
|
||||
void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
|
||||
OperationContext opContext) throws StorageException;
|
||||
|
||||
void uploadProperties(OperationContext opContext,
|
||||
SelfRenewingLease lease)
|
||||
throws StorageException;
|
||||
|
@ -602,6 +630,63 @@ abstract class StorageInterface {
|
|||
OutputStream openOutputStream(
|
||||
BlobRequestOptions options,
|
||||
OperationContext opContext) throws StorageException;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param filter A {@link BlockListingFilter} value that specifies whether to download
|
||||
* committed blocks, uncommitted blocks, or all blocks.
|
||||
* @param options A {@link BlobRequestOptions} object that specifies any additional options for
|
||||
* the request. Specifying null will use the default request options from
|
||||
* the associated service client ( CloudBlobClient).
|
||||
* @param opContext An {@link OperationContext} object that represents the context for the current
|
||||
* operation. This object is used to track requests to the storage service,
|
||||
* and to provide additional runtime information about the operation.
|
||||
* @return An ArrayList object of {@link BlockEntry} objects that represent the list
|
||||
* block items downloaded from the block blob.
|
||||
* @throws IOException If an I/O error occurred.
|
||||
* @throws StorageException If a storage service error occurred.
|
||||
*/
|
||||
List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob
|
||||
* the length of all Block IDs must be identical.
|
||||
* @param sourceStream An {@link InputStream} object that represents the input stream to write to the
|
||||
* block blob.
|
||||
* @param length A long which represents the length, in bytes, of the stream data,
|
||||
* or -1 if unknown.
|
||||
* @param options A {@link BlobRequestOptions} object that specifies any additional options for the
|
||||
* request. Specifying null will use the default request options from the
|
||||
* associated service client ( CloudBlobClient).
|
||||
* @param opContext An {@link OperationContext} object that represents the context for the current operation.
|
||||
* This object is used to track requests to the storage service, and to provide
|
||||
* additional runtime information about the operation.
|
||||
* @throws IOException If an I/O error occurred.
|
||||
* @throws StorageException If a storage service error occurred.
|
||||
*/
|
||||
void uploadBlock(String blockId, InputStream sourceStream,
|
||||
long length, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param blockList An enumerable collection of {@link BlockEntry} objects that represents the list
|
||||
* block items being committed. The size field is ignored.
|
||||
* @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
|
||||
* @param options A {@link BlobRequestOptions} object that specifies any additional options for the
|
||||
* request. Specifying null will use the default request options from the associated
|
||||
* service client ( CloudBlobClient).
|
||||
* @param opContext An {@link OperationContext} object that represents the context for the current operation.
|
||||
* This object is used to track requests to the storage service, and to provide additional
|
||||
* runtime information about the operation.
|
||||
* @throws IOException If an I/O error occurred.
|
||||
* @throws StorageException If a storage service error occurred.
|
||||
*/
|
||||
void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,7 +27,7 @@ import java.util.ArrayList;
|
|||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import com.microsoft.azure.storage.AccessCondition;
|
||||
|
@ -40,6 +40,8 @@ import com.microsoft.azure.storage.StorageUri;
|
|||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.microsoft.azure.storage.blob.BlockEntry;
|
||||
import com.microsoft.azure.storage.blob.BlockListingFilter;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
|
@ -362,7 +364,13 @@ class StorageInterfaceImpl extends StorageInterface {
|
|||
@Override
|
||||
public void uploadMetadata(OperationContext opContext)
|
||||
throws StorageException {
|
||||
getBlob().uploadMetadata(null, null, opContext);
|
||||
uploadMetadata(null, null, opContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
|
||||
OperationContext opContext) throws StorageException{
|
||||
getBlob().uploadMetadata(accessConditions, options, opContext);
|
||||
}
|
||||
|
||||
public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
|
||||
|
@ -440,6 +448,25 @@ class StorageInterfaceImpl extends StorageInterface {
|
|||
getBlob().uploadProperties(null, null, opContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException {
|
||||
return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void uploadBlock(String blockId, InputStream sourceStream,
|
||||
long length, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException {
|
||||
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException {
|
||||
((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
|
||||
}
|
||||
}
|
||||
|
||||
static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper {
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
* [Page Blob Support and Configuration](#Page_Blob_Support_and_Configuration)
|
||||
* [Atomic Folder Rename](#Atomic_Folder_Rename)
|
||||
* [Accessing wasb URLs](#Accessing_wasb_URLs)
|
||||
* [Append API Support and Configuration](#Append_API_Support_and_Configuration)
|
||||
* [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
|
||||
|
||||
## <a name="Introduction" />Introduction
|
||||
|
@ -51,7 +52,6 @@ on the additional artifacts it requires, notably the
|
|||
|
||||
## <a name="Limitations" />Limitations
|
||||
|
||||
* The append operation is not implemented.
|
||||
* File owner and group are persisted, but the permissions model is not enforced.
|
||||
Authorization occurs at the level of the entire Azure Blob Storage account.
|
||||
* File last access time is not tracked.
|
||||
|
@ -199,6 +199,24 @@ It's also possible to configure `fs.defaultFS` to use a `wasb` or `wasbs` URL.
|
|||
This causes all bare paths, such as `/testDir/testFile` to resolve automatically
|
||||
to that file system.
|
||||
|
||||
### <a name="Append_API_Support_and_Configuration" />Append API Support and Configuration
|
||||
|
||||
The Azure Blob Storage interface for Hadoop has optional support for Append API for
|
||||
single writer by setting the configuration `fs.azure.enable.append.support` to true.
|
||||
|
||||
For Example:
|
||||
|
||||
<property>
|
||||
<name>fs.azure.enable.append.support</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
It must be noted Append support in Azure Blob Storage interface DIFFERS FROM HDFS SEMANTICS. Append
|
||||
support does not enforce single writer internally but requires applications to guarantee this semantic.
|
||||
It becomes a responsibility of the application either to ensure single-threaded handling for a particular
|
||||
file path, or rely on some external locking mechanism of its own. Failure to do so will result in
|
||||
unexpected behavior.
|
||||
|
||||
## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
|
||||
|
||||
The hadoop-azure module includes a full suite of unit tests. Most of the tests
|
||||
|
|
|
@ -32,11 +32,12 @@ import java.util.EnumSet;
|
|||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.commons.httpclient.URIException;
|
||||
import org.apache.commons.httpclient.util.URIUtil;
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
|
||||
import com.microsoft.azure.storage.AccessCondition;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.RetryPolicyFactory;
|
||||
|
@ -46,6 +47,8 @@ import com.microsoft.azure.storage.StorageUri;
|
|||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.microsoft.azure.storage.blob.BlockEntry;
|
||||
import com.microsoft.azure.storage.blob.BlockListingFilter;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
|
||||
|
@ -524,6 +527,30 @@ public class MockStorageInterface extends StorageInterface {
|
|||
public CloudBlob getBlob() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException {
|
||||
|
||||
throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
|
||||
}
|
||||
@Override
|
||||
public void uploadBlock(String blockId, InputStream sourceStream,
|
||||
long length, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException {
|
||||
throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
|
||||
BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
|
||||
throw new UnsupportedOperationException("commitBlockList not used in Mock Tests");
|
||||
}
|
||||
|
||||
public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
|
||||
OperationContext opContext) throws StorageException {
|
||||
throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
|
||||
}
|
||||
}
|
||||
|
||||
class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
|
||||
|
@ -580,5 +607,10 @@ public class MockStorageInterface extends StorageInterface {
|
|||
public CloudBlob getBlob() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
|
||||
OperationContext opContext) throws StorageException {
|
||||
throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,362 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestNativeAzureFileSystemAppend extends NativeAzureFileSystemBaseTest {
|
||||
|
||||
private static final String TEST_FILE = "test.dat";
|
||||
private static final Path TEST_PATH = new Path(TEST_FILE);
|
||||
|
||||
private AzureBlobStorageTestAccount testAccount = null;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
testAccount = createTestAccount();
|
||||
fs = testAccount.getFileSystem();
|
||||
Configuration conf = fs.getConf();
|
||||
conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
|
||||
URI uri = fs.getUri();
|
||||
fs.initialize(uri, conf);
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method that creates test data of size provided by the
|
||||
* "size" parameter.
|
||||
*/
|
||||
private static byte[] getTestData(int size) {
|
||||
byte[] testData = new byte[size];
|
||||
System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
|
||||
return testData;
|
||||
}
|
||||
|
||||
// Helper method to create file and write fileSize bytes of data on it.
|
||||
private byte[] createBaseFileWithData(int fileSize, Path testPath) throws Throwable {
|
||||
|
||||
FSDataOutputStream createStream = null;
|
||||
try {
|
||||
createStream = fs.create(testPath);
|
||||
byte[] fileData = null;
|
||||
|
||||
if (fileSize != 0) {
|
||||
fileData = getTestData(fileSize);
|
||||
createStream.write(fileData);
|
||||
}
|
||||
return fileData;
|
||||
} finally {
|
||||
if (createStream != null) {
|
||||
createStream.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method to verify a file data equal to "dataLength" parameter
|
||||
*/
|
||||
private boolean verifyFileData(int dataLength, byte[] testData, int testDataIndex,
|
||||
FSDataInputStream srcStream) {
|
||||
|
||||
try {
|
||||
|
||||
byte[] fileBuffer = new byte[dataLength];
|
||||
byte[] testDataBuffer = new byte[dataLength];
|
||||
|
||||
int fileBytesRead = srcStream.read(fileBuffer);
|
||||
|
||||
if (fileBytesRead < dataLength) {
|
||||
return false;
|
||||
}
|
||||
|
||||
System.arraycopy(testData, testDataIndex, testDataBuffer, 0, dataLength);
|
||||
|
||||
if (!Arrays.equals(fileBuffer, testDataBuffer)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
} catch (Exception ex) {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method to verify Append on a testFile.
|
||||
*/
|
||||
private boolean verifyAppend(byte[] testData, Path testFile) {
|
||||
|
||||
FSDataInputStream srcStream = null;
|
||||
try {
|
||||
|
||||
srcStream = fs.open(testFile);
|
||||
int baseBufferSize = 2048;
|
||||
int testDataSize = testData.length;
|
||||
int testDataIndex = 0;
|
||||
|
||||
while (testDataSize > baseBufferSize) {
|
||||
|
||||
if (!verifyFileData(baseBufferSize, testData, testDataIndex, srcStream)) {
|
||||
return false;
|
||||
}
|
||||
testDataIndex += baseBufferSize;
|
||||
testDataSize -= baseBufferSize;
|
||||
}
|
||||
|
||||
if (!verifyFileData(testDataSize, testData, testDataIndex, srcStream)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch(Exception ex) {
|
||||
return false;
|
||||
} finally {
|
||||
if (srcStream != null) {
|
||||
try {
|
||||
srcStream.close();
|
||||
} catch(IOException ioe) {
|
||||
// Swallowing
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Test case to verify if an append on small size data works. This tests
|
||||
* append E2E
|
||||
*/
|
||||
@Test
|
||||
public void testSingleAppend() throws Throwable{
|
||||
|
||||
FSDataOutputStream appendStream = null;
|
||||
try {
|
||||
int baseDataSize = 50;
|
||||
byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
|
||||
|
||||
int appendDataSize = 20;
|
||||
byte[] appendDataBuffer = getTestData(appendDataSize);
|
||||
appendStream = fs.append(TEST_PATH, 10);
|
||||
appendStream.write(appendDataBuffer);
|
||||
appendStream.close();
|
||||
byte[] testData = new byte[baseDataSize + appendDataSize];
|
||||
System.arraycopy(baseDataBuffer, 0, testData, 0, baseDataSize);
|
||||
System.arraycopy(appendDataBuffer, 0, testData, baseDataSize, appendDataSize);
|
||||
|
||||
Assert.assertTrue(verifyAppend(testData, TEST_PATH));
|
||||
} finally {
|
||||
if (appendStream != null) {
|
||||
appendStream.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Test case to verify append to an empty file.
|
||||
*/
|
||||
@Test
|
||||
public void testSingleAppendOnEmptyFile() throws Throwable {
|
||||
|
||||
FSDataOutputStream appendStream = null;
|
||||
|
||||
try {
|
||||
createBaseFileWithData(0, TEST_PATH);
|
||||
|
||||
int appendDataSize = 20;
|
||||
byte[] appendDataBuffer = getTestData(appendDataSize);
|
||||
appendStream = fs.append(TEST_PATH, 10);
|
||||
appendStream.write(appendDataBuffer);
|
||||
appendStream.close();
|
||||
|
||||
Assert.assertTrue(verifyAppend(appendDataBuffer, TEST_PATH));
|
||||
} finally {
|
||||
if (appendStream != null) {
|
||||
appendStream.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Test to verify that we can open only one Append stream on a File.
|
||||
*/
|
||||
@Test
|
||||
public void testSingleAppenderScenario() throws Throwable {
|
||||
|
||||
FSDataOutputStream appendStream1 = null;
|
||||
FSDataOutputStream appendStream2 = null;
|
||||
IOException ioe = null;
|
||||
try {
|
||||
createBaseFileWithData(0, TEST_PATH);
|
||||
appendStream1 = fs.append(TEST_PATH, 10);
|
||||
boolean encounteredException = false;
|
||||
try {
|
||||
appendStream2 = fs.append(TEST_PATH, 10);
|
||||
} catch(IOException ex) {
|
||||
encounteredException = true;
|
||||
ioe = ex;
|
||||
}
|
||||
|
||||
appendStream1.close();
|
||||
|
||||
Assert.assertTrue(encounteredException);
|
||||
GenericTestUtils.assertExceptionContains("Unable to set Append lease on the Blob", ioe);
|
||||
} finally {
|
||||
if (appendStream1 != null) {
|
||||
appendStream1.close();
|
||||
}
|
||||
|
||||
if (appendStream2 != null) {
|
||||
appendStream2.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Tests to verify multiple appends on a Blob.
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleAppends() throws Throwable {
|
||||
|
||||
int baseDataSize = 50;
|
||||
byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
|
||||
|
||||
int appendDataSize = 100;
|
||||
int targetAppendCount = 50;
|
||||
byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
|
||||
int testDataIndex = 0;
|
||||
System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
|
||||
testDataIndex += baseDataSize;
|
||||
|
||||
int appendCount = 0;
|
||||
|
||||
FSDataOutputStream appendStream = null;
|
||||
|
||||
try {
|
||||
while (appendCount < targetAppendCount) {
|
||||
|
||||
byte[] appendDataBuffer = getTestData(appendDataSize);
|
||||
appendStream = fs.append(TEST_PATH, 30);
|
||||
appendStream.write(appendDataBuffer);
|
||||
appendStream.close();
|
||||
|
||||
System.arraycopy(appendDataBuffer, 0, testData, testDataIndex, appendDataSize);
|
||||
testDataIndex += appendDataSize;
|
||||
appendCount++;
|
||||
}
|
||||
|
||||
Assert.assertTrue(verifyAppend(testData, TEST_PATH));
|
||||
|
||||
} finally {
|
||||
if (appendStream != null) {
|
||||
appendStream.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Test to verify we multiple appends on the same stream.
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleAppendsOnSameStream() throws Throwable {
|
||||
|
||||
int baseDataSize = 50;
|
||||
byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
|
||||
int appendDataSize = 100;
|
||||
int targetAppendCount = 50;
|
||||
byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
|
||||
int testDataIndex = 0;
|
||||
System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
|
||||
testDataIndex += baseDataSize;
|
||||
int appendCount = 0;
|
||||
|
||||
FSDataOutputStream appendStream = null;
|
||||
|
||||
try {
|
||||
|
||||
while (appendCount < targetAppendCount) {
|
||||
|
||||
appendStream = fs.append(TEST_PATH, 50);
|
||||
|
||||
int singleAppendChunkSize = 20;
|
||||
int appendRunSize = 0;
|
||||
while (appendRunSize < appendDataSize) {
|
||||
|
||||
byte[] appendDataBuffer = getTestData(singleAppendChunkSize);
|
||||
appendStream.write(appendDataBuffer);
|
||||
System.arraycopy(appendDataBuffer, 0, testData,
|
||||
testDataIndex + appendRunSize, singleAppendChunkSize);
|
||||
|
||||
appendRunSize += singleAppendChunkSize;
|
||||
}
|
||||
|
||||
appendStream.close();
|
||||
testDataIndex += appendDataSize;
|
||||
appendCount++;
|
||||
}
|
||||
|
||||
Assert.assertTrue(verifyAppend(testData, TEST_PATH));
|
||||
} finally {
|
||||
if (appendStream != null) {
|
||||
appendStream.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected=UnsupportedOperationException.class)
|
||||
/*
|
||||
* Test to verify the behavior when Append Support configuration flag is set to false
|
||||
*/
|
||||
public void testFalseConfigurationFlagBehavior() throws Throwable {
|
||||
|
||||
fs = testAccount.getFileSystem();
|
||||
Configuration conf = fs.getConf();
|
||||
conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
|
||||
URI uri = fs.getUri();
|
||||
fs.initialize(uri, conf);
|
||||
|
||||
FSDataOutputStream appendStream = null;
|
||||
|
||||
try {
|
||||
createBaseFileWithData(0, TEST_PATH);
|
||||
appendStream = fs.append(TEST_PATH, 10);
|
||||
} finally {
|
||||
if (appendStream != null) {
|
||||
appendStream.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||
return AzureBlobStorageTestAccount.create();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue