From acffe203b8128989a1cde872dc5576c810e5a0f0 Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Tue, 21 Sep 2021 17:18:06 +0530 Subject: [PATCH] HADOOP-17195. ABFS: OutOfMemory error while uploading huge files (#3446) Addresses the problem of processes running out of memory when there are many ABFS output streams queuing data to upload, especially when the network upload bandwidth is less than the rate data is generated. ABFS Output streams now buffer their blocks of data to "disk", "bytebuffer" or "array", as set in "fs.azure.data.blocks.buffer" When buffering via disk, the location for temporary storage is set in "fs.azure.buffer.dir" For safe scaling: use "disk" (default); for performance, when confident that upload bandwidth will never be a bottleneck, experiment with the memory options. The number of blocks a single stream can have queued for uploading is set in "fs.azure.block.upload.active.blocks". The default value is 20. Contributed by Mehakmeet Singh. --- .../hadoop/fs/CommonConfigurationKeys.java | 5 + .../fs/statistics/StreamStatisticNames.java | 12 + .../fs/store/BlockUploadStatistics.java | 33 + .../apache/hadoop/fs/store/DataBlocks.java | 1123 +++++++++++++++++ .../src/main/resources/core-default.xml | 7 + .../hadoop/fs/store/TestDataBlocks.java | 138 ++ .../fs/azurebfs/AzureBlobFileSystem.java | 42 +- .../fs/azurebfs/AzureBlobFileSystemStore.java | 165 ++- .../azurebfs/constants/ConfigurationKeys.java | 31 + .../constants/FileSystemConfigurations.java | 18 + .../azurebfs/services/AbfsOutputStream.java | 469 ++++--- .../services/AbfsOutputStreamContext.java | 111 +- .../services/AbfsOutputStreamStatistics.java | 4 +- .../AbfsOutputStreamStatisticsImpl.java | 25 +- .../azure/integration/AzureTestConstants.java | 13 + .../azurebfs/AbstractAbfsIntegrationTest.java | 2 +- .../fs/azurebfs/ITestAbfsHugeFiles.java | 134 ++ .../ITestAzureBlobFileSystemLease.java | 15 +- .../services/TestAbfsOutputStream.java | 163 ++- 19 files changed, 2230 insertions(+), 280 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 1ea44df5032..6949c67f278 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -464,4 +464,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Default value for IOStatistics logging level. */ public static final String IOSTATISTICS_LOGGING_LEVEL_DEFAULT = IOSTATISTICS_LOGGING_LEVEL_DEBUG; + + /** + * default hadoop temp dir on local system: {@value}. 
+ */ + public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index bbb8517118e..7e9137294c1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -358,6 +358,18 @@ public final class StreamStatisticNames { public static final String REMOTE_BYTES_READ = "remote_bytes_read"; + /** + * Total number of Data blocks allocated by an outputStream. + */ + public static final String BLOCKS_ALLOCATED + = "blocks_allocated"; + + /** + * Total number of Data blocks released by an outputStream. + */ + public static final String BLOCKS_RELEASED + = "blocks_released"; + private StreamStatisticNames() { } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java new file mode 100644 index 00000000000..bf7cbbbc5d5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.store; + +public interface BlockUploadStatistics { + + /** + * A block has been allocated. + */ + void blockAllocated(); + + /** + * A block has been released. + */ + void blockReleased(); + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java new file mode 100644 index 00000000000..5c7c9378f16 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java @@ -0,0 +1,1123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.store; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.DirectBufferPool; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR; +import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed; +import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload; +import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing; +import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; + +/** + * A class to provide disk, byteBuffer and byteArray option for Filesystem + * OutputStreams. + * + *
<p>
+ * Implementation of DataBlocks taken from HADOOP-13560 to support huge file + * uploads in S3A with different options rather than one. + */ +public final class DataBlocks { + + private static final Logger LOG = + LoggerFactory.getLogger(DataBlocks.class); + + /** + * Buffer blocks to disk. + * Capacity is limited to available disk space. + */ + public static final String DATA_BLOCKS_BUFFER_DISK = "disk"; + + /** + * Use a byte buffer. + */ + public static final String DATA_BLOCKS_BYTEBUFFER = "bytebuffer"; + + /** + * Use an in-memory array. Fast but will run of heap rapidly. + */ + public static final String DATA_BLOCKS_BUFFER_ARRAY = "array"; + + private DataBlocks() { + } + + /** + * Validate args to a write command. These are the same validation checks + * expected for any implementation of {@code OutputStream.write()}. + * + * @param b byte array containing data. + * @param off offset in array where to start. + * @param len number of bytes to be written. + * @throws NullPointerException for a null buffer + * @throws IndexOutOfBoundsException if indices are out of range + */ + public static void validateWriteArgs(byte[] b, int off, int len) + throws IOException { + Preconditions.checkNotNull(b); + if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException( + "write (b[" + b.length + "], " + off + ", " + len + ')'); + } + } + + /** + * Create a factory. + * + * @param keyToBufferDir Key to buffer directory config for a FS. + * @param configuration factory configurations. + * @param name factory name -the option from {@link CommonConfigurationKeys}. + * @return the factory, ready to be initialized. + * @throws IllegalArgumentException if the name is unknown. + */ + public static BlockFactory createFactory(String keyToBufferDir, + Configuration configuration, + String name) { + LOG.debug("Creating DataFactory of type : {}", name); + switch (name) { + case DATA_BLOCKS_BUFFER_ARRAY: + return new ArrayBlockFactory(keyToBufferDir, configuration); + case DATA_BLOCKS_BUFFER_DISK: + return new DiskBlockFactory(keyToBufferDir, configuration); + case DATA_BLOCKS_BYTEBUFFER: + return new ByteBufferBlockFactory(keyToBufferDir, configuration); + default: + throw new IllegalArgumentException("Unsupported block buffer" + + " \"" + name + '"'); + } + } + + /** + * The output information for an upload. + * It can be one of a file, an input stream or a byteArray. + * {@link #toByteArray()} method to be used to convert the data into byte + * array to be done in this class as well. + * When closed, any stream is closed. Any source file is untouched. + */ + public static final class BlockUploadData implements Closeable { + private final File file; + private InputStream uploadStream; + private byte[] byteArray; + private boolean isClosed; + + /** + * Constructor for byteArray upload data block. File and uploadStream + * would be null. + * + * @param byteArray byteArray used to construct BlockUploadData. + */ + public BlockUploadData(byte[] byteArray) { + this.file = null; + this.uploadStream = null; + + this.byteArray = requireNonNull(byteArray); + } + + /** + * File constructor; input stream and byteArray will be null. + * + * @param file file to upload + */ + BlockUploadData(File file) { + Preconditions.checkArgument(file.exists(), "No file: %s", file); + this.file = file; + this.uploadStream = null; + this.byteArray = null; + } + + /** + * Stream constructor, file and byteArray field will be null. 
+ * + * @param uploadStream stream to upload. + */ + BlockUploadData(InputStream uploadStream) { + requireNonNull(uploadStream, "rawUploadStream"); + this.uploadStream = uploadStream; + this.file = null; + this.byteArray = null; + } + + /** + * Predicate: does this instance contain a file reference. + * + * @return true if there is a file. + */ + boolean hasFile() { + return file != null; + } + + /** + * Get the file, if there is one. + * + * @return the file for uploading, or null. + */ + File getFile() { + return file; + } + + /** + * Get the raw upload stream, if the object was + * created with one. + * + * @return the upload stream or null. + */ + InputStream getUploadStream() { + return uploadStream; + } + + /** + * Convert to a byte array. + * If the data is stored in a file, it will be read and returned. + * If the data was passed in via an input stream (which happens if the + * data is stored in a bytebuffer) then it will be converted to a byte + * array -which will then be cached for any subsequent use. + * + * @return byte[] after converting the uploadBlock. + * @throws IOException throw if an exception is caught while reading + * File/InputStream or closing InputStream. + */ + public byte[] toByteArray() throws IOException { + Preconditions.checkState(!isClosed, "Block is closed"); + if (byteArray != null) { + return byteArray; + } + if (file != null) { + // Need to save byteArray here so that we don't read File if + // byteArray() is called more than once on the same file. + byteArray = FileUtils.readFileToByteArray(file); + return byteArray; + } + byteArray = IOUtils.toByteArray(uploadStream); + IOUtils.close(uploadStream); + uploadStream = null; + return byteArray; + } + + /** + * Close: closes any upload stream and byteArray provided in the + * constructor. + * + * @throws IOException inherited exception. + */ + @Override + public void close() throws IOException { + isClosed = true; + cleanupWithLogger(LOG, uploadStream); + byteArray = null; + if (file != null) { + LOG.debug("File deleted in BlockUploadData close: {}", file.delete()); + } + } + } + + /** + * Base class for block factories. + */ + public static abstract class BlockFactory implements Closeable { + + private final String keyToBufferDir; + private final Configuration conf; + + protected BlockFactory(String keyToBufferDir, Configuration conf) { + this.keyToBufferDir = keyToBufferDir; + this.conf = conf; + } + + /** + * Create a block. + * + * @param index index of block + * @param limit limit of the block. + * @param statistics stats to work with + * @return a new block. + */ + public abstract DataBlock create(long index, int limit, + BlockUploadStatistics statistics) + throws IOException; + + /** + * Implement any close/cleanup operation. + * Base class is a no-op. + * + * @throws IOException Inherited exception; implementations should + * avoid raising it. + */ + @Override + public void close() throws IOException { + } + + /** + * Configuration. + * + * @return config passed to create the factory. + */ + protected Configuration getConf() { + return conf; + } + + /** + * Key to Buffer Directory config for a FS instance. + * + * @return String containing key to Buffer dir. + */ + public String getKeyToBufferDir() { + return keyToBufferDir; + } + } + + /** + * This represents a block being uploaded. 
+ */ + public static abstract class DataBlock implements Closeable { + + enum DestState {Writing, Upload, Closed} + + private volatile DestState state = Writing; + private final long index; + private final BlockUploadStatistics statistics; + + protected DataBlock(long index, + BlockUploadStatistics statistics) { + this.index = index; + this.statistics = statistics; + } + + /** + * Atomically enter a state, verifying current state. + * + * @param current current state. null means "no check" + * @param next next state + * @throws IllegalStateException if the current state is not as expected + */ + protected synchronized final void enterState(DestState current, + DestState next) + throws IllegalStateException { + verifyState(current); + LOG.debug("{}: entering state {}", this, next); + state = next; + } + + /** + * Verify that the block is in the declared state. + * + * @param expected expected state. + * @throws IllegalStateException if the DataBlock is in the wrong state + */ + protected final void verifyState(DestState expected) + throws IllegalStateException { + if (expected != null && state != expected) { + throw new IllegalStateException("Expected stream state " + expected + + " -but actual state is " + state + " in " + this); + } + } + + /** + * Current state. + * + * @return the current state. + */ + final DestState getState() { + return state; + } + + /** + * Return the current data size. + * + * @return the size of the data. + */ + public abstract int dataSize(); + + /** + * Predicate to verify that the block has the capacity to write + * the given set of bytes. + * + * @param bytes number of bytes desired to be written. + * @return true if there is enough space. + */ + abstract boolean hasCapacity(long bytes); + + /** + * Predicate to check if there is data in the block. + * + * @return true if there is + */ + public boolean hasData() { + return dataSize() > 0; + } + + /** + * The remaining capacity in the block before it is full. + * + * @return the number of bytes remaining. + */ + public abstract int remainingCapacity(); + + /** + * Write a series of bytes from the buffer, from the offset. + * Returns the number of bytes written. + * Only valid in the state {@code Writing}. + * Base class verifies the state but does no writing. + * + * @param buffer buffer. + * @param offset offset. + * @param length length of write. + * @return number of bytes written. + * @throws IOException trouble + */ + public int write(byte[] buffer, int offset, int length) throws IOException { + verifyState(Writing); + Preconditions.checkArgument(buffer != null, "Null buffer"); + Preconditions.checkArgument(length >= 0, "length is negative"); + Preconditions.checkArgument(offset >= 0, "offset is negative"); + Preconditions.checkArgument( + !(buffer.length - offset < length), + "buffer shorter than amount of data to write"); + return 0; + } + + /** + * Flush the output. + * Only valid in the state {@code Writing}. + * In the base class, this is a no-op + * + * @throws IOException any IO problem. + */ + public void flush() throws IOException { + verifyState(Writing); + } + + /** + * Switch to the upload state and return a stream for uploading. + * Base class calls {@link #enterState(DestState, DestState)} to + * manage the state machine. + * + * @return the stream. + * @throws IOException trouble + */ + public BlockUploadData startUpload() throws IOException { + LOG.debug("Start datablock[{}] upload", index); + enterState(Writing, Upload); + return null; + } + + /** + * Enter the closed state. 
+ * + * @return true if the class was in any other state, implying that + * the subclass should do its close operations. + */ + protected synchronized boolean enterClosedState() { + if (!state.equals(Closed)) { + enterState(null, Closed); + return true; + } else { + return false; + } + } + + @Override + public void close() throws IOException { + if (enterClosedState()) { + LOG.debug("Closed {}", this); + innerClose(); + } + } + + /** + * Inner close logic for subclasses to implement. + */ + protected void innerClose() throws IOException { + + } + + /** + * A block has been allocated. + */ + protected void blockAllocated() { + if (statistics != null) { + statistics.blockAllocated(); + } + } + + /** + * A block has been released. + */ + protected void blockReleased() { + if (statistics != null) { + statistics.blockReleased(); + } + } + + protected BlockUploadStatistics getStatistics() { + return statistics; + } + + public long getIndex() { + return index; + } + } + + // ==================================================================== + + /** + * Use byte arrays on the heap for storage. + */ + static class ArrayBlockFactory extends BlockFactory { + + ArrayBlockFactory(String keyToBufferDir, Configuration conf) { + super(keyToBufferDir, conf); + } + + @Override + public DataBlock create(long index, int limit, + BlockUploadStatistics statistics) + throws IOException { + return new ByteArrayBlock(0, limit, statistics); + } + + } + + static class DataBlockByteArrayOutputStream extends ByteArrayOutputStream { + + DataBlockByteArrayOutputStream(int size) { + super(size); + } + + /** + * InputStream backed by the internal byte array. + * + * @return ByteArrayInputStream instance. + */ + ByteArrayInputStream getInputStream() { + ByteArrayInputStream bin = new ByteArrayInputStream(this.buf, 0, count); + this.reset(); + this.buf = null; + return bin; + } + } + + /** + * Stream to memory via a {@code ByteArrayOutputStream}. + *
<p>
+ * It can consume a lot of heap space + * proportional to the mismatch between writes to the stream and + * the JVM-wide upload bandwidth to a Store's endpoint. + * The memory consumption can be limited by tuning the filesystem settings + * to restrict the number of queued/active uploads. + */ + + static class ByteArrayBlock extends DataBlock { + private DataBlockByteArrayOutputStream buffer; + private final int limit; + // cache data size so that it is consistent after the buffer is reset. + private Integer dataSize; + + ByteArrayBlock(long index, + int limit, + BlockUploadStatistics statistics) { + super(index, statistics); + this.limit = limit; + this.buffer = new DataBlockByteArrayOutputStream(limit); + blockAllocated(); + } + + /** + * Get the amount of data; if there is no buffer then the size is 0. + * + * @return the amount of data available to upload. + */ + @Override + public int dataSize() { + return dataSize != null ? dataSize : buffer.size(); + } + + @Override + public BlockUploadData startUpload() throws IOException { + super.startUpload(); + dataSize = buffer.size(); + ByteArrayInputStream bufferData = buffer.getInputStream(); + buffer = null; + return new BlockUploadData(bufferData); + } + + @Override + boolean hasCapacity(long bytes) { + return dataSize() + bytes <= limit; + } + + @Override + public int remainingCapacity() { + return limit - dataSize(); + } + + @Override + public int write(byte[] b, int offset, int len) throws IOException { + super.write(b, offset, len); + int written = Math.min(remainingCapacity(), len); + buffer.write(b, offset, written); + return written; + } + + @Override + protected void innerClose() { + buffer = null; + blockReleased(); + } + + @Override + public String toString() { + return "ByteArrayBlock{" + + "index=" + getIndex() + + ", state=" + getState() + + ", limit=" + limit + + ", dataSize=" + dataSize + + '}'; + } + } + + // ==================================================================== + + /** + * Stream via Direct ByteBuffers; these are allocated off heap + * via {@link DirectBufferPool}. + */ + + static class ByteBufferBlockFactory extends BlockFactory { + + private final DirectBufferPool bufferPool = new DirectBufferPool(); + private final AtomicInteger buffersOutstanding = new AtomicInteger(0); + + ByteBufferBlockFactory(String keyToBufferDir, Configuration conf) { + super(keyToBufferDir, conf); + } + + @Override public ByteBufferBlock create(long index, int limit, + BlockUploadStatistics statistics) + throws IOException { + return new ByteBufferBlock(index, limit, statistics); + } + + private ByteBuffer requestBuffer(int limit) { + LOG.debug("Requesting buffer of size {}", limit); + buffersOutstanding.incrementAndGet(); + return bufferPool.getBuffer(limit); + } + + private void releaseBuffer(ByteBuffer buffer) { + LOG.debug("Releasing buffer"); + bufferPool.returnBuffer(buffer); + buffersOutstanding.decrementAndGet(); + } + + /** + * Get count of outstanding buffers. + * + * @return the current buffer count. + */ + public int getOutstandingBufferCount() { + return buffersOutstanding.get(); + } + + @Override + public String toString() { + return "ByteBufferBlockFactory{" + + "buffersOutstanding=" + buffersOutstanding + + '}'; + } + + /** + * A DataBlock which requests a buffer from pool on creation; returns + * it when it is closed. + */ + class ByteBufferBlock extends DataBlock { + private ByteBuffer blockBuffer; + private final int bufferSize; + // cache data size so that it is consistent after the buffer is reset. 
+ private Integer dataSize; + + /** + * Instantiate. This will request a ByteBuffer of the desired size. + * + * @param index block index. + * @param bufferSize buffer size. + * @param statistics statistics to update. + */ + ByteBufferBlock(long index, + int bufferSize, + BlockUploadStatistics statistics) { + super(index, statistics); + this.bufferSize = bufferSize; + this.blockBuffer = requestBuffer(bufferSize); + blockAllocated(); + } + + /** + * Get the amount of data; if there is no buffer then the size is 0. + * + * @return the amount of data available to upload. + */ + @Override public int dataSize() { + return dataSize != null ? dataSize : bufferCapacityUsed(); + } + + @Override + public BlockUploadData startUpload() throws IOException { + super.startUpload(); + dataSize = bufferCapacityUsed(); + // set the buffer up from reading from the beginning + blockBuffer.limit(blockBuffer.position()); + blockBuffer.position(0); + return new BlockUploadData( + new ByteBufferInputStream(dataSize, blockBuffer)); + } + + @Override + public boolean hasCapacity(long bytes) { + return bytes <= remainingCapacity(); + } + + @Override + public int remainingCapacity() { + return blockBuffer != null ? blockBuffer.remaining() : 0; + } + + private int bufferCapacityUsed() { + return blockBuffer.capacity() - blockBuffer.remaining(); + } + + @Override + public int write(byte[] b, int offset, int len) throws IOException { + super.write(b, offset, len); + int written = Math.min(remainingCapacity(), len); + blockBuffer.put(b, offset, written); + return written; + } + + /** + * Closing the block will release the buffer. + */ + @Override + protected void innerClose() { + if (blockBuffer != null) { + blockReleased(); + releaseBuffer(blockBuffer); + blockBuffer = null; + } + } + + @Override + public String toString() { + return "ByteBufferBlock{" + + "index=" + getIndex() + + ", state=" + getState() + + ", dataSize=" + dataSize() + + ", limit=" + bufferSize + + ", remainingCapacity=" + remainingCapacity() + + '}'; + } + + /** + * Provide an input stream from a byte buffer; supporting + * {@link #mark(int)}, which is required to enable replay of failed + * PUT attempts. + */ + class ByteBufferInputStream extends InputStream { + + private final int size; + private ByteBuffer byteBuffer; + + ByteBufferInputStream(int size, + ByteBuffer byteBuffer) { + LOG.debug("Creating ByteBufferInputStream of size {}", size); + this.size = size; + this.byteBuffer = byteBuffer; + } + + /** + * After the stream is closed, set the local reference to the byte + * buffer to null; this guarantees that future attempts to use + * stream methods will fail. + */ + @Override + public synchronized void close() { + LOG.debug("ByteBufferInputStream.close() for {}", + ByteBufferBlock.super.toString()); + byteBuffer = null; + } + + /** + * Verify that the stream is open. 
+ * + * @throws IOException if the stream is closed + */ + private void verifyOpen() throws IOException { + if (byteBuffer == null) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + } + + public synchronized int read() throws IOException { + if (available() > 0) { + return byteBuffer.get() & 0xFF; + } else { + return -1; + } + } + + @Override + public synchronized long skip(long offset) throws IOException { + verifyOpen(); + long newPos = position() + offset; + if (newPos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + } + if (newPos > size) { + throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + } + byteBuffer.position((int) newPos); + return newPos; + } + + @Override + public synchronized int available() { + Preconditions.checkState(byteBuffer != null, + FSExceptionMessages.STREAM_IS_CLOSED); + return byteBuffer.remaining(); + } + + /** + * Get the current buffer position. + * + * @return the buffer position + */ + public synchronized int position() { + return byteBuffer.position(); + } + + /** + * Check if there is data left. + * + * @return true if there is data remaining in the buffer. + */ + public synchronized boolean hasRemaining() { + return byteBuffer.hasRemaining(); + } + + @Override + public synchronized void mark(int readlimit) { + LOG.debug("mark at {}", position()); + byteBuffer.mark(); + } + + @Override + public synchronized void reset() throws IOException { + LOG.debug("reset"); + byteBuffer.reset(); + } + + @Override + public boolean markSupported() { + return true; + } + + /** + * Read in data. + * + * @param b destination buffer. + * @param offset offset within the buffer. + * @param length length of bytes to read. + * @throws EOFException if the position is negative + * @throws IndexOutOfBoundsException if there isn't space for the + * amount of data requested. + * @throws IllegalArgumentException other arguments are invalid. + */ + @SuppressWarnings("NullableProblems") + public synchronized int read(byte[] b, int offset, int length) + throws IOException { + Preconditions.checkArgument(length >= 0, "length is negative"); + Preconditions.checkArgument(b != null, "Null buffer"); + if (b.length - offset < length) { + throw new IndexOutOfBoundsException( + FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER + + ": request length =" + length + + ", with offset =" + offset + + "; buffer capacity =" + (b.length - offset)); + } + verifyOpen(); + if (!hasRemaining()) { + return -1; + } + + int toRead = Math.min(length, available()); + byteBuffer.get(b, offset, toRead); + return toRead; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "ByteBufferInputStream{"); + sb.append("size=").append(size); + ByteBuffer buf = this.byteBuffer; + if (buf != null) { + sb.append(", available=").append(buf.remaining()); + } + sb.append(", ").append(ByteBufferBlock.super.toString()); + sb.append('}'); + return sb.toString(); + } + } + } + } + + // ==================================================================== + + /** + * Buffer blocks to disk. + */ + static class DiskBlockFactory extends BlockFactory { + + private LocalDirAllocator directoryAllocator; + + DiskBlockFactory(String keyToBufferDir, Configuration conf) { + super(keyToBufferDir, conf); + String bufferDir = conf.get(keyToBufferDir) != null + ? keyToBufferDir : HADOOP_TMP_DIR; + directoryAllocator = new LocalDirAllocator(bufferDir); + } + + /** + * Create a temp file and a {@link DiskBlock} instance to manage it. 
+ * + * @param index block index. + * @param limit limit of the block. + * @param statistics statistics to update. + * @return the new block. + * @throws IOException IO problems + */ + @Override + public DataBlock create(long index, + int limit, + BlockUploadStatistics statistics) + throws IOException { + File destFile = createTmpFileForWrite(String.format("datablock-%04d-", + index), + limit, getConf()); + + return new DiskBlock(destFile, limit, index, statistics); + } + + /** + * Demand create the directory allocator, then create a temporary file. + * This does not mark the file for deletion when a process exits. + * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}. + * + * @param pathStr prefix for the temporary file. + * @param size the size of the file that is going to be written. + * @param conf the Configuration object. + * @return a unique temporary file. + * @throws IOException IO problems + */ + File createTmpFileForWrite(String pathStr, long size, + Configuration conf) throws IOException { + Path path = directoryAllocator.getLocalPathForWrite(pathStr, + size, conf); + File dir = new File(path.getParent().toUri().getPath()); + String prefix = path.getName(); + // create a temp file on this directory + return File.createTempFile(prefix, null, dir); + } + } + + /** + * Stream to a file. + * This will stop at the limit; the caller is expected to create a new block. + */ + static class DiskBlock extends DataBlock { + + private int bytesWritten; + private final File bufferFile; + private final int limit; + private BufferedOutputStream out; + private final AtomicBoolean closed = new AtomicBoolean(false); + + DiskBlock(File bufferFile, + int limit, + long index, + BlockUploadStatistics statistics) + throws FileNotFoundException { + super(index, statistics); + this.limit = limit; + this.bufferFile = bufferFile; + blockAllocated(); + out = new BufferedOutputStream(new FileOutputStream(bufferFile)); + } + + @Override public int dataSize() { + return bytesWritten; + } + + @Override + boolean hasCapacity(long bytes) { + return dataSize() + bytes <= limit; + } + + @Override public int remainingCapacity() { + return limit - bytesWritten; + } + + @Override + public int write(byte[] b, int offset, int len) throws IOException { + super.write(b, offset, len); + int written = Math.min(remainingCapacity(), len); + out.write(b, offset, written); + bytesWritten += written; + return written; + } + + @Override + public BlockUploadData startUpload() throws IOException { + super.startUpload(); + try { + out.flush(); + } finally { + out.close(); + out = null; + } + return new BlockUploadData(bufferFile); + } + + /** + * The close operation will delete the destination file if it still + * exists. + * + * @throws IOException IO problems + */ + @SuppressWarnings("UnnecessaryDefault") + @Override + protected void innerClose() throws IOException { + final DestState state = getState(); + LOG.debug("Closing {}", this); + switch (state) { + case Writing: + if (bufferFile.exists()) { + // file was not uploaded + LOG.debug("Block[{}]: Deleting buffer file as upload did not start", + getIndex()); + closeBlock(); + } + break; + + case Upload: + LOG.debug("Block[{}]: Buffer file {} exists —close upload stream", + getIndex(), bufferFile); + break; + + case Closed: + closeBlock(); + break; + + default: + // this state can never be reached, but checkstyle complains, so + // it is here. + } + } + + /** + * Flush operation will flush to disk. 
+ * + * @throws IOException IOE raised on FileOutputStream + */ + @Override public void flush() throws IOException { + super.flush(); + out.flush(); + } + + @Override + public String toString() { + String sb = "FileBlock{" + + "index=" + getIndex() + + ", destFile=" + bufferFile + + ", state=" + getState() + + ", dataSize=" + dataSize() + + ", limit=" + limit + + '}'; + return sb; + } + + /** + * Close the block. + * This will delete the block's buffer file if the block has + * not previously been closed. + */ + void closeBlock() { + LOG.debug("block[{}]: closeBlock()", getIndex()); + if (!closed.getAndSet(true)) { + blockReleased(); + if (!bufferFile.delete() && bufferFile.exists()) { + LOG.warn("delete({}) returned false", + bufferFile.getAbsoluteFile()); + } + } else { + LOG.debug("block[{}]: skipping re-entrant closeBlock()", getIndex()); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 4d289a71b5b..7b18217426b 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2304,6 +2304,13 @@ + + fs.azure.buffer.dir + ${hadoop.tmp.dir}/abfs + Directory path for buffer files needed to upload data blocks + in AbfsOutputStream. + + fs.AbstractFileSystem.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java new file mode 100644 index 00000000000..5698a08c7e1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.store; + +import java.io.IOException; +import java.util.Random; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.LambdaTestUtils; + +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY; +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK; +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * UTs to test {@link DataBlocks} functionalities. 
+ */ +public class TestDataBlocks { + private final Configuration configuration = new Configuration(); + private static final int ONE_KB = 1024; + private static final Logger LOG = + LoggerFactory.getLogger(TestDataBlocks.class); + + /** + * Test to verify different DataBlocks factories, different operations. + */ + @Test + public void testDataBlocksFactory() throws Exception { + testCreateFactory(DATA_BLOCKS_BUFFER_DISK); + testCreateFactory(DATA_BLOCKS_BUFFER_ARRAY); + testCreateFactory(DATA_BLOCKS_BYTEBUFFER); + } + + /** + * Verify creation of a data block factory and its operations. + * + * @param nameOfFactory Name of the DataBlock factory to be created. + * @throws IOException Throw IOE in case of failure while creating a block. + */ + public void testCreateFactory(String nameOfFactory) throws Exception { + LOG.info("Testing: {}", nameOfFactory); + DataBlocks.BlockFactory blockFactory = + DataBlocks.createFactory("Dir", configuration, nameOfFactory); + + DataBlocks.DataBlock dataBlock = blockFactory.create(0, ONE_KB, null); + assertWriteBlock(dataBlock); + assertToByteArray(dataBlock); + assertCloseBlock(dataBlock); + } + + /** + * Verify Writing of a dataBlock. + * + * @param dataBlock DataBlock to be tested. + * @throws IOException Throw Exception in case of failures. + */ + private void assertWriteBlock(DataBlocks.DataBlock dataBlock) + throws IOException { + byte[] oneKbBuff = new byte[ONE_KB]; + new Random().nextBytes(oneKbBuff); + dataBlock.write(oneKbBuff, 0, ONE_KB); + // Verify DataBlock state is at Writing. + dataBlock.verifyState(DataBlocks.DataBlock.DestState.Writing); + // Verify that the DataBlock has data written. + assertTrue("Expected Data block to have data", dataBlock.hasData()); + // Verify the size of data. + assertEquals("Mismatch in data size in block", ONE_KB, + dataBlock.dataSize()); + // Verify that no capacity is left in the data block to write more. + assertFalse("Expected the data block to have no capacity to write 1 byte " + + "of data", dataBlock.hasCapacity(1)); + } + + /** + * Verify the Conversion of Data blocks into byte[]. + * + * @param dataBlock data block to be tested. + * @throws Exception Throw Exception in case of failures. + */ + private void assertToByteArray(DataBlocks.DataBlock dataBlock) + throws Exception { + DataBlocks.BlockUploadData blockUploadData = dataBlock.startUpload(); + // Verify that the current state is in upload. + dataBlock.verifyState(DataBlocks.DataBlock.DestState.Upload); + // Convert the DataBlock upload to byteArray. + byte[] bytesWritten = blockUploadData.toByteArray(); + // Verify that we can call toByteArray() more than once and gives the + // same byte[]. + assertEquals("Mismatch in byteArray provided by toByteArray() the second " + + "time", bytesWritten, blockUploadData.toByteArray()); + IOUtils.close(blockUploadData); + // Verify that after closing blockUploadData, we can't call toByteArray(). + LambdaTestUtils.intercept(IllegalStateException.class, + "Block is closed", + "Expected to throw IllegalStateException.java after closing " + + "blockUploadData and trying to call toByteArray()", + () -> { + blockUploadData.toByteArray(); + }); + } + + /** + * Verify the close() of data blocks. + * + * @param dataBlock data block to be tested. + * @throws IOException Throw Exception in case of failures. + */ + private void assertCloseBlock(DataBlocks.DataBlock dataBlock) + throws IOException { + dataBlock.close(); + // Verify that the current state is in Closed. 
+ dataBlock.verifyState(DataBlocks.DataBlock.DestState.Closed); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 91274289f54..b4c8b3eff32 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -90,6 +90,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.store.DataBlocks; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; @@ -101,6 +102,11 @@ import org.apache.hadoop.util.Progressable; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; @@ -125,6 +131,13 @@ public class AzureBlobFileSystem extends FileSystem private TracingHeaderFormat tracingHeaderFormat; private Listener listener; + /** Name of blockFactory to be used by AbfsOutputStream. */ + private String blockOutputBuffer; + /** BlockFactory instance to be used. */ + private DataBlocks.BlockFactory blockFactory; + /** Maximum Active blocks per OutputStream. */ + private int blockOutputActiveBlocks; + @Override public void initialize(URI uri, Configuration configuration) throws IOException { @@ -136,8 +149,33 @@ public class AzureBlobFileSystem extends FileSystem this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); abfsCounters = new AbfsCountersImpl(uri); - this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), - configuration, abfsCounters); + // name of the blockFactory to be used. + this.blockOutputBuffer = configuration.getTrimmed(DATA_BLOCKS_BUFFER, + DATA_BLOCKS_BUFFER_DEFAULT); + // blockFactory used for this FS instance. + this.blockFactory = + DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR, + configuration, blockOutputBuffer); + this.blockOutputActiveBlocks = + configuration.getInt(FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS, + BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT); + if (blockOutputActiveBlocks < 1) { + blockOutputActiveBlocks = 1; + } + + // AzureBlobFileSystemStore with params in builder. 
+ AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder + systemStoreBuilder = + new AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder() + .withUri(uri) + .withSecureScheme(this.isSecureScheme()) + .withConfiguration(configuration) + .withAbfsCounters(abfsCounters) + .withBlockFactory(blockFactory) + .withBlockOutputActiveBlocks(blockOutputActiveBlocks) + .build(); + + this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder); LOG.trace("AzureBlobFileSystemStore init complete"); final AbfsConfiguration abfsConfiguration = abfsStore diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index de6f676bab5..3e9c2b75242 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -51,6 +51,8 @@ import java.util.Optional; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; @@ -120,8 +122,12 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.store.DataBlocks; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.util.SemaphoredDelegatingExecutor; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.http.client.utils.URIBuilder; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS; @@ -172,10 +178,23 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { */ private Set appendBlobDirSet; - public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, - Configuration configuration, - AbfsCounters abfsCounters) throws IOException { - this.uri = uri; + /** BlockFactory being used by this instance.*/ + private DataBlocks.BlockFactory blockFactory; + /** Number of active data blocks per AbfsOutputStream */ + private int blockOutputActiveBlocks; + /** Bounded ThreadPool for this instance. */ + private ExecutorService boundedThreadPool; + + /** + * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations. + * Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters + * required. + * @param abfsStoreBuilder Builder for AzureBlobFileSystemStore. + * @throws IOException Throw IOE in case of failure during constructing. 
+ */ + public AzureBlobFileSystemStore( + AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException { + this.uri = abfsStoreBuilder.uri; String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; final String accountName = authorityParts[1]; @@ -183,7 +202,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { leaseRefs = Collections.synchronizedMap(new WeakHashMap<>()); try { - this.abfsConfiguration = new AbfsConfiguration(configuration, accountName); + this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName); } catch (IllegalAccessException exception) { throw new FileSystemOperationUnhandledException(exception); } @@ -213,16 +232,16 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { updateInfiniteLeaseDirs(); this.authType = abfsConfiguration.getAuthType(accountName); boolean usingOauth = (authType == AuthType.OAuth); - boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme; + boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : abfsStoreBuilder.isSecureScheme; this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration); - this.abfsCounters = abfsCounters; + this.abfsCounters = abfsStoreBuilder.abfsCounters; initializeClient(uri, fileSystemName, accountName, useHttps); final Class identityTransformerClass = - configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class, + abfsStoreBuilder.configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class, IdentityTransformerInterface.class); try { this.identityTransformer = - identityTransformerClass.getConstructor(Configuration.class).newInstance(configuration); + identityTransformerClass.getConstructor(Configuration.class).newInstance(abfsStoreBuilder.configuration); } catch (IllegalAccessException | InstantiationException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException e) { throw new IOException(e); } @@ -236,6 +255,13 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { this.appendBlobDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA))); } + this.blockFactory = abfsStoreBuilder.blockFactory; + this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks; + this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( + abfsConfiguration.getWriteMaxConcurrentRequestCount(), + abfsConfiguration.getMaxWriteRequestsToQueue(), + 10L, TimeUnit.SECONDS, + "abfs-bounded"); } /** @@ -272,6 +298,10 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { } try { Futures.allAsList(futures).get(); + // shutdown the threadPool and set it to null. 
+ HadoopExecutors.shutdown(boundedThreadPool, LOG, + 30, TimeUnit.SECONDS); + boundedThreadPool = null; } catch (InterruptedException e) { LOG.error("Interrupted freeing leases", e); Thread.currentThread().interrupt(); @@ -498,7 +528,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { public OutputStream createFile(final Path path, final FileSystem.Statistics statistics, final boolean overwrite, final FsPermission permission, final FsPermission umask, - TracingContext tracingContext) throws AzureBlobFileSystemException { + TracingContext tracingContext) throws IOException { try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) { boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}", @@ -549,12 +579,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { AbfsLease lease = maybeCreateLease(relativePath, tracingContext); return new AbfsOutputStream( - client, - statistics, - relativePath, - 0, - populateAbfsOutputStreamContext(isAppendBlob, lease), - tracingContext); + populateAbfsOutputStreamContext( + isAppendBlob, + lease, + client, + statistics, + relativePath, + 0, + tracingContext)); } } @@ -628,8 +660,29 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { return op; } - private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob, - AbfsLease lease) { + /** + * Method to populate AbfsOutputStreamContext with different parameters to + * be used to construct {@link AbfsOutputStream}. + * + * @param isAppendBlob is Append blob support enabled? + * @param lease instance of AbfsLease for this AbfsOutputStream. + * @param client AbfsClient. + * @param statistics FileSystem statistics. + * @param path Path for AbfsOutputStream. + * @param position Position or offset of the file being opened, set to 0 + * when creating a new file, but needs to be set for APPEND + * calls on the same file. + * @param tracingContext instance of TracingContext for this AbfsOutputStream. + * @return AbfsOutputStreamContext instance with the desired parameters. 
+ */ + private AbfsOutputStreamContext populateAbfsOutputStreamContext( + boolean isAppendBlob, + AbfsLease lease, + AbfsClient client, + FileSystem.Statistics statistics, + String path, + long position, + TracingContext tracingContext) { int bufferSize = abfsConfiguration.getWriteBufferSize(); if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) { bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; @@ -644,6 +697,15 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount()) .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue()) .withLease(lease) + .withBlockFactory(blockFactory) + .withBlockOutputActiveBlocks(blockOutputActiveBlocks) + .withClient(client) + .withPosition(position) + .withFsStatistics(statistics) + .withPath(path) + .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool, + blockOutputActiveBlocks, true)) + .withTracingContext(tracingContext) .build(); } @@ -755,7 +817,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite, - TracingContext tracingContext) throws AzureBlobFileSystemException { + TracingContext tracingContext) throws IOException { try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) { LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}", client.getFileSystem(), @@ -791,12 +853,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { AbfsLease lease = maybeCreateLease(relativePath, tracingContext); return new AbfsOutputStream( - client, - statistics, - relativePath, - offset, - populateAbfsOutputStreamContext(isAppendBlob, lease), - tracingContext); + populateAbfsOutputStreamContext( + isAppendBlob, + lease, + client, + statistics, + relativePath, + offset, + tracingContext)); } } @@ -1744,6 +1808,57 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { } } + /** + * A builder class for AzureBlobFileSystemStore. 
+ */ + public static final class AzureBlobFileSystemStoreBuilder { + + private URI uri; + private boolean isSecureScheme; + private Configuration configuration; + private AbfsCounters abfsCounters; + private DataBlocks.BlockFactory blockFactory; + private int blockOutputActiveBlocks; + + public AzureBlobFileSystemStoreBuilder withUri(URI value) { + this.uri = value; + return this; + } + + public AzureBlobFileSystemStoreBuilder withSecureScheme(boolean value) { + this.isSecureScheme = value; + return this; + } + + public AzureBlobFileSystemStoreBuilder withConfiguration( + Configuration value) { + this.configuration = value; + return this; + } + + public AzureBlobFileSystemStoreBuilder withAbfsCounters( + AbfsCounters value) { + this.abfsCounters = value; + return this; + } + + public AzureBlobFileSystemStoreBuilder withBlockFactory( + DataBlocks.BlockFactory value) { + this.blockFactory = value; + return this; + } + + public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks( + int value) { + this.blockOutputActiveBlocks = value; + return this; + } + + public AzureBlobFileSystemStoreBuilder build() { + return this; + } + } + @VisibleForTesting AbfsClient getClient() { return this.client; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 4a2c5951bd5..12beb5a9bba 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -56,6 +56,37 @@ public final class ConfigurationKeys { public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests"; public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue"; public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size"; + + /** + * Maximum Number of blocks a single output stream can have + * active (uploading, or queued to the central FileSystem + * instance's pool of queued operations. + * This stops a single stream overloading the shared thread pool. + * {@value} + *
+ * <p>
+ * Default is {@link FileSystemConfigurations#BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT} + */ + public static final String FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS = + "fs.azure.block.upload.active.blocks"; + + /** + * Buffer directory path for uploading AbfsOutputStream data blocks. + * Value: {@value} + */ + public static final String FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR = + "fs.azure.buffer.dir"; + + /** + * What data block buffer to use. + *
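+ * This selects the DataBlocks.BlockFactory with which the output stream
+ * buffers its blocks before upload.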
+ * Options include: "disk" (default), "array", and "bytebuffer".
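+ * <p>
+ * For example (illustrative values only):
+ * <pre>
+ *   Configuration conf = new Configuration();
+ *   // buffer upload blocks in memory rather than on local disk
+ *   conf.set("fs.azure.data.blocks.buffer", "bytebuffer");
+ *   // directory used only when the "disk" option is selected
+ *   conf.set("fs.azure.buffer.dir", "/tmp/abfs");
+ * </pre>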
+ * Default is {@link FileSystemConfigurations#DATA_BLOCKS_BUFFER_DEFAULT}. + * Value: {@value} + */ + public static final String DATA_BLOCKS_BUFFER = + "fs.azure.data.blocks.buffer"; + /** If the data size written by Hadoop app is small, i.e. data size : * (a) before any of HFlush/HSync call is made or * (b) between 2 HFlush/Hsync API calls diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index a1de9dfc0ac..f58c61e8908 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -115,5 +115,23 @@ public final class FileSystemConfigurations { public static final int STREAM_ID_LEN = 12; public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true; + /** + * Limit of queued block upload operations before writes + * block for an OutputStream. Value: {@value} + */ + public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20; + + /** + * Buffer blocks to disk. + * Capacity is limited to available disk space. + */ + public static final String DATA_BLOCKS_BUFFER_DISK = "disk"; + + /** + * Default buffer option: {@value}. + */ + public static final String DATA_BLOCKS_BUFFER_DEFAULT = + DATA_BLOCKS_BUFFER_DISK; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 91b068a78c9..61160f92bdf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -20,24 +20,20 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.HttpURLConnection; -import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.UUID; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; - +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; @@ -47,10 +43,9 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.Listener; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; -import org.apache.hadoop.fs.statistics.DurationTracker; import 
org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; -import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.hadoop.fs.store.DataBlocks; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.StreamCapabilities; @@ -63,6 +58,7 @@ import static org.apache.hadoop.io.IOUtils.wrapException; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE; +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState; /** * The BlobFsOutputStream for Rest AbfsClient. @@ -72,6 +68,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private final AbfsClient client; private final String path; + /** The position in the file being uploaded, where the next block would be + * uploaded. + * This is used in constructing the AbfsClient requests to ensure that, + * even if blocks are uploaded out of order, they are reassembled in + * correct order. + * */ private long position; private boolean closed; private boolean supportFlush; @@ -91,8 +93,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private final int maxRequestsThatCanBeQueued; private ConcurrentLinkedDeque writeOperations; - private final ThreadPoolExecutor threadExecutor; - private final ExecutorCompletionService completionService; // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; @@ -103,15 +103,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private AbfsLease lease; private String leaseId; - /** - * Queue storing buffers with the size of the Azure block ready for - * reuse. The pool allows reusing the blocks instead of allocating new - * blocks. After the data is sent to the service, the buffer is returned - * back to the queue - */ - private ElasticByteBufferPool byteBufferPool - = new ElasticByteBufferPool(); - private final Statistics statistics; private final AbfsOutputStreamStatistics outputStreamStatistics; private IOStatistics ioStatistics; @@ -119,17 +110,27 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private static final Logger LOG = LoggerFactory.getLogger(AbfsOutputStream.class); - public AbfsOutputStream( - final AbfsClient client, - final Statistics statistics, - final String path, - final long position, - AbfsOutputStreamContext abfsOutputStreamContext, - TracingContext tracingContext) { - this.client = client; - this.statistics = statistics; - this.path = path; - this.position = position; + /** Factory for blocks. */ + private final DataBlocks.BlockFactory blockFactory; + + /** Current data block. Null means none currently active. */ + private DataBlocks.DataBlock activeBlock; + + /** Count of blocks uploaded. */ + private long blockCount = 0; + + /** The size of a single block. */ + private final int blockSize; + + /** Executor service to carry out the parallel upload requests. 
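+ * Wrapped in a listening decorator by the constructor; the store supplies a
+ * SemaphoredDelegatingExecutor so that each stream is limited to
+ * "fs.azure.block.upload.active.blocks" queued block uploads.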
*/ + private final ListeningExecutorService executorService; + + public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) + throws IOException { + this.client = abfsOutputStreamContext.getClient(); + this.statistics = abfsOutputStreamContext.getStatistics(); + this.path = abfsOutputStreamContext.getPath(); + this.position = abfsOutputStreamContext.getPosition(); this.closed = false; this.supportFlush = abfsOutputStreamContext.isEnableFlush(); this.disableOutputStreamFlush = abfsOutputStreamContext @@ -140,7 +141,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); - this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.numOfAppendsToServerSinceLastFlush = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); @@ -157,23 +157,20 @@ public class AbfsOutputStream extends OutputStream implements Syncable, this.lease = abfsOutputStreamContext.getLease(); this.leaseId = abfsOutputStreamContext.getLeaseId(); - - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); - this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + this.executorService = + MoreExecutors.listeningDecorator(abfsOutputStreamContext.getExecutorService()); this.cachedSasToken = new CachedSASToken( abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); - if (outputStreamStatistics != null) { - this.ioStatistics = outputStreamStatistics.getIOStatistics(); - } this.outputStreamId = createOutputStreamId(); - this.tracingContext = new TracingContext(tracingContext); + this.tracingContext = new TracingContext(abfsOutputStreamContext.getTracingContext()); this.tracingContext.setStreamID(outputStreamId); this.tracingContext.setOperation(FSOperationType.WRITE); + this.ioStatistics = outputStreamStatistics.getIOStatistics(); + this.blockFactory = abfsOutputStreamContext.getBlockFactory(); + this.blockSize = bufferSize; + // create that first block. This guarantees that an open + close sequence + // writes a 0-byte entry. + createBlockIfNeeded(); } private String createOutputStreamId() { @@ -219,10 +216,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, @Override public synchronized void write(final byte[] data, final int off, final int length) throws IOException { + // validate if data is not null and index out of bounds. 
+ DataBlocks.validateWriteArgs(data, off, length); maybeThrowLastError(); - Preconditions.checkArgument(data != null, "null data"); - if (off < 0 || length < 0 || length > data.length - off) { throw new IndexOutOfBoundsException(); } @@ -230,29 +227,184 @@ public class AbfsOutputStream extends OutputStream implements Syncable, if (hasLease() && isLeaseFreed()) { throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE); } + DataBlocks.DataBlock block = createBlockIfNeeded(); + int written = block.write(data, off, length); + int remainingCapacity = block.remainingCapacity(); - int currentOffset = off; - int writableBytes = bufferSize - bufferIndex; - int numberOfBytesToWrite = length; - - while (numberOfBytesToWrite > 0) { - if (writableBytes <= numberOfBytesToWrite) { - System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); - bufferIndex += writableBytes; - writeCurrentBufferToService(); - currentOffset += writableBytes; - numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; - } else { - System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); - bufferIndex += numberOfBytesToWrite; - numberOfBytesToWrite = 0; + if (written < length) { + // Number of bytes to write is more than the data block capacity, + // trigger an upload and then write on the next block. + LOG.debug("writing more data than block capacity -triggering upload"); + uploadCurrentBlock(); + // tail recursion is mildly expensive, but given buffer sizes must be MB. + // it's unlikely to recurse very deeply. + this.write(data, off + written, length - written); + } else { + if (remainingCapacity == 0) { + // the whole buffer is done, trigger an upload + uploadCurrentBlock(); } - - writableBytes = bufferSize - bufferIndex; } incrementWriteOps(); } + /** + * Demand create a destination block. + * + * @return the active block; null if there isn't one. + * @throws IOException on any failure to create + */ + private synchronized DataBlocks.DataBlock createBlockIfNeeded() + throws IOException { + if (activeBlock == null) { + blockCount++; + activeBlock = blockFactory + .create(blockCount, this.blockSize, outputStreamStatistics); + } + return activeBlock; + } + + /** + * Start an asynchronous upload of the current block. + * + * @throws IOException Problems opening the destination for upload, + * initializing the upload, or if a previous operation has failed. + */ + private synchronized void uploadCurrentBlock() throws IOException { + checkState(hasActiveBlock(), "No active block"); + LOG.debug("Writing block # {}", blockCount); + try { + uploadBlockAsync(getActiveBlock(), false, false); + } finally { + // set the block to null, so the next write will create a new block. + clearActiveBlock(); + } + } + + /** + * Upload a block of data. + * This will take the block. + * + * @param blockToUpload block to upload. 
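+ * @param isFlush true when the upload is triggered by a flush; selects
+ *                FLUSH_MODE, or FLUSH_CLOSE_MODE when isClose is also set.
+ * @param isClose true when the stream is being closed.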
+ * @throws IOException upload failure + */ + private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload, + boolean isFlush, boolean isClose) + throws IOException { + if (this.isAppendBlob) { + writeAppendBlobCurrentBufferToService(); + return; + } + if (!blockToUpload.hasData()) { + return; + } + numOfAppendsToServerSinceLastFlush++; + + final int bytesLength = blockToUpload.dataSize(); + final long offset = position; + position += bytesLength; + outputStreamStatistics.bytesToUpload(bytesLength); + outputStreamStatistics.writeCurrentBuffer(); + DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload(); + final Future job = + executorService.submit(() -> { + AbfsPerfTracker tracker = + client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "writeCurrentBufferToService", "append")) { + AppendRequestParameters.Mode + mode = APPEND_MODE; + if (isFlush & isClose) { + mode = FLUSH_CLOSE_MODE; + } else if (isFlush) { + mode = FLUSH_MODE; + } + /* + * Parameters Required for an APPEND call. + * offset(here) - refers to the position in the file. + * bytesLength - Data to be uploaded from the block. + * mode - If it's append, flush or flush_close. + * leaseId - The AbfsLeaseId for this request. + */ + AppendRequestParameters reqParams = new AppendRequestParameters( + offset, 0, bytesLength, mode, false, leaseId); + AbfsRestOperation op = + client.append(path, blockUploadData.toByteArray(), reqParams, + cachedSasToken.get(), new TracingContext(tracingContext)); + cachedSasToken.update(op.getSasToken()); + perfInfo.registerResult(op.getResult()); + perfInfo.registerSuccess(true); + outputStreamStatistics.uploadSuccessful(bytesLength); + return null; + } finally { + IOUtils.close(blockUploadData); + } + }); + writeOperations.add(new WriteOperation(job, offset, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + } + + /** + * A method to set the lastError if an exception is caught. + * @param ex Exception caught. + * @throws IOException Throws the lastError. + */ + private void failureWhileSubmit(Exception ex) throws IOException { + if (ex instanceof AbfsRestOperationException) { + if (((AbfsRestOperationException) ex).getStatusCode() + == HttpURLConnection.HTTP_NOT_FOUND) { + throw new FileNotFoundException(ex.getMessage()); + } + } + if (ex instanceof IOException) { + lastError = (IOException) ex; + } else { + lastError = new IOException(ex); + } + throw lastError; + } + + /** + * Synchronized accessor to the active block. + * + * @return the active block; null if there isn't one. + */ + private synchronized DataBlocks.DataBlock getActiveBlock() { + return activeBlock; + } + + /** + * Predicate to query whether or not there is an active block. + * + * @return true if there is an active block. + */ + private synchronized boolean hasActiveBlock() { + return activeBlock != null; + } + + /** + * Is there an active block and is there any data in it to upload? + * + * @return true if there is some data to upload in an active block else false. + */ + private boolean hasActiveBlockDataToUpload() { + return hasActiveBlock() && getActiveBlock().hasData(); + } + + /** + * Clear the active block. + */ + private void clearActiveBlock() { + if (activeBlock != null) { + LOG.debug("Clearing active block"); + } + synchronized (this) { + activeBlock = null; + } + } + /** * Increment Write Operations. 
*/ @@ -335,7 +487,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, try { flushInternal(true); - threadExecutor.shutdown(); } catch (IOException e) { // Problems surface in try-with-resources clauses if // the exception thrown in a close == the one already thrown @@ -352,9 +503,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, bufferIndex = 0; closed = true; writeOperations.clear(); - byteBufferPool = null; - if (!threadExecutor.isShutdown()) { - threadExecutor.shutdownNow(); + if (hasActiveBlock()) { + clearActiveBlock(); } } LOG.debug("Closing AbfsOutputStream : {}", this); @@ -368,19 +518,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, && enableSmallWriteOptimization && (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes && (writeOperations.size() == 0) // double checking no appends in progress - && (bufferIndex > 0)) { // there is some data that is pending to be written + && hasActiveBlockDataToUpload()) { // there is + // some data that is pending to be written smallWriteOptimizedflushInternal(isClose); return; } - writeCurrentBufferToService(); + if (hasActiveBlockDataToUpload()) { + uploadCurrentBlock(); + } flushWrittenBytesToService(isClose); numOfAppendsToServerSinceLastFlush = 0; } private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException { // writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush - writeCurrentBufferToService(true, isClose); + uploadBlockAsync(getActiveBlock(), true, isClose); waitForAppendsToComplete(); shrinkWriteOperationQueue(); maybeThrowLastError(); @@ -389,131 +542,60 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private synchronized void flushInternalAsync() throws IOException { maybeThrowLastError(); - writeCurrentBufferToService(); + if (hasActiveBlockDataToUpload()) { + uploadCurrentBlock(); + } + waitForAppendsToComplete(); flushWrittenBytesToServiceAsync(); } + /** + * Appending the current active data block to service. Clearing the active + * data block and releasing all buffered data. + * @throws IOException if there is any failure while starting an upload for + * the dataBlock or while closing the BlockUploadData. + */ private void writeAppendBlobCurrentBufferToService() throws IOException { - if (bufferIndex == 0) { + DataBlocks.DataBlock activeBlock = getActiveBlock(); + // No data, return. 
+ if (!hasActiveBlockDataToUpload()) { return; } - final byte[] bytes = buffer; - final int bytesLength = bufferIndex; - if (outputStreamStatistics != null) { - outputStreamStatistics.writeCurrentBuffer(); - outputStreamStatistics.bytesToUpload(bytesLength); - } - buffer = byteBufferPool.getBuffer(false, bufferSize).array(); - bufferIndex = 0; + + final int bytesLength = activeBlock.dataSize(); + DataBlocks.BlockUploadData uploadData = activeBlock.startUpload(); + clearActiveBlock(); + outputStreamStatistics.writeCurrentBuffer(); + outputStreamStatistics.bytesToUpload(bytesLength); final long offset = position; position += bytesLength; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, - "writeCurrentBufferToService", "append")) { + "writeCurrentBufferToService", "append")) { AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, bytesLength, APPEND_MODE, true, leaseId); - AbfsRestOperation op = client - .append(path, bytes, reqParams, cachedSasToken.get(), - new TracingContext(tracingContext)); + AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams, + cachedSasToken.get(), new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); - if (outputStreamStatistics != null) { - outputStreamStatistics.uploadSuccessful(bytesLength); - } + outputStreamStatistics.uploadSuccessful(bytesLength); + perfInfo.registerResult(op.getResult()); - byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); perfInfo.registerSuccess(true); return; } catch (Exception ex) { - if (ex instanceof AbfsRestOperationException) { - if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - throw new FileNotFoundException(ex.getMessage()); - } - } - if (ex instanceof AzureBlobFileSystemException) { - ex = (AzureBlobFileSystemException) ex; - } - lastError = new IOException(ex); - throw lastError; + outputStreamStatistics.uploadFailed(bytesLength); + failureWhileSubmit(ex); + } finally { + IOUtils.close(uploadData); } } - private synchronized void writeCurrentBufferToService() throws IOException { - writeCurrentBufferToService(false, false); - } - - private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException { - if (this.isAppendBlob) { - writeAppendBlobCurrentBufferToService(); - return; - } - - if (bufferIndex == 0) { - return; - } - numOfAppendsToServerSinceLastFlush++; - - final byte[] bytes = buffer; - final int bytesLength = bufferIndex; - if (outputStreamStatistics != null) { - outputStreamStatistics.writeCurrentBuffer(); - outputStreamStatistics.bytesToUpload(bytesLength); - } - buffer = byteBufferPool.getBuffer(false, bufferSize).array(); - bufferIndex = 0; - final long offset = position; - position += bytesLength; - - if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) { - //Tracking time spent on waiting for task to complete. 
- if (outputStreamStatistics != null) { - try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) { - waitForTaskToComplete(); - } - } else { - waitForTaskToComplete(); - } - } - final Future job = completionService.submit(() -> { - AbfsPerfTracker tracker = client.getAbfsPerfTracker(); - try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, - "writeCurrentBufferToService", "append")) { - AppendRequestParameters.Mode - mode = APPEND_MODE; - if (isFlush & isClose) { - mode = FLUSH_CLOSE_MODE; - } else if (isFlush) { - mode = FLUSH_MODE; - } - AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false, leaseId); - AbfsRestOperation op = client.append(path, bytes, reqParams, - cachedSasToken.get(), new TracingContext(tracingContext)); - cachedSasToken.update(op.getSasToken()); - perfInfo.registerResult(op.getResult()); - byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); - perfInfo.registerSuccess(true); - return null; - } - }); - - if (outputStreamStatistics != null) { - if (job.isCancelled()) { - outputStreamStatistics.uploadFailed(bytesLength); - } else { - outputStreamStatistics.uploadSuccessful(bytesLength); - } - } - writeOperations.add(new WriteOperation(job, offset, bytesLength)); - - // Try to shrink the queue - shrinkWriteOperationQueue(); - } - private synchronized void waitForAppendsToComplete() throws IOException { for (WriteOperation writeOperation : writeOperations) { try { writeOperation.task.get(); } catch (Exception ex) { + outputStreamStatistics.uploadFailed(writeOperation.length); if (ex.getCause() instanceof AbfsRestOperationException) { if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new FileNotFoundException(ex.getMessage()); @@ -563,7 +645,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, throw new FileNotFoundException(ex.getMessage()); } } - throw new IOException(ex); + lastError = new IOException(ex); + throw lastError; } this.lastFlushOffset = offset; } @@ -574,14 +657,14 @@ public class AbfsOutputStream extends OutputStream implements Syncable, */ private synchronized void shrinkWriteOperationQueue() throws IOException { try { - while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { - writeOperations.peek().task.get(); - lastTotalAppendOffset += writeOperations.peek().length; + WriteOperation peek = writeOperations.peek(); + while (peek != null && peek.task.isDone()) { + peek.task.get(); + lastTotalAppendOffset += peek.length; writeOperations.remove(); + peek = writeOperations.peek(); // Incrementing statistics to indicate queue has been shrunk. 
- if (outputStreamStatistics != null) { - outputStreamStatistics.queueShrunk(); - } + outputStreamStatistics.queueShrunk(); } } catch (Exception e) { if (e.getCause() instanceof AzureBlobFileSystemException) { @@ -593,26 +676,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, } } - private void waitForTaskToComplete() throws IOException { - boolean completed; - for (completed = false; completionService.poll() != null; completed = true) { - // keep polling until there is no data - } - // for AppendBLob, jobs are not submitted to completion service - if (isAppendBlob) { - completed = true; - } - - if (!completed) { - try { - completionService.take(); - } catch (InterruptedException e) { - lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); - throw lastError; - } - } - } - private static class WriteOperation { private final Future task; private final long startOffset; @@ -631,7 +694,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, @VisibleForTesting public synchronized void waitForPendingUploads() throws IOException { - waitForTaskToComplete(); + waitForAppendsToComplete(); } /** @@ -695,12 +758,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, @Override public String toString() { final StringBuilder sb = new StringBuilder(super.toString()); - if (outputStreamStatistics != null) { - sb.append("AbfsOutputStream@").append(this.hashCode()); - sb.append("){"); - sb.append(outputStreamStatistics.toString()); - sb.append("}"); - } + sb.append("AbfsOutputStream@").append(this.hashCode()); + sb.append("){"); + sb.append(outputStreamStatistics.toString()); + sb.append("}"); return sb.toString(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index 48f6f540810..ad303823e0c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -18,6 +18,12 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.store.DataBlocks; + /** * Class to hold extra output stream configs. */ @@ -41,6 +47,22 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private AbfsLease lease; + private DataBlocks.BlockFactory blockFactory; + + private int blockOutputActiveBlocks; + + private AbfsClient client; + + private long position; + + private FileSystem.Statistics statistics; + + private String path; + + private ExecutorService executorService; + + private TracingContext tracingContext; + public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -79,11 +101,64 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { return this; } - public AbfsOutputStreamContext build() { - // Validation of parameters to be done here. 
+ public AbfsOutputStreamContext withBlockFactory( + final DataBlocks.BlockFactory blockFactory) { + this.blockFactory = blockFactory; return this; } + public AbfsOutputStreamContext withBlockOutputActiveBlocks( + final int blockOutputActiveBlocks) { + this.blockOutputActiveBlocks = blockOutputActiveBlocks; + return this; + } + + + public AbfsOutputStreamContext withClient( + final AbfsClient client) { + this.client = client; + return this; + } + + public AbfsOutputStreamContext withPosition( + final long position) { + this.position = position; + return this; + } + + public AbfsOutputStreamContext withFsStatistics( + final FileSystem.Statistics statistics) { + this.statistics = statistics; + return this; + } + + public AbfsOutputStreamContext withPath( + final String path) { + this.path = path; + return this; + } + + public AbfsOutputStreamContext withExecutorService( + final ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + public AbfsOutputStreamContext withTracingContext( + final TracingContext tracingContext) { + this.tracingContext = tracingContext; + return this; + } + + public AbfsOutputStreamContext build() { + // Validation of parameters to be done here. + if (streamStatistics == null) { + streamStatistics = new AbfsOutputStreamStatisticsImpl(); + } + return this; + } + + public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount( final int writeMaxConcurrentRequestCount) { this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount; @@ -143,4 +218,36 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { } return this.lease.getLeaseID(); } + + public DataBlocks.BlockFactory getBlockFactory() { + return blockFactory; + } + + public int getBlockOutputActiveBlocks() { + return blockOutputActiveBlocks; + } + + public AbfsClient getClient() { + return client; + } + + public FileSystem.Statistics getStatistics() { + return statistics; + } + + public String getPath() { + return path; + } + + public long getPosition() { + return position; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public TracingContext getTracingContext() { + return tracingContext; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java index c57d5d9bcaa..a9e088c025b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java @@ -22,12 +22,14 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.store.BlockUploadStatistics; /** * Interface for {@link AbfsOutputStream} statistics. */ @InterfaceStability.Unstable -public interface AbfsOutputStreamStatistics extends IOStatisticsSource { +public interface AbfsOutputStreamStatistics extends IOStatisticsSource, + BlockUploadStatistics { /** * Number of bytes to be uploaded. 
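Note: because AbfsOutputStreamStatistics still extends IOStatisticsSource,
the new "blocks_allocated" and "blocks_released" counters surface through the
standard IOStatistics API. A hedged sketch, assuming a reference to an
AbfsOutputStream is at hand:

    import org.apache.hadoop.fs.statistics.IOStatistics;
    import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

    // take a statistics snapshot from the stream and read the block counters
    IOStatistics stats = retrieveIOStatistics(outputStream);
    Long allocated = stats.counters().get("blocks_allocated");
    Long released = stats.counters().get("blocks_released");

On a fully closed stream the two counts should normally be equal.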
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java index b07cf28a710..cb054e2915d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java @@ -42,7 +42,9 @@ public class AbfsOutputStreamStatisticsImpl StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL, StreamStatisticNames.BYTES_UPLOAD_FAILED, StreamStatisticNames.QUEUE_SHRUNK_OPS, - StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS + StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS, + StreamStatisticNames.BLOCKS_ALLOCATED, + StreamStatisticNames.BLOCKS_RELEASED ) .withDurationTracking( StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST, @@ -60,6 +62,11 @@ public class AbfsOutputStreamStatisticsImpl private final AtomicLong writeCurrentBufferOps = ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS); + private final AtomicLong blocksAllocated = + ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_ALLOCATED); + private final AtomicLong blocksReleased = + ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_RELEASED); + /** * Records the need to upload bytes and increments the total bytes that * needs to be uploaded. @@ -133,6 +140,22 @@ public class AbfsOutputStreamStatisticsImpl writeCurrentBufferOps.incrementAndGet(); } + /** + * Increment the counter to indicate a block has been allocated. + */ + @Override + public void blockAllocated() { + blocksAllocated.incrementAndGet(); + } + + /** + * Increment the counter to indicate a block has been released. + */ + @Override + public void blockReleased() { + blocksReleased.incrementAndGet(); + } + /** * {@inheritDoc} * diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java index 82907c57475..231c54825f2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java @@ -20,6 +20,8 @@ package org.apache.hadoop.fs.azure.integration; import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE; + /** * Constants for the Azure tests. */ @@ -175,4 +177,15 @@ public interface AzureTestConstants { * Base directory for page blobs. 
*/ Path PAGE_BLOB_DIR = new Path("/" + DEFAULT_PAGE_BLOB_DIRECTORY); + + /** + * Huge file for testing AbfsOutputStream uploads: {@value} + */ + String AZURE_SCALE_HUGE_FILE_UPLOAD = AZURE_SCALE_TEST + "huge.upload"; + + /** + * Default value for Huge file to be tested for AbfsOutputStream uploads: + * {@value} + */ + int AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT = 2 * DEFAULT_WRITE_BUFFER_SIZE; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index d4e44c37f63..56d553819fe 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -488,7 +488,7 @@ public abstract class AbstractAbfsIntegrationTest extends */ protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled( AzureBlobFileSystem fs, - Path path) throws AzureBlobFileSystemException { + Path path) throws IOException { AzureBlobFileSystemStore abfss = fs.getAbfsStore(); abfss.getAbfsConfiguration().setDisableOutputStreamFlush(false); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java new file mode 100644 index 00000000000..510e0a7596b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.store.DataBlocks; + +import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD; +import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT; +import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assume; +import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.getTestPropertyInt; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE; + +/** + * Testing Huge file for AbfsOutputStream. 
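+ * <p>
+ * The upload size is read from "fs.azure.scale.test.huge.upload"
+ * (default 2 * DEFAULT_WRITE_BUFFER_SIZE), and each case is parameterized
+ * across the "disk", "array" and "bytebuffer" block factories.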
+ */ +@RunWith(Parameterized.class) +public class ITestAbfsHugeFiles extends AbstractAbfsScaleTest { + private static final int ONE_MB = 1024 * 1024; + private static final int EIGHT_MB = 8 * ONE_MB; + // Configurable huge file upload: "fs.azure.scale.test.huge.upload", + // default is 2 * DEFAULT_WRITE_BUFFER_SIZE(8M). + private static final int HUGE_FILE; + + // Set the HUGE_FILE. + static { + HUGE_FILE = getTestPropertyInt(new Configuration(), + AZURE_SCALE_HUGE_FILE_UPLOAD, AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT); + } + + // Writing block size to be used in this test. + private int size; + // Block Factory to be used in this test. + private String blockFactoryName; + + @Parameterized.Parameters(name = "size [{0}] ; blockFactoryName " + + "[{1}]") + public static Collection sizes() { + return Arrays.asList(new Object[][] { + { DEFAULT_WRITE_BUFFER_SIZE, DataBlocks.DATA_BLOCKS_BUFFER_DISK }, + { HUGE_FILE, DataBlocks.DATA_BLOCKS_BUFFER_DISK }, + { DEFAULT_WRITE_BUFFER_SIZE, DataBlocks.DATA_BLOCKS_BUFFER_ARRAY }, + { HUGE_FILE, DataBlocks.DATA_BLOCKS_BUFFER_ARRAY }, + { DEFAULT_WRITE_BUFFER_SIZE, DataBlocks.DATA_BLOCKS_BYTEBUFFER }, + { HUGE_FILE, DataBlocks.DATA_BLOCKS_BYTEBUFFER }, + }); + } + + public ITestAbfsHugeFiles(int size, String blockFactoryName) + throws Exception { + this.size = size; + this.blockFactoryName = blockFactoryName; + } + + @Before + public void setUp() throws Exception { + Configuration configuration = getRawConfiguration(); + configuration.unset(DATA_BLOCKS_BUFFER); + configuration.set(DATA_BLOCKS_BUFFER, blockFactoryName); + super.setup(); + } + + /** + * Testing Huge files written at once on AbfsOutputStream. + */ + @Test + public void testHugeFileWrite() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + Path filePath = path(getMethodName()); + final byte[] b = new byte[size]; + new Random().nextBytes(b); + try (FSDataOutputStream out = fs.create(filePath)) { + out.write(b); + } + // Verify correct length was uploaded. Don't want to verify contents + // here, as this would increase the test time significantly. + assertEquals("Mismatch in content length of file uploaded", size, + fs.getFileStatus(filePath).getLen()); + } + + /** + * Testing Huge files written in chunks of 8M in lots of writes. + */ + @Test + public void testLotsOfWrites() throws IOException { + assume("If the size isn't a multiple of 8M this test would not pass, so " + + "skip", + size % EIGHT_MB == 0); + AzureBlobFileSystem fs = getFileSystem(); + Path filePath = path(getMethodName()); + final byte[] b = new byte[size]; + new Random().nextBytes(b); + try (FSDataOutputStream out = fs.create(filePath)) { + int offset = 0; + for (int i = 0; i < size / EIGHT_MB; i++) { + out.write(b, offset, EIGHT_MB); + offset += EIGHT_MB; + } + } + // Verify correct length was uploaded. Don't want to verify contents + // here, as this would increase the test time significantly. 
+ assertEquals("Mismatch in content length of file uploaded", size, + fs.getFileStatus(filePath).getLen()); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java index 2894abe4d0e..07b8a6f2bb2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java @@ -48,7 +48,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED; -import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED; @@ -292,18 +291,14 @@ public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest { final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1); fs.mkdirs(testFilePath.getParent()); - FSDataOutputStream out = fs.create(testFilePath); - out.write(0); - Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed()); + try (FSDataOutputStream out = fs.create(testFilePath)) { + out.write(0); + Assert.assertFalse("Store leases should exist", + fs.getAbfsStore().areLeasesFreed()); + } fs.close(); Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); - LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? 
ERR_LEASE_NOT_PRESENT - : ERR_LEASE_EXPIRED, () -> { - out.close(); - return "Expected exception on close after closed FS but got " + out; - }); - LambdaTestUtils.intercept(RejectedExecutionException.class, () -> { try (FSDataOutputStream out2 = fs.append(testFilePath)) { } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index f01c81b74ee..0673e387bfb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -21,6 +21,8 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; import java.util.Arrays; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -32,7 +34,14 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; +import org.apache.hadoop.fs.store.DataBlocks; +import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.util.SemaphoredDelegatingExecutor; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.refEq; @@ -58,12 +67,26 @@ public final class TestAbfsOutputStream { private final String accountKey1 = globalKey + "." 
+ accountName1; private final String accountValue1 = "one"; - private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize, - boolean isFlushEnabled, - boolean disableOutputStreamFlush, - boolean isAppendBlob) throws IOException, IllegalAccessException { + private AbfsOutputStreamContext populateAbfsOutputStreamContext( + int writeBufferSize, + boolean isFlushEnabled, + boolean disableOutputStreamFlush, + boolean isAppendBlob, + AbfsClient client, + String path, + TracingContext tracingContext, + ExecutorService executorService) throws IOException, + IllegalAccessException { AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(), accountName1); + String blockFactoryName = + abfsConf.getRawConfiguration().getTrimmed(DATA_BLOCKS_BUFFER, + DATA_BLOCKS_BUFFER_DEFAULT); + DataBlocks.BlockFactory blockFactory = + DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR, + abfsConf.getRawConfiguration(), + blockFactoryName); + return new AbfsOutputStreamContext(2) .withWriteBufferSize(writeBufferSize) .enableFlush(isFlushEnabled) @@ -72,6 +95,11 @@ public final class TestAbfsOutputStream { .withAppendBlob(isAppendBlob) .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount()) .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue()) + .withClient(client) + .withPath(path) + .withTracingContext(tracingContext) + .withExecutorService(executorService) + .withBlockFactory(blockFactory) .build(); } @@ -95,11 +123,18 @@ public final class TestAbfsOutputStream { when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, - populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), - null)); + AbfsOutputStream out = new AbfsOutputStream( + populateAbfsOutputStreamContext( + BUFFER_SIZE, + true, + false, + false, + client, + PATH, + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), + null), + createExecutorService(abfsConf))); final byte[] b = new byte[WRITE_SIZE]; new Random().nextBytes(b); out.write(b); @@ -149,9 +184,16 @@ public final class TestAbfsOutputStream { when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(TracingContext.class))).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, - populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), - tracingContext); + AbfsOutputStream out = new AbfsOutputStream( + populateAbfsOutputStreamContext( + BUFFER_SIZE, + true, + false, + false, + client, + PATH, + tracingContext, + createExecutorService(abfsConf))); final byte[] b = new byte[WRITE_SIZE]; new Random().nextBytes(b); @@ -216,9 +258,16 @@ public final class TestAbfsOutputStream { when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, - populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), - tracingContext); + AbfsOutputStream out = new AbfsOutputStream( + populateAbfsOutputStreamContext( + BUFFER_SIZE, + true, + false, + false, + client, + PATH, + 
tracingContext, + createExecutorService(abfsConf))); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -280,11 +329,18 @@ public final class TestAbfsOutputStream { when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, - populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), - null)); + AbfsOutputStream out = new AbfsOutputStream( + populateAbfsOutputStreamContext( + BUFFER_SIZE, + true, + false, + false, + client, + PATH, + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), + null), + createExecutorService(abfsConf))); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -328,11 +384,18 @@ public final class TestAbfsOutputStream { when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, - populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true), - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), - null)); + AbfsOutputStream out = new AbfsOutputStream( + populateAbfsOutputStreamContext( + BUFFER_SIZE, + true, + false, + true, + client, + PATH, + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), + null), + createExecutorService(abfsConf))); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -380,10 +443,18 @@ public final class TestAbfsOutputStream { when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, - populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), - null)); + AbfsOutputStream out = new AbfsOutputStream( + populateAbfsOutputStreamContext( + BUFFER_SIZE, + true, + false, + false, + client, + PATH, + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), + null), + createExecutorService(abfsConf))); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -441,11 +512,18 @@ public final class TestAbfsOutputStream { when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, - populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), - null)); + AbfsOutputStream out = new AbfsOutputStream( + populateAbfsOutputStreamContext( + BUFFER_SIZE, + true, + false, + false, + client, + PATH, + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), + null), + createExecutorService(abfsConf))); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -469,4 +547,21 @@ public final class TestAbfsOutputStream 
{ verify(client, times(2)).append( eq(PATH), any(byte[].class), any(), any(), any(TracingContext.class)); } + + /** + * Method to create an executor Service for AbfsOutputStream. + * @param abfsConf Configuration. + * @return ExecutorService. + */ + private ExecutorService createExecutorService( + AbfsConfiguration abfsConf) { + ExecutorService executorService = + new SemaphoredDelegatingExecutor(BlockingThreadPoolExecutorService.newInstance( + abfsConf.getWriteMaxConcurrentRequestCount(), + abfsConf.getMaxWriteRequestsToQueue(), + 10L, TimeUnit.SECONDS, + "abfs-test-bounded"), + BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT, true); + return executorService; + } }
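A minimal end-to-end sketch of the behaviour this patch adds. Property names
are the keys defined above; the buffer directory, output path and sizes are
illustrative, and "fs.defaultFS" is assumed to point at an abfs:// URI:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AbfsUploadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // safe default: spill queued blocks to local disk while uploading
        conf.set("fs.azure.data.blocks.buffer", "disk");
        conf.set("fs.azure.buffer.dir", "/tmp/abfs-buffer");
        // cap the blocks a single stream may have uploading or queued
        conf.setInt("fs.azure.block.upload.active.blocks", 20);

        FileSystem fs = FileSystem.get(conf);
        try (FSDataOutputStream out = fs.create(new Path("/data/huge.bin"))) {
          byte[] block = new byte[8 * 1024 * 1024];
          for (int i = 0; i < 128; i++) {
            // each full block is handed to the block factory and queued
            // for asynchronous upload; write() blocks once the per-stream
            // queue limit is reached
            out.write(block);
          }
        } // close() waits for the queued uploads, then flushes the file
      }
    }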