HADOOP-14520. WASB: Block compaction for Azure Block Blobs.
Contributed by Georgi Chalakov
This commit is contained in:
parent
416a44004f
commit
87af3f4991
|
@ -202,6 +202,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
*/
|
||||
private Set<String> pageBlobDirs;
|
||||
|
||||
/**
|
||||
* Configuration key to indicate the set of directories in WASB where we
|
||||
* should store files as block blobs with block compaction enabled.
|
||||
*
|
||||
* Entries can be directory paths relative to the container (e.g. "/path") or
|
||||
* fully qualified wasb:// URIs (e.g.
|
||||
* wasb://container@example.blob.core.windows.net/path)
|
||||
*/
|
||||
public static final String KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES =
|
||||
"fs.azure.block.blob.with.compaction.dir";
|
||||
|
||||
/**
|
||||
* The set of directories where we should store files as block blobs with
|
||||
* block compaction enabled.
|
||||
*/
|
||||
private Set<String> blockBlobWithCompationDirs;
|
||||
|
||||
/**
|
||||
* Configuration key to indicate the set of directories in WASB where
|
||||
* we should do atomic folder rename synchronized with createNonRecursive.
|
||||
|
@ -527,6 +544,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
// User-agent
|
||||
userAgentId = conf.get(USER_AGENT_ID_KEY, USER_AGENT_ID_DEFAULT);
|
||||
|
||||
// Extract the directories that should contain block blobs with compaction
|
||||
blockBlobWithCompationDirs = getDirectorySet(
|
||||
KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES);
|
||||
LOG.debug("Block blobs with compaction directories: {}",
|
||||
setToString(blockBlobWithCompationDirs));
|
||||
|
||||
// Extract directories that should have atomic rename applied.
|
||||
atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES);
|
||||
String hbaseRoot;
|
||||
|
@ -1164,6 +1187,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
return isKeyForDirectorySet(key, pageBlobDirs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given key in Azure Storage should be stored as a block blobs
|
||||
* with compaction enabled instead of normal block blob.
|
||||
*
|
||||
* @param key blob name
|
||||
* @return true, if the file is in directory with block compaction enabled.
|
||||
*/
|
||||
public boolean isBlockBlobWithCompactionKey(String key) {
|
||||
return isKeyForDirectorySet(key, blockBlobWithCompationDirs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given key in Azure storage should have synchronized
|
||||
* atomic folder rename createNonRecursive implemented.
|
||||
|
@ -1356,7 +1390,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataOutputStream storefile(String key, PermissionStatus permissionStatus)
|
||||
public DataOutputStream storefile(String keyEncoded,
|
||||
PermissionStatus permissionStatus,
|
||||
String key)
|
||||
throws AzureException {
|
||||
try {
|
||||
|
||||
|
@ -1417,12 +1453,26 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
|
||||
// Get the blob reference from the store's container and
|
||||
// return it.
|
||||
CloudBlobWrapper blob = getBlobReference(key);
|
||||
CloudBlobWrapper blob = getBlobReference(keyEncoded);
|
||||
storePermissionStatus(blob, permissionStatus);
|
||||
|
||||
// Create the output stream for the Azure blob.
|
||||
//
|
||||
OutputStream outputStream = openOutputStream(blob);
|
||||
OutputStream outputStream;
|
||||
|
||||
if (isBlockBlobWithCompactionKey(key)) {
|
||||
BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream(
|
||||
(CloudBlockBlobWrapper) blob,
|
||||
keyEncoded,
|
||||
this.uploadBlockSizeBytes,
|
||||
true,
|
||||
getInstrumentedContext());
|
||||
|
||||
outputStream = blockBlobOutputStream;
|
||||
} else {
|
||||
outputStream = openOutputStream(blob);
|
||||
}
|
||||
|
||||
DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream);
|
||||
return dataOutStream;
|
||||
} catch (Exception e) {
|
||||
|
@ -2863,10 +2913,21 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
|
||||
CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
|
||||
|
||||
BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
|
||||
appendStream.initialize();
|
||||
OutputStream outputStream;
|
||||
|
||||
return new DataOutputStream(appendStream);
|
||||
BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream(
|
||||
(CloudBlockBlobWrapper) blob,
|
||||
key,
|
||||
bufferSize,
|
||||
isBlockBlobWithCompactionKey(key),
|
||||
getInstrumentedContext());
|
||||
|
||||
outputStream = blockBlobOutputStream;
|
||||
|
||||
DataOutputStream dataOutStream = new SyncableDataOutputStream(
|
||||
outputStream);
|
||||
|
||||
return dataOutStream;
|
||||
} catch(Exception ex) {
|
||||
throw new AzureException(ex);
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -57,6 +57,8 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
|
||||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
|
@ -350,9 +352,9 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
/**
|
||||
* This is an exact copy of org.codehaus.jettison.json.JSONObject.quote
|
||||
* This is an exact copy of org.codehaus.jettison.json.JSONObject.quote
|
||||
* method.
|
||||
*
|
||||
*
|
||||
* Produce a string in double quotes with backslash sequences in all the
|
||||
* right places. A backslash will be inserted within </, allowing JSON
|
||||
* text to be delivered in HTML. In JSON text, a string cannot contain a
|
||||
|
@ -945,11 +947,11 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
private class NativeAzureFsOutputStream extends OutputStream {
|
||||
// We should not override flush() to actually close current block and flush
|
||||
// to DFS, this will break applications that assume flush() is a no-op.
|
||||
// Applications are advised to use Syncable.hflush() for that purpose.
|
||||
// NativeAzureFsOutputStream needs to implement Syncable if needed.
|
||||
/**
|
||||
* Azure output stream; wraps an inner stream of different types.
|
||||
*/
|
||||
public class NativeAzureFsOutputStream extends OutputStream
|
||||
implements Syncable, StreamCapabilities {
|
||||
private String key;
|
||||
private String keyEncoded;
|
||||
private OutputStream out;
|
||||
|
@ -981,6 +983,57 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
setEncodedKey(anEncodedKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a reference to the wrapped output stream.
|
||||
*
|
||||
* @return the underlying output stream
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS"})
|
||||
public OutputStream getOutStream() {
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override // Syncable
|
||||
public void sync() throws IOException {
|
||||
if (out instanceof Syncable) {
|
||||
((Syncable) out).hflush();
|
||||
} else {
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override // Syncable
|
||||
public void hflush() throws IOException {
|
||||
if (out instanceof Syncable) {
|
||||
((Syncable) out).hflush();
|
||||
} else {
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override // Syncable
|
||||
public void hsync() throws IOException {
|
||||
if (out instanceof Syncable) {
|
||||
((Syncable) out).hsync();
|
||||
} else {
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Propagate probe of stream capabilities to nested stream
|
||||
* (if supported), else return false.
|
||||
* @param capability string to query the stream support for.
|
||||
* @return true if the nested stream supports the specific capability.
|
||||
*/
|
||||
@Override // StreamCapability
|
||||
public boolean hasCapability(String capability) {
|
||||
if (out instanceof StreamCapabilities) {
|
||||
return ((StreamCapabilities) out).hasCapability(capability);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (out != null) {
|
||||
|
@ -988,8 +1041,11 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// before returning to the caller.
|
||||
//
|
||||
out.close();
|
||||
restoreKey();
|
||||
out = null;
|
||||
try {
|
||||
restoreKey();
|
||||
} finally {
|
||||
out = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1043,10 +1099,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
/**
|
||||
* Writes <code>len</code> from the specified byte array starting at offset
|
||||
* <code>off</code> to the output stream. The general contract for write(b,
|
||||
* off, len) is that some of the bytes in the array <code>
|
||||
* b</code b> are written to the output stream in order; element
|
||||
* <code>b[off]</code> is the first byte written and
|
||||
* <code>b[off+len-1]</code> is the last byte written by this operation.
|
||||
* off, len) is that some of the bytes in the array <code>b</code>
|
||||
* are written to the output stream in order; element <code>b[off]</code>
|
||||
* is the first byte written and <code>b[off+len-1]</code> is the last
|
||||
* byte written by this operation.
|
||||
*
|
||||
* @param b
|
||||
* Byte array to be written.
|
||||
|
@ -1747,7 +1803,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
OutputStream bufOutStream;
|
||||
if (store.isPageBlobKey(key)) {
|
||||
// Store page blobs directly in-place without renames.
|
||||
bufOutStream = store.storefile(key, permissionStatus);
|
||||
bufOutStream = store.storefile(key, permissionStatus, key);
|
||||
} else {
|
||||
// This is a block blob, so open the output blob stream based on the
|
||||
// encoded key.
|
||||
|
@ -1775,7 +1831,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// these
|
||||
// blocks.
|
||||
bufOutStream = new NativeAzureFsOutputStream(store.storefile(
|
||||
keyEncoded, permissionStatus), key, keyEncoded);
|
||||
keyEncoded, permissionStatus, key), key, keyEncoded);
|
||||
}
|
||||
// Construct the data output stream from the buffered output stream.
|
||||
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
|
||||
|
|
|
@ -50,8 +50,9 @@ interface NativeFileSystemStore {
|
|||
|
||||
InputStream retrieve(String key, long byteRangeStart) throws IOException;
|
||||
|
||||
DataOutputStream storefile(String key, PermissionStatus permissionStatus)
|
||||
throws AzureException;
|
||||
DataOutputStream storefile(String keyEncoded,
|
||||
PermissionStatus permissionStatus,
|
||||
String key) throws AzureException;
|
||||
|
||||
boolean isPageBlobKey(String key);
|
||||
|
||||
|
|
|
@ -519,7 +519,7 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
|
||||
@Override
|
||||
public SelfRenewingLease acquireLease() throws StorageException {
|
||||
return new SelfRenewingLease(this);
|
||||
return new SelfRenewingLease(this, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -557,10 +557,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void uploadBlock(String blockId, InputStream sourceStream,
|
||||
public void uploadBlock(String blockId, AccessCondition accessCondition,
|
||||
InputStream sourceStream,
|
||||
long length, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException {
|
||||
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
|
||||
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length,
|
||||
accessCondition, options, opContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -593,4 +595,4 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
null, options, opContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import com.microsoft.azure.storage.blob.CloudBlob;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static com.microsoft.azure.storage.StorageErrorCodeStrings.LEASE_ALREADY_PRESENT;
|
||||
|
||||
/**
|
||||
* An Azure blob lease that automatically renews itself indefinitely
|
||||
* using a background thread. Use it to synchronize distributed processes,
|
||||
|
@ -66,7 +68,7 @@ public class SelfRenewingLease {
|
|||
@VisibleForTesting
|
||||
static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
|
||||
|
||||
public SelfRenewingLease(CloudBlobWrapper blobWrapper)
|
||||
public SelfRenewingLease(CloudBlobWrapper blobWrapper, boolean throwIfPresent)
|
||||
throws StorageException {
|
||||
|
||||
this.leaseFreed = false;
|
||||
|
@ -79,10 +81,14 @@ public class SelfRenewingLease {
|
|||
leaseID = blob.acquireLease(LEASE_TIMEOUT, null);
|
||||
} catch (StorageException e) {
|
||||
|
||||
if (throwIfPresent && e.getErrorCode().equals(LEASE_ALREADY_PRESENT)) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Throw again if we don't want to keep waiting.
|
||||
// We expect it to be that the lease is already present,
|
||||
// or in some cases that the blob does not exist.
|
||||
if (!"LeaseAlreadyPresent".equals(e.getErrorCode())) {
|
||||
if (!LEASE_ALREADY_PRESENT.equals(e.getErrorCode())) {
|
||||
LOG.info(
|
||||
"Caught exception when trying to get lease on blob "
|
||||
+ blobWrapper.getUri().toString() + ". " + e.getMessage());
|
||||
|
|
|
@ -665,6 +665,7 @@ abstract class StorageInterface {
|
|||
*
|
||||
* @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob
|
||||
* the length of all Block IDs must be identical.
|
||||
* @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
|
||||
* @param sourceStream An {@link InputStream} object that represents the input stream to write to the
|
||||
* block blob.
|
||||
* @param length A long which represents the length, in bytes, of the stream data,
|
||||
|
@ -678,7 +679,7 @@ abstract class StorageInterface {
|
|||
* @throws IOException If an I/O error occurred.
|
||||
* @throws StorageException If a storage service error occurred.
|
||||
*/
|
||||
void uploadBlock(String blockId, InputStream sourceStream,
|
||||
void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream,
|
||||
long length, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException;
|
||||
|
||||
|
|
|
@ -277,7 +277,7 @@ class StorageInterfaceImpl extends StorageInterface {
|
|||
|
||||
return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CloudBlobWrapper getPageBlobReference(String relativePath)
|
||||
throws URISyntaxException, StorageException {
|
||||
|
@ -286,7 +286,7 @@ class StorageInterfaceImpl extends StorageInterface {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper {
|
||||
private final CloudBlob blob;
|
||||
|
||||
|
@ -441,10 +441,10 @@ class StorageInterfaceImpl extends StorageInterface {
|
|||
|
||||
@Override
|
||||
public SelfRenewingLease acquireLease() throws StorageException {
|
||||
return new SelfRenewingLease(this);
|
||||
return new SelfRenewingLease(this, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
//
|
||||
// CloudBlockBlobWrapperImpl
|
||||
|
@ -479,10 +479,10 @@ class StorageInterfaceImpl extends StorageInterface {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void uploadBlock(String blockId, InputStream sourceStream,
|
||||
public void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream,
|
||||
long length, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException {
|
||||
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
|
||||
((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, accessCondition, options, opContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.OutputStream;
|
|||
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Support the Syncable interface on top of a DataOutputStream.
|
||||
|
@ -38,6 +39,16 @@ public class SyncableDataOutputStream extends DataOutputStream
|
|||
super(out);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a reference to the wrapped output stream.
|
||||
*
|
||||
* @return the underlying output stream
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS"})
|
||||
public OutputStream getOutStream() {
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasCapability(String capability) {
|
||||
if (out instanceof StreamCapabilities) {
|
||||
|
@ -47,9 +58,7 @@ public class SyncableDataOutputStream extends DataOutputStream
|
|||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public void sync() throws IOException {
|
||||
hflush();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -150,6 +150,40 @@ line argument:
|
|||
|
||||
```
|
||||
|
||||
### Block Blob with Compaction Support and Configuration
|
||||
|
||||
Block blobs are the default kind of blob and are good for most big-data use
|
||||
cases. However, block blobs have strict limit of 50,000 blocks per blob.
|
||||
To prevent reaching the limit WASB, by default, does not upload new block to
|
||||
the service after every `hflush()` or `hsync()`.
|
||||
|
||||
For most of the cases, combining data from multiple `write()` calls in
|
||||
blocks of 4Mb is a good optimization. But, in others cases, like HBase log files,
|
||||
every call to `hflush()` or `hsync()` must upload the data to the service.
|
||||
|
||||
Block blobs with compaction upload the data to the cloud service after every
|
||||
`hflush()`/`hsync()`. To mitigate the limit of 50000 blocks, `hflush()
|
||||
`/`hsync()` runs once compaction process, if number of blocks in the blob
|
||||
is above 32,000.
|
||||
|
||||
Block compaction search and replaces a sequence of small blocks with one big
|
||||
block. That means there is associated cost with block compaction: reading
|
||||
small blocks back to the client and writing it again as one big block.
|
||||
|
||||
In order to have the files you create be block blobs with block compaction
|
||||
enabled, the client must set the configuration variable
|
||||
`fs.azure.block.blob.with.compaction.dir` to a comma-separated list of
|
||||
folder names.
|
||||
|
||||
For example:
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.azure.block.blob.with.compaction.dir</name>
|
||||
<value>/hbase/WALs,/data/myblobfiles</value>
|
||||
</property>
|
||||
```
|
||||
|
||||
### Page Blob Support and Configuration
|
||||
|
||||
The Azure Blob Storage interface for Hadoop supports two kinds of blobs,
|
||||
|
|
|
@ -551,7 +551,8 @@ public class MockStorageInterface extends StorageInterface {
|
|||
throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
|
||||
}
|
||||
@Override
|
||||
public void uploadBlock(String blockId, InputStream sourceStream,
|
||||
public void uploadBlock(String blockId, AccessCondition accessCondition,
|
||||
InputStream sourceStream,
|
||||
long length, BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException, StorageException {
|
||||
throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
|
||||
|
|
|
@ -107,7 +107,8 @@ public class TestAzureConcurrentOutOfBandIo {
|
|||
//
|
||||
outputStream = writerStorageAccount.getStore().storefile(
|
||||
key,
|
||||
new PermissionStatus("", "", FsPermission.getDefault()));
|
||||
new PermissionStatus("", "", FsPermission.getDefault()),
|
||||
key);
|
||||
|
||||
Arrays.fill(dataBlockWrite, (byte) (i % 256));
|
||||
for (int j = 0; j < NUMBER_OF_BLOCKS; j++) {
|
||||
|
@ -141,7 +142,8 @@ public class TestAzureConcurrentOutOfBandIo {
|
|||
// reading. This eliminates the race between the reader and writer threads.
|
||||
OutputStream outputStream = testAccount.getStore().storefile(
|
||||
"WASB_String.txt",
|
||||
new PermissionStatus("", "", FsPermission.getDefault()));
|
||||
new PermissionStatus("", "", FsPermission.getDefault()),
|
||||
"WASB_String.txt");
|
||||
Arrays.fill(dataBlockWrite, (byte) 255);
|
||||
for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
|
||||
outputStream.write(dataBlockWrite);
|
||||
|
|
|
@ -0,0 +1,266 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import com.microsoft.azure.storage.blob.BlockEntry;
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Test class that runs WASB block compaction process for block blobs.
|
||||
*/
|
||||
|
||||
public class TestNativeAzureFileSystemBlockCompaction extends AbstractWasbTestBase {
|
||||
|
||||
private static final String TEST_FILE = "/user/active/test.dat";
|
||||
private static final Path TEST_PATH = new Path(TEST_FILE);
|
||||
|
||||
private static final String TEST_FILE_NORMAL = "/user/normal/test.dat";
|
||||
private static final Path TEST_PATH_NORMAL = new Path(TEST_FILE_NORMAL);
|
||||
|
||||
private AzureBlobStorageTestAccount testAccount = null;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
testAccount = createTestAccount();
|
||||
fs = testAccount.getFileSystem();
|
||||
Configuration conf = fs.getConf();
|
||||
conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
|
||||
conf.set(AzureNativeFileSystemStore.KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES, "/user/active");
|
||||
URI uri = fs.getUri();
|
||||
fs.initialize(uri, conf);
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper method that creates test data of size provided by the
|
||||
* "size" parameter.
|
||||
*/
|
||||
private static byte[] getTestData(int size) {
|
||||
byte[] testData = new byte[size];
|
||||
System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
|
||||
return testData;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||
return AzureBlobStorageTestAccount.create();
|
||||
}
|
||||
|
||||
private BlockBlobAppendStream getBlockBlobAppendStream(FSDataOutputStream appendStream) {
|
||||
SyncableDataOutputStream dataOutputStream = null;
|
||||
|
||||
if (appendStream.getWrappedStream() instanceof NativeAzureFileSystem.NativeAzureFsOutputStream) {
|
||||
NativeAzureFileSystem.NativeAzureFsOutputStream fsOutputStream =
|
||||
(NativeAzureFileSystem.NativeAzureFsOutputStream) appendStream.getWrappedStream();
|
||||
|
||||
dataOutputStream = (SyncableDataOutputStream) fsOutputStream.getOutStream();
|
||||
}
|
||||
|
||||
if (appendStream.getWrappedStream() instanceof SyncableDataOutputStream) {
|
||||
dataOutputStream = (SyncableDataOutputStream) appendStream.getWrappedStream();
|
||||
}
|
||||
|
||||
Assert.assertNotNull("Did not recognize " + dataOutputStream,
|
||||
dataOutputStream);
|
||||
|
||||
return (BlockBlobAppendStream) dataOutputStream.getOutStream();
|
||||
}
|
||||
|
||||
private void verifyBlockList(BlockBlobAppendStream blockBlobStream,
|
||||
int[] testData) throws Throwable {
|
||||
List<BlockEntry> blockList = blockBlobStream.getBlockList();
|
||||
Assert.assertEquals("Block list length", testData.length, blockList.size());
|
||||
|
||||
int i = 0;
|
||||
for (BlockEntry block: blockList) {
|
||||
Assert.assertTrue(block.getSize() == testData[i++]);
|
||||
}
|
||||
}
|
||||
|
||||
private void appendBlockList(FSDataOutputStream fsStream,
|
||||
ByteArrayOutputStream memStream,
|
||||
int[] testData) throws Throwable {
|
||||
|
||||
for (int d: testData) {
|
||||
byte[] data = getTestData(d);
|
||||
memStream.write(data);
|
||||
fsStream.write(data);
|
||||
}
|
||||
fsStream.hflush();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionDisabled() throws Throwable {
|
||||
|
||||
try (FSDataOutputStream appendStream = fs.create(TEST_PATH_NORMAL)) {
|
||||
|
||||
// testing new file
|
||||
|
||||
SyncableDataOutputStream dataOutputStream = null;
|
||||
|
||||
OutputStream wrappedStream = appendStream.getWrappedStream();
|
||||
if (wrappedStream instanceof NativeAzureFileSystem.NativeAzureFsOutputStream) {
|
||||
NativeAzureFileSystem.NativeAzureFsOutputStream fsOutputStream =
|
||||
(NativeAzureFileSystem.NativeAzureFsOutputStream) wrappedStream;
|
||||
|
||||
dataOutputStream = (SyncableDataOutputStream) fsOutputStream.getOutStream();
|
||||
} else if (wrappedStream instanceof SyncableDataOutputStream) {
|
||||
dataOutputStream = (SyncableDataOutputStream) wrappedStream;
|
||||
} else {
|
||||
Assert.fail("Unable to determine type of " + wrappedStream
|
||||
+ " class of " + wrappedStream.getClass());
|
||||
}
|
||||
|
||||
Assert.assertFalse("Data output stream is a BlockBlobAppendStream: "
|
||||
+ dataOutputStream,
|
||||
dataOutputStream.getOutStream() instanceof BlockBlobAppendStream);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompaction() throws Throwable {
|
||||
|
||||
final int n2 = 2;
|
||||
final int n4 = 4;
|
||||
final int n10 = 10;
|
||||
final int n12 = 12;
|
||||
final int n14 = 14;
|
||||
final int n16 = 16;
|
||||
|
||||
final int maxBlockSize = 16;
|
||||
final int compactionBlockCount = 4;
|
||||
|
||||
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
|
||||
|
||||
try (FSDataOutputStream appendStream = fs.create(TEST_PATH)) {
|
||||
|
||||
// test new file
|
||||
|
||||
BlockBlobAppendStream blockBlobStream = getBlockBlobAppendStream(appendStream);
|
||||
blockBlobStream.setMaxBlockSize(maxBlockSize);
|
||||
blockBlobStream.setCompactionBlockCount(compactionBlockCount);
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n2});
|
||||
verifyBlockList(blockBlobStream, new int[]{n2});
|
||||
|
||||
appendStream.hflush();
|
||||
verifyBlockList(blockBlobStream, new int[]{n2});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream, new int[]{n2, n4});
|
||||
|
||||
appendStream.hsync();
|
||||
verifyBlockList(blockBlobStream, new int[]{n2, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream, new int[]{n2, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream, new int[]{n2, n4, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream, new int[]{n14, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream, new int[]{n14, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream, new int[]{n14, n4, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n2, n4, n4});
|
||||
verifyBlockList(blockBlobStream, new int[]{n14, n12, n10});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream, new int[]{n14, n12, n10, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream,
|
||||
new int[]{n4, n4, n4, n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16});
|
||||
|
||||
appendBlockList(appendStream, memStream,
|
||||
new int[]{n4, n4, n4, n4, n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16, n16, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream,
|
||||
new int[]{n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16, n16, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream,
|
||||
new int[]{n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16, n16, n4, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream,
|
||||
new int[]{n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16, n16, n4, n4, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
|
||||
appendStream.close();
|
||||
|
||||
ContractTestUtils.verifyFileContents(fs, TEST_PATH, memStream.toByteArray());
|
||||
}
|
||||
|
||||
try (FSDataOutputStream appendStream = fs.append(TEST_PATH)) {
|
||||
|
||||
// test existing file
|
||||
|
||||
BlockBlobAppendStream blockBlobStream = getBlockBlobAppendStream(appendStream);
|
||||
blockBlobStream.setMaxBlockSize(maxBlockSize);
|
||||
blockBlobStream.setCompactionBlockCount(compactionBlockCount);
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16, n16, n16, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16, n16, n16, n4, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16, n16, n16, n4, n4, n4, n4});
|
||||
|
||||
appendBlockList(appendStream, memStream, new int[]{n4});
|
||||
verifyBlockList(blockBlobStream,
|
||||
new int[]{n14, n12, n14, n16, n16, n16, n16, n4});
|
||||
|
||||
appendStream.close();
|
||||
|
||||
ContractTestUtils.verifyFileContents(fs, TEST_PATH, memStream.toByteArray());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue