HADOOP-11188. hadoop-azure: automatically expand page blobs when they become full. Contributed by Eric Hanson.

This commit is contained in:
cnauroth 2014-10-10 15:05:52 -07:00
parent d3d3d47202
commit f4b7e99f4e
3 changed files with 126 additions and 4 deletions

View File

@ -132,6 +132,9 @@ Trunk (Unreleased)
HADOOP-10809. hadoop-azure: page blob support. (Dexter Bradshaw,
Mostafa Elhemali, Eric Hanson, and Mike Liddell via cnauroth)
HADOOP-11188. hadoop-azure: automatically expand page blobs when they become
full. (Eric Hanson via cnauroth)
BUG FIXES
HADOOP-9451. Fault single-layer config if node group topology is enabled.

View File

@ -47,6 +47,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.microsoft.windowsazure.storage.OperationContext;
import com.microsoft.windowsazure.storage.StorageException;
import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
import com.microsoft.windowsazure.storage.blob.CloudPageBlob;
/**
@ -82,6 +83,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
*/
private volatile IOException lastError;
/**
* Current size of the page blob in bytes. It may be extended if the file
* gets full.
*/
private long currentBlobSize;
/**
* The current byte offset we're at in the blob (how many bytes we've
* uploaded to the server).
@ -114,13 +120,23 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
// Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB.
// This default block size is often used as the hbase.regionserver.hlog.blocksize.
// Set the minimum page blob file size to 128MB, which is >> the default
// block size of 32MB. This default block size is often used as the
// hbase.regionserver.hlog.blocksize.
// The goal is to have a safe minimum size for HBase log files to allow them
// to be filled and rolled without exceeding the minimum size. A larger size can be
// used by setting the fs.azure.page.blob.size configuration variable.
// to be filled and rolled without exceeding the minimum size. A larger size
// can be used by setting the fs.azure.page.blob.size configuration variable.
public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L;
// The default and minimum amount to extend a page blob by if it starts
// to get full.
public static final long
PAGE_BLOB_DEFAULT_EXTENSION_SIZE = 128L * 1024L * 1024L;
// The configured page blob extension size (either the default, or if greater,
// the value configured in fs.azure.page.blob.extension.size
private long configuredPageBlobExtensionSize;
/**
* Constructs an output stream over the given page blob.
*
@ -156,6 +172,21 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE;
}
blob.create(pageBlobSize, new BlobRequestOptions(), opContext);
currentBlobSize = pageBlobSize;
// Set the page blob extension size. It must be a minimum of the default
// value.
configuredPageBlobExtensionSize =
conf.getLong("fs.azure.page.blob.extension.size", 0);
if (configuredPageBlobExtensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
configuredPageBlobExtensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
}
// make sure it is a multiple of the page size
if (configuredPageBlobExtensionSize % PAGE_SIZE != 0) {
configuredPageBlobExtensionSize +=
PAGE_SIZE - configuredPageBlobExtensionSize % PAGE_SIZE;
}
}
private void checkStreamState() throws IOException {
@ -308,6 +339,12 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
// It wasn't a partial page, we won't need to rewrite it.
previousLastPageDataWritten = new byte[0];
}
// Extend the file if we need more room in the file. This typically takes
// less than 200 milliseconds if it has to actually be done,
// so it is okay to include it in a write and won't cause a long pause.
// Other writes can be queued behind this write in any case.
conditionalExtendFile();
}
/**
@ -346,6 +383,56 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
outBuffer = new ByteArrayOutputStream();
}
/**
* Extend the page blob file if we are close to the end.
*/
private void conditionalExtendFile() {
// maximum allowed size of an Azure page blob (1 terabyte)
final long MAX_PAGE_BLOB_SIZE = 1024L * 1024L * 1024L * 1024L;
// If blob is already at the maximum size, then don't try to extend it.
if (currentBlobSize == MAX_PAGE_BLOB_SIZE) {
return;
}
// If we are within the maximum write size of the end of the file,
if (currentBlobSize - currentBlobOffset <= MAX_RAW_BYTES_PER_REQUEST) {
// Extend the file. Retry up to 3 times with back-off.
CloudPageBlob cloudPageBlob = (CloudPageBlob) blob.getBlob();
long newSize = currentBlobSize + configuredPageBlobExtensionSize;
// Make sure we don't exceed maximum blob size.
if (newSize > MAX_PAGE_BLOB_SIZE) {
newSize = MAX_PAGE_BLOB_SIZE;
}
final int MAX_RETRIES = 3;
int retries = 1;
boolean resizeDone = false;
while(!resizeDone && retries <= MAX_RETRIES) {
try {
cloudPageBlob.resize(newSize);
resizeDone = true;
currentBlobSize = newSize;
} catch (StorageException e) {
LOG.warn("Failed to extend size of " + cloudPageBlob.getUri());
try {
// sleep 2, 8, 18 seconds for up to 3 retries
Thread.sleep(2000 * retries * retries);
} catch (InterruptedException e1) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
} finally {
retries++;
}
}
}
}
/**
* Flushes this output stream and forces any buffered output bytes to be
* written out. If any data remains in the buffer it is committed to the

View File

@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureException;
@ -330,4 +331,35 @@ public class TestReadAndSeekPageBlobAfterWrite {
writeAndReadOneFile(numWrites, recordSize, syncInterval);
}
}
// Write to a file repeatedly to verify that it extends.
// The page blob file should start out at 128MB and finish at 256MB.
@Test(timeout=300000)
public void testFileSizeExtension() throws IOException {
final int writeSize = 1024 * 1024;
final int numWrites = 129;
final byte dataByte = 5;
byte[] data = new byte[writeSize];
Arrays.fill(data, dataByte);
FSDataOutputStream output = fs.create(PATH);
try {
for (int i = 0; i < numWrites; i++) {
output.write(data);
output.hflush();
LOG.debug("total writes = " + (i + 1));
}
} finally {
output.close();
}
// Show that we wrote more than the default page blob file size.
assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
// Verify we can list the new size. That will prove we expanded the file.
FileStatus[] status = fs.listStatus(PATH);
assertTrue(status[0].getLen() == numWrites * writeSize);
LOG.debug("Total bytes written to " + PATH + " = " + status[0].getLen());
fs.delete(PATH, false);
}
}