HADOOP-11188. hadoop-azure: automatically expand page blobs when they become full. Contributed by Eric Hanson.
(cherry picked from commit f4b7e99f4e
)
This commit is contained in:
parent
5a737026cc
commit
1646cc9f29
|
@ -82,6 +82,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HADOOP-10809. hadoop-azure: page blob support. (Dexter Bradshaw,
|
||||
Mostafa Elhemali, Eric Hanson, and Mike Liddell via cnauroth)
|
||||
|
||||
HADOOP-11188. hadoop-azure: automatically expand page blobs when they become
|
||||
full. (Eric Hanson via cnauroth)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh)
|
||||
|
|
|
@ -47,6 +47,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.microsoft.windowsazure.storage.OperationContext;
|
||||
import com.microsoft.windowsazure.storage.StorageException;
|
||||
import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
|
||||
import com.microsoft.windowsazure.storage.blob.CloudPageBlob;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -82,6 +83,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
|
|||
*/
|
||||
private volatile IOException lastError;
|
||||
|
||||
/**
|
||||
* Current size of the page blob in bytes. It may be extended if the file
|
||||
* gets full.
|
||||
*/
|
||||
private long currentBlobSize;
|
||||
/**
|
||||
* The current byte offset we're at in the blob (how many bytes we've
|
||||
* uploaded to the server).
|
||||
|
@ -114,13 +120,23 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
|
|||
|
||||
public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
|
||||
|
||||
// Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB.
|
||||
// This default block size is often used as the hbase.regionserver.hlog.blocksize.
|
||||
// Set the minimum page blob file size to 128MB, which is >> the default
|
||||
// block size of 32MB. This default block size is often used as the
|
||||
// hbase.regionserver.hlog.blocksize.
|
||||
// The goal is to have a safe minimum size for HBase log files to allow them
|
||||
// to be filled and rolled without exceeding the minimum size. A larger size can be
|
||||
// used by setting the fs.azure.page.blob.size configuration variable.
|
||||
// to be filled and rolled without exceeding the minimum size. A larger size
|
||||
// can be used by setting the fs.azure.page.blob.size configuration variable.
|
||||
public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L;
|
||||
|
||||
// The default and minimum amount to extend a page blob by if it starts
|
||||
// to get full.
|
||||
public static final long
|
||||
PAGE_BLOB_DEFAULT_EXTENSION_SIZE = 128L * 1024L * 1024L;
|
||||
|
||||
// The configured page blob extension size (either the default, or if greater,
|
||||
// the value configured in fs.azure.page.blob.extension.size
|
||||
private long configuredPageBlobExtensionSize;
|
||||
|
||||
/**
|
||||
* Constructs an output stream over the given page blob.
|
||||
*
|
||||
|
@ -156,6 +172,21 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
|
|||
pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE;
|
||||
}
|
||||
blob.create(pageBlobSize, new BlobRequestOptions(), opContext);
|
||||
currentBlobSize = pageBlobSize;
|
||||
|
||||
// Set the page blob extension size. It must be a minimum of the default
|
||||
// value.
|
||||
configuredPageBlobExtensionSize =
|
||||
conf.getLong("fs.azure.page.blob.extension.size", 0);
|
||||
if (configuredPageBlobExtensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
|
||||
configuredPageBlobExtensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
|
||||
}
|
||||
|
||||
// make sure it is a multiple of the page size
|
||||
if (configuredPageBlobExtensionSize % PAGE_SIZE != 0) {
|
||||
configuredPageBlobExtensionSize +=
|
||||
PAGE_SIZE - configuredPageBlobExtensionSize % PAGE_SIZE;
|
||||
}
|
||||
}
|
||||
|
||||
private void checkStreamState() throws IOException {
|
||||
|
@ -308,6 +339,12 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
|
|||
// It wasn't a partial page, we won't need to rewrite it.
|
||||
previousLastPageDataWritten = new byte[0];
|
||||
}
|
||||
|
||||
// Extend the file if we need more room in the file. This typically takes
|
||||
// less than 200 milliseconds if it has to actually be done,
|
||||
// so it is okay to include it in a write and won't cause a long pause.
|
||||
// Other writes can be queued behind this write in any case.
|
||||
conditionalExtendFile();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -346,6 +383,56 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
|
|||
outBuffer = new ByteArrayOutputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Extend the page blob file if we are close to the end.
|
||||
*/
|
||||
private void conditionalExtendFile() {
|
||||
|
||||
// maximum allowed size of an Azure page blob (1 terabyte)
|
||||
final long MAX_PAGE_BLOB_SIZE = 1024L * 1024L * 1024L * 1024L;
|
||||
|
||||
// If blob is already at the maximum size, then don't try to extend it.
|
||||
if (currentBlobSize == MAX_PAGE_BLOB_SIZE) {
|
||||
return;
|
||||
}
|
||||
|
||||
// If we are within the maximum write size of the end of the file,
|
||||
if (currentBlobSize - currentBlobOffset <= MAX_RAW_BYTES_PER_REQUEST) {
|
||||
|
||||
// Extend the file. Retry up to 3 times with back-off.
|
||||
CloudPageBlob cloudPageBlob = (CloudPageBlob) blob.getBlob();
|
||||
long newSize = currentBlobSize + configuredPageBlobExtensionSize;
|
||||
|
||||
// Make sure we don't exceed maximum blob size.
|
||||
if (newSize > MAX_PAGE_BLOB_SIZE) {
|
||||
newSize = MAX_PAGE_BLOB_SIZE;
|
||||
}
|
||||
final int MAX_RETRIES = 3;
|
||||
int retries = 1;
|
||||
boolean resizeDone = false;
|
||||
while(!resizeDone && retries <= MAX_RETRIES) {
|
||||
try {
|
||||
cloudPageBlob.resize(newSize);
|
||||
resizeDone = true;
|
||||
currentBlobSize = newSize;
|
||||
} catch (StorageException e) {
|
||||
LOG.warn("Failed to extend size of " + cloudPageBlob.getUri());
|
||||
try {
|
||||
|
||||
// sleep 2, 8, 18 seconds for up to 3 retries
|
||||
Thread.sleep(2000 * retries * retries);
|
||||
} catch (InterruptedException e1) {
|
||||
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} finally {
|
||||
retries++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flushes this output stream and forces any buffered output bytes to be
|
||||
* written out. If any data remains in the buffer it is committed to the
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azure.AzureException;
|
||||
|
@ -330,4 +331,35 @@ public class TestReadAndSeekPageBlobAfterWrite {
|
|||
writeAndReadOneFile(numWrites, recordSize, syncInterval);
|
||||
}
|
||||
}
|
||||
|
||||
// Write to a file repeatedly to verify that it extends.
|
||||
// The page blob file should start out at 128MB and finish at 256MB.
|
||||
@Test(timeout=300000)
|
||||
public void testFileSizeExtension() throws IOException {
|
||||
final int writeSize = 1024 * 1024;
|
||||
final int numWrites = 129;
|
||||
final byte dataByte = 5;
|
||||
byte[] data = new byte[writeSize];
|
||||
Arrays.fill(data, dataByte);
|
||||
FSDataOutputStream output = fs.create(PATH);
|
||||
try {
|
||||
for (int i = 0; i < numWrites; i++) {
|
||||
output.write(data);
|
||||
output.hflush();
|
||||
LOG.debug("total writes = " + (i + 1));
|
||||
}
|
||||
} finally {
|
||||
output.close();
|
||||
}
|
||||
|
||||
// Show that we wrote more than the default page blob file size.
|
||||
assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
|
||||
|
||||
// Verify we can list the new size. That will prove we expanded the file.
|
||||
FileStatus[] status = fs.listStatus(PATH);
|
||||
assertTrue(status[0].getLen() == numWrites * writeSize);
|
||||
LOG.debug("Total bytes written to " + PATH + " = " + status[0].getLen());
|
||||
fs.delete(PATH, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue