diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 7b5918e935c..6b4e2b4240e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -461,4 +461,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IOStatistics logging level. */
public static final String IOSTATISTICS_LOGGING_LEVEL_DEFAULT
= IOSTATISTICS_LOGGING_LEVEL_DEBUG;
+
+ /**
+ * Default Hadoop temp dir on the local filesystem: {@value}.
+ */
+ public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
}
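
As a usage sketch, resolving the directory from this new key (the fallback value below is illustrative, not the Hadoop default):

    import org.apache.hadoop.conf.Configuration;
    import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;

    public class TmpDirLookup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "/tmp/hadoop" is an illustrative fallback only; core-default.xml
        // normally supplies the real value of hadoop.tmp.dir
        System.out.println(conf.get(HADOOP_TMP_DIR, "/tmp/hadoop"));
      }
    }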
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index bbb8517118e..7e9137294c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -358,6 +358,18 @@ public final class StreamStatisticNames {
public static final String REMOTE_BYTES_READ
= "remote_bytes_read";
+ /**
+ * Total number of data blocks allocated by an output stream.
+ */
+ public static final String BLOCKS_ALLOCATED
+ = "blocks_allocated";
+
+ /**
+ * Total number of data blocks released by an output stream.
+ */
+ public static final String BLOCKS_RELEASED
+ = "blocks_released";
+
private StreamStatisticNames() {
}
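
As a sketch of how a stream's statistics implementation might register the new names, assuming the IOStatisticsBinding store builder from this module (the class name is illustrative):

    import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

    import static org.apache.hadoop.fs.statistics.StreamStatisticNames.BLOCKS_ALLOCATED;
    import static org.apache.hadoop.fs.statistics.StreamStatisticNames.BLOCKS_RELEASED;
    import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

    public class BlockCounters {
      public static void main(String[] args) {
        IOStatisticsStore store = iostatisticsStore()
            .withCounters(BLOCKS_ALLOCATED, BLOCKS_RELEASED)
            .build();
        store.incrementCounter(BLOCKS_ALLOCATED);  // a block was allocated
        store.incrementCounter(BLOCKS_RELEASED);   // ...and later released
      }
    }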
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java
new file mode 100644
index 00000000000..bf7cbbbc5d5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
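+/**
+ * Interface for sources of statistics about the block upload process:
+ * counts of data blocks allocated and released by an output stream.
+ */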
+public interface BlockUploadStatistics {
+
+ /**
+ * A block has been allocated.
+ */
+ void blockAllocated();
+
+ /**
+ * A block has been released.
+ */
+ void blockReleased();
+
+}
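
A minimal counter-backed implementation sketch, assuming only the two callbacks above; the class name and the activeBlocks() helper are illustrative:

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.hadoop.fs.store.BlockUploadStatistics;

    public class CountingBlockUploadStatistics implements BlockUploadStatistics {
      private final AtomicLong allocated = new AtomicLong();
      private final AtomicLong released = new AtomicLong();

      @Override
      public void blockAllocated() {
        allocated.incrementAndGet();
      }

      @Override
      public void blockReleased() {
        released.incrementAndGet();
      }

      /** Blocks currently held: allocated minus released. */
      public long activeBlocks() {
        return allocated.get() - released.get();
      }
    }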
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
new file mode 100644
index 00000000000..5c7c9378f16
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
@@ -0,0 +1,1123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * A class providing disk, byteBuffer and byteArray options for buffering
+ * the blocks written through filesystem output streams.
+ * <p>
+ * Disk: uses disk space to buffer the blocks; capacity is limited only by
+ * free disk space, making it the best option for avoiding OutOfMemory
+ * errors in the Java heap.
+ * <p>
+ * byteBuffer: uses DirectByteBuffer to allocate memory off-heap, giving
+ * faster writing of DataBlocks with some risk of running OutOfMemory.
+ * <p>
+ * byteArray: uses byte[] buffers on the heap; fast, but can run out of
+ * heap fairly easily.
+ * <p>
+ * The implementation of DataBlocks is taken from HADOOP-13560, where it
+ * was added to support huge file uploads in S3A with a choice of buffering
+ * options rather than a single one.
+ */
+public final class DataBlocks {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataBlocks.class);
+
+ /**
+ * Buffer blocks to disk.
+ * Capacity is limited to available disk space.
+ */
+ public static final String DATA_BLOCKS_BUFFER_DISK = "disk";
+
+ /**
+ * Use a byte buffer.
+ */
+ public static final String DATA_BLOCKS_BYTEBUFFER = "bytebuffer";
+
+ /**
+ * Use an in-memory array. Fast, but can run out of heap rapidly.
+ */
+ public static final String DATA_BLOCKS_BUFFER_ARRAY = "array";
+
+ private DataBlocks() {
+ }
+
+ /**
+ * Validate args to a write command. These are the same validation checks
+ * expected for any implementation of {@code OutputStream.write()}.
+ *
+ * @param b byte array containing data.
+ * @param off offset in array where to start.
+ * @param len number of bytes to be written.
+ * @throws NullPointerException for a null buffer
+ * @throws IndexOutOfBoundsException if indices are out of range
+ */
+ public static void validateWriteArgs(byte[] b, int off, int len)
+ throws IOException {
+ Preconditions.checkNotNull(b);
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException(
+ "write (b[" + b.length + "], " + off + ", " + len + ')');
+ }
+ }
+
+ /**
+ * Create a factory.
+ *
+ * @param keyToBufferDir Key to buffer directory config for a FS.
+ * @param configuration factory configurations.
+ * @param name factory name, the option from {@link CommonConfigurationKeys}.
+ * @return the factory, ready to be initialized.
+ * @throws IllegalArgumentException if the name is unknown.
+ */
+ public static BlockFactory createFactory(String keyToBufferDir,
+ Configuration configuration,
+ String name) {
+ LOG.debug("Creating DataFactory of type : {}", name);
+ switch (name) {
+ case DATA_BLOCKS_BUFFER_ARRAY:
+ return new ArrayBlockFactory(keyToBufferDir, configuration);
+ case DATA_BLOCKS_BUFFER_DISK:
+ return new DiskBlockFactory(keyToBufferDir, configuration);
+ case DATA_BLOCKS_BYTEBUFFER:
+ return new ByteBufferBlockFactory(keyToBufferDir, configuration);
+ default:
+ throw new IllegalArgumentException("Unsupported block buffer" +
+ " \"" + name + '"');
+ }
+ }
+
+ /**
+ * The output information for an upload.
+ * It can be a file, an input stream or a byteArray.
+ * The {@link #toByteArray()} method converts the data into a byte array,
+ * caching the result for subsequent calls.
+ * When closed, any stream is closed and any source file is deleted.
+ */
+ public static final class BlockUploadData implements Closeable {
+ private final File file;
+ private InputStream uploadStream;
+ private byte[] byteArray;
+ private boolean isClosed;
+
+ /**
+ * Constructor for a byteArray upload data block; file and uploadStream
+ * will be null.
+ *
+ * @param byteArray byteArray used to construct BlockUploadData.
+ */
+ public BlockUploadData(byte[] byteArray) {
+ this.file = null;
+ this.uploadStream = null;
+
+ this.byteArray = requireNonNull(byteArray);
+ }
+
+ /**
+ * File constructor; input stream and byteArray will be null.
+ *
+ * @param file file to upload
+ */
+ BlockUploadData(File file) {
+ Preconditions.checkArgument(file.exists(), "No file: %s", file);
+ this.file = file;
+ this.uploadStream = null;
+ this.byteArray = null;
+ }
+
+ /**
+ * Stream constructor; file and byteArray fields will be null.
+ *
+ * @param uploadStream stream to upload.
+ */
+ BlockUploadData(InputStream uploadStream) {
+ requireNonNull(uploadStream, "rawUploadStream");
+ this.uploadStream = uploadStream;
+ this.file = null;
+ this.byteArray = null;
+ }
+
+ /**
+ * Predicate: does this instance contain a file reference.
+ *
+ * @return true if there is a file.
+ */
+ boolean hasFile() {
+ return file != null;
+ }
+
+ /**
+ * Get the file, if there is one.
+ *
+ * @return the file for uploading, or null.
+ */
+ File getFile() {
+ return file;
+ }
+
+ /**
+ * Get the raw upload stream, if the object was
+ * created with one.
+ *
+ * @return the upload stream or null.
+ */
+ InputStream getUploadStream() {
+ return uploadStream;
+ }
+
+ /**
+ * Convert to a byte array.
+ * If the data is stored in a file, it will be read and returned.
+ * If the data was passed in via an input stream (which happens if the
+ * data is stored in a bytebuffer) then it will be converted to a byte
+ * array, which is then cached for any subsequent use.
+ *
+ * @return byte[] after converting the uploadBlock.
+ * @throws IOException raised if an exception is caught while reading
+ * the file/input stream, or while closing the input stream.
+ */
+ public byte[] toByteArray() throws IOException {
+ Preconditions.checkState(!isClosed, "Block is closed");
+ if (byteArray != null) {
+ return byteArray;
+ }
+ if (file != null) {
+ // Need to save byteArray here so that we don't read File if
+ // byteArray() is called more than once on the same file.
+ byteArray = FileUtils.readFileToByteArray(file);
+ return byteArray;
+ }
+ byteArray = IOUtils.toByteArray(uploadStream);
+ IOUtils.close(uploadStream);
+ uploadStream = null;
+ return byteArray;
+ }
+
+ /**
+ * Close: closes any upload stream, releases the byte array and deletes
+ * any source file.
+ *
+ * @throws IOException inherited exception.
+ */
+ @Override
+ public void close() throws IOException {
+ isClosed = true;
+ cleanupWithLogger(LOG, uploadStream);
+ byteArray = null;
+ if (file != null) {
+ LOG.debug("File deleted in BlockUploadData close: {}", file.delete());
+ }
+ }
+ }
+
+ /**
+ * Base class for block factories.
+ */
+ public static abstract class BlockFactory implements Closeable {
+
+ private final String keyToBufferDir;
+ private final Configuration conf;
+
+ protected BlockFactory(String keyToBufferDir, Configuration conf) {
+ this.keyToBufferDir = keyToBufferDir;
+ this.conf = conf;
+ }
+
+ /**
+ * Create a block.
+ *
+ * @param index index of block
+ * @param limit limit of the block.
+ * @param statistics stats to work with
+ * @return a new block.
+ */
+ public abstract DataBlock create(long index, int limit,
+ BlockUploadStatistics statistics)
+ throws IOException;
+
+ /**
+ * Implement any close/cleanup operation.
+ * Base class is a no-op.
+ *
+ * @throws IOException Inherited exception; implementations should
+ * avoid raising it.
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Configuration.
+ *
+ * @return config passed to create the factory.
+ */
+ protected Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Key to Buffer Directory config for a FS instance.
+ *
+ * @return String containing key to Buffer dir.
+ */
+ public String getKeyToBufferDir() {
+ return keyToBufferDir;
+ }
+ }
+
+ /**
+ * This represents a block being uploaded.
+ */
+ public static abstract class DataBlock implements Closeable {
+
+ enum DestState {Writing, Upload, Closed}
+
+ private volatile DestState state = Writing;
+ private final long index;
+ private final BlockUploadStatistics statistics;
+
+ protected DataBlock(long index,
+ BlockUploadStatistics statistics) {
+ this.index = index;
+ this.statistics = statistics;
+ }
+
+ /**
+ * Atomically enter a state, verifying current state.
+ *
+ * @param current current state. null means "no check"
+ * @param next next state
+ * @throws IllegalStateException if the current state is not as expected
+ */
+ protected synchronized final void enterState(DestState current,
+ DestState next)
+ throws IllegalStateException {
+ verifyState(current);
+ LOG.debug("{}: entering state {}", this, next);
+ state = next;
+ }
+
+ /**
+ * Verify that the block is in the declared state.
+ *
+ * @param expected expected state.
+ * @throws IllegalStateException if the DataBlock is in the wrong state
+ */
+ protected final void verifyState(DestState expected)
+ throws IllegalStateException {
+ if (expected != null && state != expected) {
+ throw new IllegalStateException("Expected stream state " + expected
+ + ", but actual state is " + state + " in " + this);
+ }
+ }
+
+ /**
+ * Current state.
+ *
+ * @return the current state.
+ */
+ final DestState getState() {
+ return state;
+ }
+
+ /**
+ * Return the current data size.
+ *
+ * @return the size of the data.
+ */
+ public abstract int dataSize();
+
+ /**
+ * Predicate to verify that the block has the capacity to write
+ * the given set of bytes.
+ *
+ * @param bytes number of bytes desired to be written.
+ * @return true if there is enough space.
+ */
+ abstract boolean hasCapacity(long bytes);
+
+ /**
+ * Predicate to check if there is data in the block.
+ *
+ * @return true if there is data in the block.
+ */
+ public boolean hasData() {
+ return dataSize() > 0;
+ }
+
+ /**
+ * The remaining capacity in the block before it is full.
+ *
+ * @return the number of bytes remaining.
+ */
+ public abstract int remainingCapacity();
+
+ /**
+ * Write a series of bytes from the buffer, from the offset.
+ * Returns the number of bytes written.
+ * Only valid in the state {@code Writing}.
+ * Base class verifies the state but does no writing.
+ *
+ * @param buffer buffer.
+ * @param offset offset.
+ * @param length length of write.
+ * @return number of bytes written.
+ * @throws IOException trouble
+ */
+ public int write(byte[] buffer, int offset, int length) throws IOException {
+ verifyState(Writing);
+ Preconditions.checkArgument(buffer != null, "Null buffer");
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(offset >= 0, "offset is negative");
+ Preconditions.checkArgument(
+ !(buffer.length - offset < length),
+ "buffer shorter than amount of data to write");
+ return 0;
+ }
+
+ /**
+ * Flush the output.
+ * Only valid in the state {@code Writing}.
+ * In the base class, this is a no-op.
+ *
+ * @throws IOException any IO problem.
+ */
+ public void flush() throws IOException {
+ verifyState(Writing);
+ }
+
+ /**
+ * Switch to the upload state and return a stream for uploading.
+ * Base class calls {@link #enterState(DestState, DestState)} to
+ * manage the state machine.
+ *
+ * @return the stream.
+ * @throws IOException trouble
+ */
+ public BlockUploadData startUpload() throws IOException {
+ LOG.debug("Start datablock[{}] upload", index);
+ enterState(Writing, Upload);
+ return null;
+ }
+
+ /**
+ * Enter the closed state.
+ *
+ * @return true if the class was in any other state, implying that
+ * the subclass should do its close operations.
+ */
+ protected synchronized boolean enterClosedState() {
+ if (!state.equals(Closed)) {
+ enterState(null, Closed);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (enterClosedState()) {
+ LOG.debug("Closed {}", this);
+ innerClose();
+ }
+ }
+
+ /**
+ * Inner close logic for subclasses to implement.
+ */
+ protected void innerClose() throws IOException {
+
+ }
+
+ /**
+ * A block has been allocated.
+ */
+ protected void blockAllocated() {
+ if (statistics != null) {
+ statistics.blockAllocated();
+ }
+ }
+
+ /**
+ * A block has been released.
+ */
+ protected void blockReleased() {
+ if (statistics != null) {
+ statistics.blockReleased();
+ }
+ }
+
+ protected BlockUploadStatistics getStatistics() {
+ return statistics;
+ }
+
+ public long getIndex() {
+ return index;
+ }
+ }
+
+ // ====================================================================
+
+ /**
+ * Use byte arrays on the heap for storage.
+ */
+ static class ArrayBlockFactory extends BlockFactory {
+
+ ArrayBlockFactory(String keyToBufferDir, Configuration conf) {
+ super(keyToBufferDir, conf);
+ }
+
+ @Override
+ public DataBlock create(long index, int limit,
+ BlockUploadStatistics statistics)
+ throws IOException {
+ return new ByteArrayBlock(0, limit, statistics);
+ }
+
+ }
+
+ static class DataBlockByteArrayOutputStream extends ByteArrayOutputStream {
+
+ DataBlockByteArrayOutputStream(int size) {
+ super(size);
+ }
+
+ /**
+ * InputStream backed by the internal byte array.
+ *
+ * @return ByteArrayInputStream instance.
+ */
+ ByteArrayInputStream getInputStream() {
+ ByteArrayInputStream bin = new ByteArrayInputStream(this.buf, 0, count);
+ this.reset();
+ this.buf = null;
+ return bin;
+ }
+ }
+
+ /**
+ * Stream to memory via a {@code ByteArrayOutputStream}.
+ *
+ * It can consume a lot of heap space
+ * proportional to the mismatch between writes to the stream and
+ * the JVM-wide upload bandwidth to a Store's endpoint.
+ * The memory consumption can be limited by tuning the filesystem settings
+ * to restrict the number of queued/active uploads.
+ */
+ static class ByteArrayBlock extends DataBlock {
+ private DataBlockByteArrayOutputStream buffer;
+ private final int limit;
+ // cache data size so that it is consistent after the buffer is reset.
+ private Integer dataSize;
+
+ ByteArrayBlock(long index,
+ int limit,
+ BlockUploadStatistics statistics) {
+ super(index, statistics);
+ this.limit = limit;
+ this.buffer = new DataBlockByteArrayOutputStream(limit);
+ blockAllocated();
+ }
+
+ /**
+ * Get the amount of data; if there is no buffer then the size is 0.
+ *
+ * @return the amount of data available to upload.
+ */
+ @Override
+ public int dataSize() {
+ return dataSize != null ? dataSize : buffer.size();
+ }
+
+ @Override
+ public BlockUploadData startUpload() throws IOException {
+ super.startUpload();
+ dataSize = buffer.size();
+ ByteArrayInputStream bufferData = buffer.getInputStream();
+ buffer = null;
+ return new BlockUploadData(bufferData);
+ }
+
+ @Override
+ boolean hasCapacity(long bytes) {
+ return dataSize() + bytes <= limit;
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return limit - dataSize();
+ }
+
+ @Override
+ public int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ buffer.write(b, offset, written);
+ return written;
+ }
+
+ @Override
+ protected void innerClose() {
+ buffer = null;
+ blockReleased();
+ }
+
+ @Override
+ public String toString() {
+ return "ByteArrayBlock{"
+ + "index=" + getIndex() +
+ ", state=" + getState() +
+ ", limit=" + limit +
+ ", dataSize=" + dataSize +
+ '}';
+ }
+ }
+
+ // ====================================================================
+
+ /**
+ * Stream via Direct ByteBuffers; these are allocated off heap
+ * via {@link DirectBufferPool}.
+ */
+ static class ByteBufferBlockFactory extends BlockFactory {
+
+ private final DirectBufferPool bufferPool = new DirectBufferPool();
+ private final AtomicInteger buffersOutstanding = new AtomicInteger(0);
+
+ ByteBufferBlockFactory(String keyToBufferDir, Configuration conf) {
+ super(keyToBufferDir, conf);
+ }
+
+ @Override
+ public ByteBufferBlock create(long index, int limit,
+ BlockUploadStatistics statistics)
+ throws IOException {
+ return new ByteBufferBlock(index, limit, statistics);
+ }
+
+ private ByteBuffer requestBuffer(int limit) {
+ LOG.debug("Requesting buffer of size {}", limit);
+ buffersOutstanding.incrementAndGet();
+ return bufferPool.getBuffer(limit);
+ }
+
+ private void releaseBuffer(ByteBuffer buffer) {
+ LOG.debug("Releasing buffer");
+ bufferPool.returnBuffer(buffer);
+ buffersOutstanding.decrementAndGet();
+ }
+
+ /**
+ * Get count of outstanding buffers.
+ *
+ * @return the current buffer count.
+ */
+ public int getOutstandingBufferCount() {
+ return buffersOutstanding.get();
+ }
+
+ @Override
+ public String toString() {
+ return "ByteBufferBlockFactory{"
+ + "buffersOutstanding=" + buffersOutstanding +
+ '}';
+ }
+
+ /**
+ * A DataBlock which requests a buffer from pool on creation; returns
+ * it when it is closed.
+ */
+ class ByteBufferBlock extends DataBlock {
+ private ByteBuffer blockBuffer;
+ private final int bufferSize;
+ // cache data size so that it is consistent after the buffer is reset.
+ private Integer dataSize;
+
+ /**
+ * Instantiate. This will request a ByteBuffer of the desired size.
+ *
+ * @param index block index.
+ * @param bufferSize buffer size.
+ * @param statistics statistics to update.
+ */
+ ByteBufferBlock(long index,
+ int bufferSize,
+ BlockUploadStatistics statistics) {
+ super(index, statistics);
+ this.bufferSize = bufferSize;
+ this.blockBuffer = requestBuffer(bufferSize);
+ blockAllocated();
+ }
+
+ /**
+ * Get the amount of data; if there is no buffer then the size is 0.
+ *
+ * @return the amount of data available to upload.
+ */
+ @Override
+ public int dataSize() {
+ return dataSize != null ? dataSize : bufferCapacityUsed();
+ }
+
+ @Override
+ public BlockUploadData startUpload() throws IOException {
+ super.startUpload();
+ dataSize = bufferCapacityUsed();
+ // set the buffer up for reading from the beginning
+ blockBuffer.limit(blockBuffer.position());
+ blockBuffer.position(0);
+ return new BlockUploadData(
+ new ByteBufferInputStream(dataSize, blockBuffer));
+ }
+
+ @Override
+ public boolean hasCapacity(long bytes) {
+ return bytes <= remainingCapacity();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return blockBuffer != null ? blockBuffer.remaining() : 0;
+ }
+
+ private int bufferCapacityUsed() {
+ return blockBuffer.capacity() - blockBuffer.remaining();
+ }
+
+ @Override
+ public int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ blockBuffer.put(b, offset, written);
+ return written;
+ }
+
+ /**
+ * Closing the block will release the buffer.
+ */
+ @Override
+ protected void innerClose() {
+ if (blockBuffer != null) {
+ blockReleased();
+ releaseBuffer(blockBuffer);
+ blockBuffer = null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ByteBufferBlock{"
+ + "index=" + getIndex() +
+ ", state=" + getState() +
+ ", dataSize=" + dataSize() +
+ ", limit=" + bufferSize +
+ ", remainingCapacity=" + remainingCapacity() +
+ '}';
+ }
+
+ /**
+ * Provide an input stream from a byte buffer; supporting
+ * {@link #mark(int)}, which is required to enable replay of failed
+ * PUT attempts.
+ */
+ class ByteBufferInputStream extends InputStream {
+
+ private final int size;
+ private ByteBuffer byteBuffer;
+
+ ByteBufferInputStream(int size,
+ ByteBuffer byteBuffer) {
+ LOG.debug("Creating ByteBufferInputStream of size {}", size);
+ this.size = size;
+ this.byteBuffer = byteBuffer;
+ }
+
+ /**
+ * After the stream is closed, set the local reference to the byte
+ * buffer to null; this guarantees that future attempts to use
+ * stream methods will fail.
+ */
+ @Override
+ public synchronized void close() {
+ LOG.debug("ByteBufferInputStream.close() for {}",
+ ByteBufferBlock.super.toString());
+ byteBuffer = null;
+ }
+
+ /**
+ * Verify that the stream is open.
+ *
+ * @throws IOException if the stream is closed
+ */
+ private void verifyOpen() throws IOException {
+ if (byteBuffer == null) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ if (available() > 0) {
+ return byteBuffer.get() & 0xFF;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public synchronized long skip(long offset) throws IOException {
+ verifyOpen();
+ long newPos = position() + offset;
+ if (newPos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ if (newPos > size) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+ byteBuffer.position((int) newPos);
+ return newPos;
+ }
+
+ @Override
+ public synchronized int available() {
+ Preconditions.checkState(byteBuffer != null,
+ FSExceptionMessages.STREAM_IS_CLOSED);
+ return byteBuffer.remaining();
+ }
+
+ /**
+ * Get the current buffer position.
+ *
+ * @return the buffer position
+ */
+ public synchronized int position() {
+ return byteBuffer.position();
+ }
+
+ /**
+ * Check if there is data left.
+ *
+ * @return true if there is data remaining in the buffer.
+ */
+ public synchronized boolean hasRemaining() {
+ return byteBuffer.hasRemaining();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ LOG.debug("mark at {}", position());
+ byteBuffer.mark();
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ LOG.debug("reset");
+ byteBuffer.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ /**
+ * Read in data.
+ *
+ * @param b destination buffer.
+ * @param offset offset within the buffer.
+ * @param length length of bytes to read.
+ * @return the number of bytes read, or -1 if no data remains.
+ * @throws IOException if the stream is closed.
+ * @throws IndexOutOfBoundsException if there isn't space for the
+ * amount of data requested.
+ * @throws IllegalArgumentException other arguments are invalid.
+ */
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public synchronized int read(byte[] b, int offset, int length)
+ throws IOException {
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(b != null, "Null buffer");
+ if (b.length - offset < length) {
+ throw new IndexOutOfBoundsException(
+ FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+ + ": request length =" + length
+ + ", with offset =" + offset
+ + "; buffer capacity =" + (b.length - offset));
+ }
+ verifyOpen();
+ if (!hasRemaining()) {
+ return -1;
+ }
+
+ int toRead = Math.min(length, available());
+ byteBuffer.get(b, offset, toRead);
+ return toRead;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "ByteBufferInputStream{");
+ sb.append("size=").append(size);
+ ByteBuffer buf = this.byteBuffer;
+ if (buf != null) {
+ sb.append(", available=").append(buf.remaining());
+ }
+ sb.append(", ").append(ByteBufferBlock.super.toString());
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+ }
+ }
+
+ // ====================================================================
+
+ /**
+ * Buffer blocks to disk.
+ */
+ static class DiskBlockFactory extends BlockFactory {
+
+ private LocalDirAllocator directoryAllocator;
+
+ DiskBlockFactory(String keyToBufferDir, Configuration conf) {
+ super(keyToBufferDir, conf);
+ String bufferDir = conf.get(keyToBufferDir) != null
+ ? keyToBufferDir : HADOOP_TMP_DIR;
+ directoryAllocator = new LocalDirAllocator(bufferDir);
+ }
+
+ /**
+ * Create a temp file and a {@link DiskBlock} instance to manage it.
+ *
+ * @param index block index.
+ * @param limit limit of the block.
+ * @param statistics statistics to update.
+ * @return the new block.
+ * @throws IOException IO problems
+ */
+ @Override
+ public DataBlock create(long index,
+ int limit,
+ BlockUploadStatistics statistics)
+ throws IOException {
+ File destFile = createTmpFileForWrite(
+ String.format("datablock-%04d-", index), limit, getConf());
+
+ return new DiskBlock(destFile, limit, index, statistics);
+ }
+
+ /**
+ * Create a temporary file for a write; the directory allocator picks a
+ * directory with enough space via
+ * {@link LocalDirAllocator#getLocalPathForWrite(String, long, Configuration)}.
+ * This does not mark the file for deletion when the process exits.
+ *
+ * @param pathStr prefix for the temporary file.
+ * @param size the size of the file that is going to be written.
+ * @param conf the Configuration object.
+ * @return a unique temporary file.
+ * @throws IOException IO problems
+ */
+ File createTmpFileForWrite(String pathStr, long size,
+ Configuration conf) throws IOException {
+ Path path = directoryAllocator.getLocalPathForWrite(pathStr,
+ size, conf);
+ File dir = new File(path.getParent().toUri().getPath());
+ String prefix = path.getName();
+ // create a temp file on this directory
+ return File.createTempFile(prefix, null, dir);
+ }
+ }
+
+ /**
+ * Stream to a file.
+ * This will stop at the limit; the caller is expected to create a new block.
+ */
+ static class DiskBlock extends DataBlock {
+
+ private int bytesWritten;
+ private final File bufferFile;
+ private final int limit;
+ private BufferedOutputStream out;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ DiskBlock(File bufferFile,
+ int limit,
+ long index,
+ BlockUploadStatistics statistics)
+ throws FileNotFoundException {
+ super(index, statistics);
+ this.limit = limit;
+ this.bufferFile = bufferFile;
+ blockAllocated();
+ out = new BufferedOutputStream(new FileOutputStream(bufferFile));
+ }
+
+ @Override
+ public int dataSize() {
+ return bytesWritten;
+ }
+
+ @Override
+ boolean hasCapacity(long bytes) {
+ return dataSize() + bytes <= limit;
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return limit - bytesWritten;
+ }
+
+ @Override
+ public int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ out.write(b, offset, written);
+ bytesWritten += written;
+ return written;
+ }
+
+ @Override
+ public BlockUploadData startUpload() throws IOException {
+ super.startUpload();
+ try {
+ out.flush();
+ } finally {
+ out.close();
+ out = null;
+ }
+ return new BlockUploadData(bufferFile);
+ }
+
+ /**
+ * The close operation will delete the destination file if it still
+ * exists.
+ *
+ * @throws IOException IO problems
+ */
+ @SuppressWarnings("UnnecessaryDefault")
+ @Override
+ protected void innerClose() throws IOException {
+ final DestState state = getState();
+ LOG.debug("Closing {}", this);
+ switch (state) {
+ case Writing:
+ if (bufferFile.exists()) {
+ // file was not uploaded
+ LOG.debug("Block[{}]: Deleting buffer file as upload did not start",
+ getIndex());
+ closeBlock();
+ }
+ break;
+
+ case Upload:
+ LOG.debug("Block[{}]: Buffer file {} exists —close upload stream",
+ getIndex(), bufferFile);
+ break;
+
+ case Closed:
+ closeBlock();
+ break;
+
+ default:
+ // this state can never be reached, but checkstyle complains, so
+ // it is here.
+ }
+ }
+
+ /**
+ * Flush operation will flush to disk.
+ *
+ * @throws IOException IOE raised on FileOutputStream
+ */
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ out.flush();
+ }
+
+ @Override
+ public String toString() {
+ String sb = "FileBlock{"
+ + "index=" + getIndex()
+ + ", destFile=" + bufferFile +
+ ", state=" + getState() +
+ ", dataSize=" + dataSize() +
+ ", limit=" + limit +
+ '}';
+ return sb;
+ }
+
+ /**
+ * Close the block.
+ * This will delete the block's buffer file if the block has
+ * not previously been closed.
+ */
+ void closeBlock() {
+ LOG.debug("block[{}]: closeBlock()", getIndex());
+ if (!closed.getAndSet(true)) {
+ blockReleased();
+ if (!bufferFile.delete() && bufferFile.exists()) {
+ LOG.warn("delete({}) returned false",
+ bufferFile.getAbsoluteFile());
+ }
+ } else {
+ LOG.debug("block[{}]: skipping re-entrant closeBlock()", getIndex());
+ }
+ }
+ }
+}
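
As a usage sketch of the life-cycle above (createFactory, create, write, startUpload, toByteArray, close), mirroring what TestDataBlocks below exercises; the buffer-dir key and sizes are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.store.DataBlocks;

    public class DataBlocksDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DataBlocks.BlockFactory factory = DataBlocks.createFactory(
            "fs.azure.buffer.dir", conf, DataBlocks.DATA_BLOCKS_BUFFER_ARRAY);
        // one block of up to 1 KB, with no statistics attached
        DataBlocks.DataBlock block = factory.create(0, 1024, null);
        byte[] data = new byte[512];
        block.write(data, 0, data.length);
        // switch the block from Writing to Upload and hand the data off
        try (DataBlocks.BlockUploadData upload = block.startUpload()) {
          byte[] payload = upload.toByteArray();
          System.out.println("bytes ready to upload: " + payload.length);
        }
        block.close();
        factory.close();
      }
    }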
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 88d8220b1e2..08da7b6efc8 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2291,6 +2291,13 @@
+<property>
+  <name>fs.azure.buffer.dir</name>
+  <value>${hadoop.tmp.dir}/abfs</value>
+  <description>Directory path for buffer files needed to upload data blocks
+    in AbfsOutputStream.</description>
+</property>
+
<property>
  <name>fs.AbstractFileSystem.gs.impl</name>
  <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
</property>
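
The same setting can also be applied programmatically; the path below is illustrative:

    import org.apache.hadoop.conf.Configuration;

    public class BufferDirOverride {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // override the default ${hadoop.tmp.dir}/abfs buffer location
        conf.set("fs.azure.buffer.dir", "/mnt/ssd/abfs");
      }
    }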
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java
new file mode 100644
index 00000000000..5698a08c7e1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link DataBlocks} functionality.
+ */
+public class TestDataBlocks {
+ private final Configuration configuration = new Configuration();
+ private static final int ONE_KB = 1024;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestDataBlocks.class);
+
+ /**
+ * Test the different DataBlocks factories and their operations.
+ */
+ @Test
+ public void testDataBlocksFactory() throws Exception {
+ testCreateFactory(DATA_BLOCKS_BUFFER_DISK);
+ testCreateFactory(DATA_BLOCKS_BUFFER_ARRAY);
+ testCreateFactory(DATA_BLOCKS_BYTEBUFFER);
+ }
+
+ /**
+ * Verify creation of a data block factory and its operations.
+ *
+ * @param nameOfFactory Name of the DataBlock factory to be created.
+ * @throws Exception in case of failure while creating or testing a block.
+ */
+ public void testCreateFactory(String nameOfFactory) throws Exception {
+ LOG.info("Testing: {}", nameOfFactory);
+ DataBlocks.BlockFactory blockFactory =
+ DataBlocks.createFactory("Dir", configuration, nameOfFactory);
+
+ DataBlocks.DataBlock dataBlock = blockFactory.create(0, ONE_KB, null);
+ assertWriteBlock(dataBlock);
+ assertToByteArray(dataBlock);
+ assertCloseBlock(dataBlock);
+ }
+
+ /**
+ * Verify writing to a dataBlock.
+ *
+ * @param dataBlock DataBlock to be tested.
+ * @throws IOException in case of write failures.
+ */
+ private void assertWriteBlock(DataBlocks.DataBlock dataBlock)
+ throws IOException {
+ byte[] oneKbBuff = new byte[ONE_KB];
+ new Random().nextBytes(oneKbBuff);
+ dataBlock.write(oneKbBuff, 0, ONE_KB);
+ // Verify DataBlock state is at Writing.
+ dataBlock.verifyState(DataBlocks.DataBlock.DestState.Writing);
+ // Verify that the DataBlock has data written.
+ assertTrue("Expected Data block to have data", dataBlock.hasData());
+ // Verify the size of data.
+ assertEquals("Mismatch in data size in block", ONE_KB,
+ dataBlock.dataSize());
+ // Verify that no capacity is left in the data block to write more.
+ assertFalse("Expected the data block to have no capacity to write 1 byte "
+ + "of data", dataBlock.hasCapacity(1));
+ }
+
+ /**
+ * Verify the Conversion of Data blocks into byte[].
+ *
+ * @param dataBlock data block to be tested.
+ * @throws Exception in case of failures.
+ */
+ private void assertToByteArray(DataBlocks.DataBlock dataBlock)
+ throws Exception {
+ DataBlocks.BlockUploadData blockUploadData = dataBlock.startUpload();
+ // Verify that the current state is in upload.
+ dataBlock.verifyState(DataBlocks.DataBlock.DestState.Upload);
+ // Convert the DataBlock upload to byteArray.
+ byte[] bytesWritten = blockUploadData.toByteArray();
+ // Verify that toByteArray() can be called more than once and returns
+ // the same byte[].
+ assertEquals("Mismatch in byteArray provided by toByteArray() the second "
+ + "time", bytesWritten, blockUploadData.toByteArray());
+ IOUtils.close(blockUploadData);
+ // Verify that after closing blockUploadData, we can't call toByteArray().
+ LambdaTestUtils.intercept(IllegalStateException.class,
+ "Block is closed",
+ "Expected to throw IllegalStateException.java after closing "
+ + "blockUploadData and trying to call toByteArray()",
+ () -> {
+ blockUploadData.toByteArray();
+ });
+ }
+
+ /**
+ * Verify the close() of data blocks.
+ *
+ * @param dataBlock data block to be tested.
+ * @throws IOException in case of failures while closing the block.
+ */
+ private void assertCloseBlock(DataBlocks.DataBlock dataBlock)
+ throws IOException {
+ dataBlock.close();
+ // Verify that the current state is in Closed.
+ dataBlock.verifyState(DataBlocks.DataBlock.DestState.Closed);
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index a8bf7c16eec..e7ab884e1ae 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -90,6 +90,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
@@ -101,6 +102,11 @@ import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
@@ -125,6 +131,13 @@ public class AzureBlobFileSystem extends FileSystem
private TracingHeaderFormat tracingHeaderFormat;
private Listener listener;
+ /** Name of blockFactory to be used by AbfsOutputStream. */
+ private String blockOutputBuffer;
+ /** BlockFactory instance to be used. */
+ private DataBlocks.BlockFactory blockFactory;
+ /** Maximum Active blocks per OutputStream. */
+ private int blockOutputActiveBlocks;
+
@Override
public void initialize(URI uri, Configuration configuration)
throws IOException {
@@ -136,8 +149,33 @@ public class AzureBlobFileSystem extends FileSystem
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
abfsCounters = new AbfsCountersImpl(uri);
- this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
- configuration, abfsCounters);
+ // name of the blockFactory to be used.
+ this.blockOutputBuffer = configuration.getTrimmed(DATA_BLOCKS_BUFFER,
+ DATA_BLOCKS_BUFFER_DEFAULT);
+ // blockFactory used for this FS instance.
+ this.blockFactory =
+ DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR,
+ configuration, blockOutputBuffer);
+ this.blockOutputActiveBlocks =
+ configuration.getInt(FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS,
+ BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT);
+ if (blockOutputActiveBlocks < 1) {
+ blockOutputActiveBlocks = 1;
+ }
+
+ // AzureBlobFileSystemStore with params in builder.
+ AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder
+ systemStoreBuilder =
+ new AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder()
+ .withUri(uri)
+ .withSecureScheme(this.isSecureScheme())
+ .withConfiguration(configuration)
+ .withAbfsCounters(abfsCounters)
+ .withBlockFactory(blockFactory)
+ .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
+ .build();
+
+ this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder);
LOG.trace("AzureBlobFileSystemStore init complete");
final AbfsConfiguration abfsConfiguration = abfsStore
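
For reference, a hedged sketch of how a deployment might tune the new options before creating the filesystem; the values are illustrative and the keys are the ones defined in ConfigurationKeys below:

    import org.apache.hadoop.conf.Configuration;

    public class AbfsUploadTuning {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // buffer upload blocks off-heap rather than on disk
        conf.set("fs.azure.data.blocks.buffer", "bytebuffer");
        // cap the blocks a single stream may have queued or uploading
        conf.setInt("fs.azure.block.upload.active.blocks", 8);
      }
    }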
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 3a527f7f0c3..27beb34fccd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -51,6 +51,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@@ -119,8 +121,12 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.http.client.utils.URIBuilder;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
@@ -169,10 +175,23 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
*/
private Set<String> appendBlobDirSet;
- public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
- Configuration configuration,
- AbfsCounters abfsCounters) throws IOException {
- this.uri = uri;
+ /** BlockFactory being used by this instance.*/
+ private DataBlocks.BlockFactory blockFactory;
+ /** Number of active data blocks per AbfsOutputStream. */
+ private int blockOutputActiveBlocks;
+ /** Bounded ThreadPool for this instance. */
+ private ExecutorService boundedThreadPool;
+
+ /**
+ * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations.
+ * Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters
+ * required.
+ * @param abfsStoreBuilder Builder for AzureBlobFileSystemStore.
+ * @throws IOException Throw IOE in case of failure during constructing.
+ */
+ public AzureBlobFileSystemStore(
+ AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException {
+ this.uri = abfsStoreBuilder.uri;
String[] authorityParts = authorityParts(uri);
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
@@ -180,7 +199,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
try {
- this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
+ this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName);
} catch (IllegalAccessException exception) {
throw new FileSystemOperationUnhandledException(exception);
}
@@ -210,16 +229,16 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
- boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
+ boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : abfsStoreBuilder.isSecureScheme;
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
- this.abfsCounters = abfsCounters;
+ this.abfsCounters = abfsStoreBuilder.abfsCounters;
initializeClient(uri, fileSystemName, accountName, useHttps);
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
- configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
+ abfsStoreBuilder.configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
IdentityTransformerInterface.class);
try {
this.identityTransformer =
- identityTransformerClass.getConstructor(Configuration.class).newInstance(configuration);
+ identityTransformerClass.getConstructor(Configuration.class).newInstance(abfsStoreBuilder.configuration);
} catch (IllegalAccessException | InstantiationException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException e) {
throw new IOException(e);
}
@@ -233,6 +252,13 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
}
+ this.blockFactory = abfsStoreBuilder.blockFactory;
+ this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
+ this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
+ abfsConfiguration.getWriteMaxConcurrentRequestCount(),
+ abfsConfiguration.getMaxWriteRequestsToQueue(),
+ 10L, TimeUnit.SECONDS,
+ "abfs-bounded");
}
/**
@@ -269,6 +295,10 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
}
try {
Futures.allAsList(futures).get();
+ // shutdown the threadPool and set it to null.
+ HadoopExecutors.shutdown(boundedThreadPool, LOG,
+ 30, TimeUnit.SECONDS);
+ boundedThreadPool = null;
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
@@ -495,7 +525,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
public OutputStream createFile(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
final FsPermission permission, final FsPermission umask,
- TracingContext tracingContext) throws AzureBlobFileSystemException {
+ TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -546,12 +576,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
return new AbfsOutputStream(
- client,
- statistics,
- relativePath,
- 0,
- populateAbfsOutputStreamContext(isAppendBlob, lease),
- tracingContext);
+ populateAbfsOutputStreamContext(
+ isAppendBlob,
+ lease,
+ client,
+ statistics,
+ relativePath,
+ 0,
+ tracingContext));
}
}
@@ -625,8 +657,29 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
return op;
}
- private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
- AbfsLease lease) {
+ /**
+ * Method to populate AbfsOutputStreamContext with different parameters to
+ * be used to construct {@link AbfsOutputStream}.
+ *
+ * @param isAppendBlob is Append blob support enabled?
+ * @param lease instance of AbfsLease for this AbfsOutputStream.
+ * @param client AbfsClient.
+ * @param statistics FileSystem statistics.
+ * @param path Path for AbfsOutputStream.
+ * @param position Position or offset of the file being opened, set to 0
+ * when creating a new file, but needs to be set for APPEND
+ * calls on the same file.
+ * @param tracingContext instance of TracingContext for this AbfsOutputStream.
+ * @return AbfsOutputStreamContext instance with the desired parameters.
+ */
+ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
+ boolean isAppendBlob,
+ AbfsLease lease,
+ AbfsClient client,
+ FileSystem.Statistics statistics,
+ String path,
+ long position,
+ TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
@@ -641,6 +694,15 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
+ .withBlockFactory(blockFactory)
+ .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
+ .withClient(client)
+ .withPosition(position)
+ .withFsStatistics(statistics)
+ .withPath(path)
+ .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
+ blockOutputActiveBlocks, true))
+ .withTracingContext(tracingContext)
.build();
}
@@ -732,7 +794,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
public OutputStream openFileForWrite(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
- TracingContext tracingContext) throws AzureBlobFileSystemException {
+ TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
client.getFileSystem(),
@@ -768,12 +830,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
return new AbfsOutputStream(
- client,
- statistics,
- relativePath,
- offset,
- populateAbfsOutputStreamContext(isAppendBlob, lease),
- tracingContext);
+ populateAbfsOutputStreamContext(
+ isAppendBlob,
+ lease,
+ client,
+ statistics,
+ relativePath,
+ offset,
+ tracingContext));
}
}
@@ -1721,6 +1785,57 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
}
}
+ /**
+ * A builder class for AzureBlobFileSystemStore.
+ */
+ public static final class AzureBlobFileSystemStoreBuilder {
+
+ private URI uri;
+ private boolean isSecureScheme;
+ private Configuration configuration;
+ private AbfsCounters abfsCounters;
+ private DataBlocks.BlockFactory blockFactory;
+ private int blockOutputActiveBlocks;
+
+ public AzureBlobFileSystemStoreBuilder withUri(URI value) {
+ this.uri = value;
+ return this;
+ }
+
+ public AzureBlobFileSystemStoreBuilder withSecureScheme(boolean value) {
+ this.isSecureScheme = value;
+ return this;
+ }
+
+ public AzureBlobFileSystemStoreBuilder withConfiguration(
+ Configuration value) {
+ this.configuration = value;
+ return this;
+ }
+
+ public AzureBlobFileSystemStoreBuilder withAbfsCounters(
+ AbfsCounters value) {
+ this.abfsCounters = value;
+ return this;
+ }
+
+ public AzureBlobFileSystemStoreBuilder withBlockFactory(
+ DataBlocks.BlockFactory value) {
+ this.blockFactory = value;
+ return this;
+ }
+
+ public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks(
+ int value) {
+ this.blockOutputActiveBlocks = value;
+ return this;
+ }
+
+ public AzureBlobFileSystemStoreBuilder build() {
+ return this;
+ }
+ }
+
@VisibleForTesting
AbfsClient getClient() {
return this.client;
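
A sketch of constructing the store directly through the new builder, e.g. from a test; the helper class and its arguments are assumptions, not part of the patch:

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.azurebfs.AbfsCounters;
    import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
    import org.apache.hadoop.fs.store.DataBlocks;

    class StoreFactory {
      static AzureBlobFileSystemStore newStore(URI uri, Configuration conf,
          AbfsCounters counters, DataBlocks.BlockFactory factory)
          throws IOException {
        return new AzureBlobFileSystemStore(
            new AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder()
                .withUri(uri)
                .withSecureScheme(false)
                .withConfiguration(conf)
                .withAbfsCounters(counters)
                .withBlockFactory(factory)
                .withBlockOutputActiveBlocks(4)
                .build());
      }
    }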
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 4a2c5951bd5..12beb5a9bba 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -56,6 +56,37 @@ public final class ConfigurationKeys {
public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
+
+ /**
+ * Maximum number of blocks a single output stream can have
+ * active (uploading, or queued to the central FileSystem
+ * instance's pool of queued operations).
+ * This stops a single stream overloading the shared thread pool.
+ * {@value}
+ *
+ * Default is {@link FileSystemConfigurations#BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT}
+ */
+ public static final String FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS =
+ "fs.azure.block.upload.active.blocks";
+
+ /**
+ * Buffer directory path for uploading AbfsOutputStream data blocks.
+ * Value: {@value}
+ */
+ public static final String FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR =
+ "fs.azure.buffer.dir";
+
+ /**
+ * What data block buffer to use.
+ *
+ * Options include: "disk"(Default), "array", and "bytebuffer".
+ *
+ * Default is {@link FileSystemConfigurations#DATA_BLOCKS_BUFFER_DEFAULT}.
+ * Value: {@value}
+ */
+ public static final String DATA_BLOCKS_BUFFER =
+ "fs.azure.data.blocks.buffer";
+
/** If the data size written by Hadoop app is small, i.e. data size :
* (a) before any of HFlush/HSync call is made or
* (b) between 2 HFlush/Hsync API calls
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a1de9dfc0ac..f58c61e8908 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -115,5 +115,23 @@ public final class FileSystemConfigurations {
public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
+ /**
+ * Limit of queued block upload operations before writes
+ * block for an OutputStream. Value: {@value}
+ */
+ public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;
+
+ /**
+ * Buffer blocks to disk.
+ * Capacity is limited to available disk space.
+ */
+ public static final String DATA_BLOCKS_BUFFER_DISK = "disk";
+
+ /**
+ * Default buffer option: {@value}.
+ */
+ public static final String DATA_BLOCKS_BUFFER_DEFAULT =
+ DATA_BLOCKS_BUFFER_DISK;
+
private FileSystemConfigurations() {}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 91b068a78c9..61160f92bdf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -20,24 +20,20 @@ package org.apache.hadoop.fs.azurebfs.services;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
-import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.UUID;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
@@ -47,10 +43,9 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.azurebfs.utils.Listener;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
-import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
-import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
@@ -63,6 +58,7 @@ import static org.apache.hadoop.io.IOUtils.wrapException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
/**
* The BlobFsOutputStream for Rest AbfsClient.
@@ -72,6 +68,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private final AbfsClient client;
private final String path;
+ /** The position in the file being uploaded, at which the next block
+ * will be uploaded.
+ * This is used in constructing the AbfsClient requests to ensure that,
+ * even if blocks are uploaded out of order, they are reassembled in
+ * the correct order.
+ */
private long position;
private boolean closed;
private boolean supportFlush;
@@ -91,8 +93,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private final int maxRequestsThatCanBeQueued;
private ConcurrentLinkedDeque writeOperations;
- private final ThreadPoolExecutor threadExecutor;
- private final ExecutorCompletionService completionService;
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
@@ -103,15 +103,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private AbfsLease lease;
private String leaseId;
- /**
- * Queue storing buffers with the size of the Azure block ready for
- * reuse. The pool allows reusing the blocks instead of allocating new
- * blocks. After the data is sent to the service, the buffer is returned
- * back to the queue
- */
- private ElasticByteBufferPool byteBufferPool
- = new ElasticByteBufferPool();
-
private final Statistics statistics;
private final AbfsOutputStreamStatistics outputStreamStatistics;
private IOStatistics ioStatistics;
@@ -119,17 +110,27 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private static final Logger LOG =
LoggerFactory.getLogger(AbfsOutputStream.class);
- public AbfsOutputStream(
- final AbfsClient client,
- final Statistics statistics,
- final String path,
- final long position,
- AbfsOutputStreamContext abfsOutputStreamContext,
- TracingContext tracingContext) {
- this.client = client;
- this.statistics = statistics;
- this.path = path;
- this.position = position;
+ /** Factory for blocks. */
+ private final DataBlocks.BlockFactory blockFactory;
+
+ /** Current data block. Null means none currently active. */
+ private DataBlocks.DataBlock activeBlock;
+
+ /** Count of blocks uploaded. */
+ private long blockCount = 0;
+
+ /** The size of a single block. */
+ private final int blockSize;
+
+ /** Executor service to carry out the parallel upload requests. */
+ private final ListeningExecutorService executorService;
+
+ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
+ throws IOException {
+ this.client = abfsOutputStreamContext.getClient();
+ this.statistics = abfsOutputStreamContext.getStatistics();
+ this.path = abfsOutputStreamContext.getPath();
+ this.position = abfsOutputStreamContext.getPosition();
this.closed = false;
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
this.disableOutputStreamFlush = abfsOutputStreamContext
@@ -140,7 +141,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
- this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
this.numOfAppendsToServerSinceLastFlush = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
@@ -157,23 +157,20 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
this.lease = abfsOutputStreamContext.getLease();
this.leaseId = abfsOutputStreamContext.getLeaseId();
-
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>());
- this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+ this.executorService =
+ MoreExecutors.listeningDecorator(abfsOutputStreamContext.getExecutorService());
this.cachedSasToken = new CachedSASToken(
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
- if (outputStreamStatistics != null) {
- this.ioStatistics = outputStreamStatistics.getIOStatistics();
- }
this.outputStreamId = createOutputStreamId();
- this.tracingContext = new TracingContext(tracingContext);
+ this.tracingContext = new TracingContext(abfsOutputStreamContext.getTracingContext());
this.tracingContext.setStreamID(outputStreamId);
this.tracingContext.setOperation(FSOperationType.WRITE);
+ this.ioStatistics = outputStreamStatistics.getIOStatistics();
+ this.blockFactory = abfsOutputStreamContext.getBlockFactory();
+ this.blockSize = bufferSize;
+ // Create the first block. This guarantees that an open + close sequence
+ // writes a 0-byte entry.
+ createBlockIfNeeded();
}
private String createOutputStreamId() {
@@ -219,10 +216,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
@Override
public synchronized void write(final byte[] data, final int off, final int length)
throws IOException {
+ // validate that data is not null and that the offsets are within bounds.
+ DataBlocks.validateWriteArgs(data, off, length);
maybeThrowLastError();
- Preconditions.checkArgument(data != null, "null data");
-
if (off < 0 || length < 0 || length > data.length - off) {
throw new IndexOutOfBoundsException();
}
@@ -230,29 +227,184 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
if (hasLease() && isLeaseFreed()) {
throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
}
+ DataBlocks.DataBlock block = createBlockIfNeeded();
+ int written = block.write(data, off, length);
+ int remainingCapacity = block.remainingCapacity();
- int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
- int numberOfBytesToWrite = length;
-
- while (numberOfBytesToWrite > 0) {
- if (writableBytes <= numberOfBytesToWrite) {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
- bufferIndex += writableBytes;
- writeCurrentBufferToService();
- currentOffset += writableBytes;
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- } else {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
- bufferIndex += numberOfBytesToWrite;
- numberOfBytesToWrite = 0;
+ if (written < length) {
+ // The number of bytes to write exceeds the data block capacity:
+ // trigger an upload, then write the remainder to the next block.
+ LOG.debug("writing more data than block capacity - triggering upload");
+ uploadCurrentBlock();
+ // Tail recursion is mildly expensive, but given that buffer sizes are
+ // in MB, it's unlikely to recurse very deeply.
+ this.write(data, off + written, length - written);
+ } else {
+ if (remainingCapacity == 0) {
+ // the whole buffer is done, trigger an upload
+ uploadCurrentBlock();
}
-
- writableBytes = bufferSize - bufferIndex;
}
incrementWriteOps();
}
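+
+ /*
+ * Worked example (illustrative, assuming an 8 MB block size): a single
+ * 20 MB write fills and uploads block 1 (8 MB), recurses to fill and
+ * upload block 2 (8 MB), then recurses again, leaving 4 MB buffered in
+ * block 3 to be uploaded by a later write, flush or close.
+ */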
+ /**
+ * Demand create a destination block.
+ *
+ * @return the active block, creating one if none is active.
+ * @throws IOException on any failure to create the block.
+ */
+ private synchronized DataBlocks.DataBlock createBlockIfNeeded()
+ throws IOException {
+ if (activeBlock == null) {
+ blockCount++;
+ activeBlock = blockFactory
+ .create(blockCount, this.blockSize, outputStreamStatistics);
+ }
+ return activeBlock;
+ }
+
+ /**
+ * Start an asynchronous upload of the current block.
+ *
+ * @throws IOException Problems opening the destination for upload,
+ * initializing the upload, or if a previous operation has failed.
+ */
+ private synchronized void uploadCurrentBlock() throws IOException {
+ checkState(hasActiveBlock(), "No active block");
+ LOG.debug("Writing block # {}", blockCount);
+ try {
+ uploadBlockAsync(getActiveBlock(), false, false);
+ } finally {
+ // set the block to null, so the next write will create a new block.
+ clearActiveBlock();
+ }
+ }
+
+ /**
+ * Upload a block of data asynchronously.
+ * This takes ownership of the block; its buffer is released once
+ * the upload completes.
+ *
+ * @param blockToUpload block to upload.
+ * @param isFlush true when the upload is triggered by a flush.
+ * @param isClose true when the upload is triggered by the stream closing;
+ * together with isFlush this selects the flush_close append mode.
+ * @throws IOException upload failure.
+ */
+ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
+ boolean isFlush, boolean isClose)
+ throws IOException {
+ if (this.isAppendBlob) {
+ writeAppendBlobCurrentBufferToService();
+ return;
+ }
+ if (!blockToUpload.hasData()) {
+ return;
+ }
+ numOfAppendsToServerSinceLastFlush++;
+
+ final int bytesLength = blockToUpload.dataSize();
+ final long offset = position;
+ position += bytesLength;
+ outputStreamStatistics.bytesToUpload(bytesLength);
+ outputStreamStatistics.writeCurrentBuffer();
+ DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
+ final Future job =
+ executorService.submit(() -> {
+ AbfsPerfTracker tracker =
+ client.getAbfsPerfTracker();
+ try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+ "writeCurrentBufferToService", "append")) {
+ AppendRequestParameters.Mode
+ mode = APPEND_MODE;
+ if (isFlush && isClose) {
+ mode = FLUSH_CLOSE_MODE;
+ } else if (isFlush) {
+ mode = FLUSH_MODE;
+ }
+ /*
+ * Parameters required for an APPEND call.
+ * offset - the position in the file at which this block starts.
+ * bytesLength - the number of bytes to upload from the block.
+ * mode - whether this is an append, flush or flush_close call.
+ * leaseId - the AbfsLeaseId for this request.
+ */
+ AppendRequestParameters reqParams = new AppendRequestParameters(
+ offset, 0, bytesLength, mode, false, leaseId);
+ AbfsRestOperation op =
+ client.append(path, blockUploadData.toByteArray(), reqParams,
+ cachedSasToken.get(), new TracingContext(tracingContext));
+ cachedSasToken.update(op.getSasToken());
+ perfInfo.registerResult(op.getResult());
+ perfInfo.registerSuccess(true);
+ outputStreamStatistics.uploadSuccessful(bytesLength);
+ return null;
+ } finally {
+ IOUtils.close(blockUploadData);
+ }
+ });
+ writeOperations.add(new WriteOperation(job, offset, bytesLength));
+
+ // Try to shrink the queue
+ shrinkWriteOperationQueue();
+ }
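+
+ /*
+ * Note (illustrative): each upload captures its own offset before
+ * submission, so blocks at offsets 0 and 8 MB may complete in either
+ * order; the file is reassembled by offset, as described on the
+ * position field.
+ */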
+
+ /**
+ * Sets lastError to the caught exception and rethrows it;
+ * a 404 response is converted to a FileNotFoundException.
+ * @param ex Exception caught.
+ * @throws IOException the recorded lastError.
+ */
+ private void failureWhileSubmit(Exception ex) throws IOException {
+ if (ex instanceof AbfsRestOperationException) {
+ if (((AbfsRestOperationException) ex).getStatusCode()
+ == HttpURLConnection.HTTP_NOT_FOUND) {
+ throw new FileNotFoundException(ex.getMessage());
+ }
+ }
+ if (ex instanceof IOException) {
+ lastError = (IOException) ex;
+ } else {
+ lastError = new IOException(ex);
+ }
+ throw lastError;
+ }
+
+ /**
+ * Synchronized accessor to the active block.
+ *
+ * @return the active block; null if there isn't one.
+ */
+ private synchronized DataBlocks.DataBlock getActiveBlock() {
+ return activeBlock;
+ }
+
+ /**
+ * Predicate to query whether or not there is an active block.
+ *
+ * @return true if there is an active block.
+ */
+ private synchronized boolean hasActiveBlock() {
+ return activeBlock != null;
+ }
+
+ /**
+ * Is there an active block and is there any data in it to upload?
+ *
+ * @return true if there is some data to upload in an active block; else false.
+ */
+ private boolean hasActiveBlockDataToUpload() {
+ return hasActiveBlock() && getActiveBlock().hasData();
+ }
+
+ /**
+ * Clear the active block.
+ */
+ private void clearActiveBlock() {
+ if (activeBlock != null) {
+ LOG.debug("Clearing active block");
+ }
+ synchronized (this) {
+ activeBlock = null;
+ }
+ }
+
/**
* Increment Write Operations.
*/
@@ -335,7 +487,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
try {
flushInternal(true);
- threadExecutor.shutdown();
} catch (IOException e) {
// Problems surface in try-with-resources clauses if
// the exception thrown in a close == the one already thrown
@@ -352,9 +503,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
bufferIndex = 0;
closed = true;
writeOperations.clear();
- byteBufferPool = null;
- if (!threadExecutor.isShutdown()) {
- threadExecutor.shutdownNow();
+ if (hasActiveBlock()) {
+ clearActiveBlock();
}
}
LOG.debug("Closing AbfsOutputStream : {}", this);
@@ -368,19 +518,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
&& enableSmallWriteOptimization
&& (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
&& (writeOperations.size() == 0) // double checking no appends in progress
- && (bufferIndex > 0)) { // there is some data that is pending to be written
+ && hasActiveBlockDataToUpload()) { // there is data pending upload
smallWriteOptimizedflushInternal(isClose);
return;
}
- writeCurrentBufferToService();
+ if (hasActiveBlockDataToUpload()) {
+ uploadCurrentBlock();
+ }
flushWrittenBytesToService(isClose);
numOfAppendsToServerSinceLastFlush = 0;
}
private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
// writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
- writeCurrentBufferToService(true, isClose);
+ uploadBlockAsync(getActiveBlock(), true, isClose);
waitForAppendsToComplete();
shrinkWriteOperationQueue();
maybeThrowLastError();
@@ -389,131 +542,60 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError();
- writeCurrentBufferToService();
+ if (hasActiveBlockDataToUpload()) {
+ uploadCurrentBlock();
+ }
+ waitForAppendsToComplete();
flushWrittenBytesToServiceAsync();
}
+ /**
+ * Appends the current active data block to the service, then clears the
+ * active data block and releases all buffered data.
+ * @throws IOException if there is any failure while starting an upload for
+ * the dataBlock or while closing the BlockUploadData.
+ */
private void writeAppendBlobCurrentBufferToService() throws IOException {
- if (bufferIndex == 0) {
+ DataBlocks.DataBlock activeBlock = getActiveBlock();
+ // No data, return.
+ if (!hasActiveBlockDataToUpload()) {
return;
}
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
- if (outputStreamStatistics != null) {
- outputStreamStatistics.writeCurrentBuffer();
- outputStreamStatistics.bytesToUpload(bytesLength);
- }
- buffer = byteBufferPool.getBuffer(false, bufferSize).array();
- bufferIndex = 0;
+
+ final int bytesLength = activeBlock.dataSize();
+ DataBlocks.BlockUploadData uploadData = activeBlock.startUpload();
+ clearActiveBlock();
+ outputStreamStatistics.writeCurrentBuffer();
+ outputStreamStatistics.bytesToUpload(bytesLength);
final long offset = position;
position += bytesLength;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
- "writeCurrentBufferToService", "append")) {
+ "writeCurrentBufferToService", "append")) {
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
bytesLength, APPEND_MODE, true, leaseId);
- AbfsRestOperation op = client
- .append(path, bytes, reqParams, cachedSasToken.get(),
- new TracingContext(tracingContext));
+ AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams,
+ cachedSasToken.get(), new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
- if (outputStreamStatistics != null) {
- outputStreamStatistics.uploadSuccessful(bytesLength);
- }
+ outputStreamStatistics.uploadSuccessful(bytesLength);
+
perfInfo.registerResult(op.getResult());
- byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
return;
} catch (Exception ex) {
- if (ex instanceof AbfsRestOperationException) {
- if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
- throw new FileNotFoundException(ex.getMessage());
- }
- }
- if (ex instanceof AzureBlobFileSystemException) {
- ex = (AzureBlobFileSystemException) ex;
- }
- lastError = new IOException(ex);
- throw lastError;
+ outputStreamStatistics.uploadFailed(bytesLength);
+ failureWhileSubmit(ex);
+ } finally {
+ IOUtils.close(uploadData);
}
}
- private synchronized void writeCurrentBufferToService() throws IOException {
- writeCurrentBufferToService(false, false);
- }
-
- private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
- if (this.isAppendBlob) {
- writeAppendBlobCurrentBufferToService();
- return;
- }
-
- if (bufferIndex == 0) {
- return;
- }
- numOfAppendsToServerSinceLastFlush++;
-
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
- if (outputStreamStatistics != null) {
- outputStreamStatistics.writeCurrentBuffer();
- outputStreamStatistics.bytesToUpload(bytesLength);
- }
- buffer = byteBufferPool.getBuffer(false, bufferSize).array();
- bufferIndex = 0;
- final long offset = position;
- position += bytesLength;
-
- if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
- //Tracking time spent on waiting for task to complete.
- if (outputStreamStatistics != null) {
- try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
- waitForTaskToComplete();
- }
- } else {
- waitForTaskToComplete();
- }
- }
- final Future job = completionService.submit(() -> {
- AbfsPerfTracker tracker = client.getAbfsPerfTracker();
- try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
- "writeCurrentBufferToService", "append")) {
- AppendRequestParameters.Mode
- mode = APPEND_MODE;
- if (isFlush & isClose) {
- mode = FLUSH_CLOSE_MODE;
- } else if (isFlush) {
- mode = FLUSH_MODE;
- }
- AppendRequestParameters reqParams = new AppendRequestParameters(
- offset, 0, bytesLength, mode, false, leaseId);
- AbfsRestOperation op = client.append(path, bytes, reqParams,
- cachedSasToken.get(), new TracingContext(tracingContext));
- cachedSasToken.update(op.getSasToken());
- perfInfo.registerResult(op.getResult());
- byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
- perfInfo.registerSuccess(true);
- return null;
- }
- });
-
- if (outputStreamStatistics != null) {
- if (job.isCancelled()) {
- outputStreamStatistics.uploadFailed(bytesLength);
- } else {
- outputStreamStatistics.uploadSuccessful(bytesLength);
- }
- }
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
- }
-
private synchronized void waitForAppendsToComplete() throws IOException {
for (WriteOperation writeOperation : writeOperations) {
try {
writeOperation.task.get();
} catch (Exception ex) {
+ outputStreamStatistics.uploadFailed(writeOperation.length);
if (ex.getCause() instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
@@ -563,7 +645,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
throw new FileNotFoundException(ex.getMessage());
}
}
- throw new IOException(ex);
+ lastError = new IOException(ex);
+ throw lastError;
}
this.lastFlushOffset = offset;
}
@@ -574,14 +657,14 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
*/
private synchronized void shrinkWriteOperationQueue() throws IOException {
try {
- while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
- writeOperations.peek().task.get();
- lastTotalAppendOffset += writeOperations.peek().length;
+ WriteOperation peek = writeOperations.peek();
+ while (peek != null && peek.task.isDone()) {
+ peek.task.get();
+ lastTotalAppendOffset += peek.length;
writeOperations.remove();
+ peek = writeOperations.peek();
// Incrementing statistics to indicate queue has been shrunk.
- if (outputStreamStatistics != null) {
- outputStreamStatistics.queueShrunk();
- }
+ outputStreamStatistics.queueShrunk();
}
} catch (Exception e) {
if (e.getCause() instanceof AzureBlobFileSystemException) {
@@ -593,26 +676,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
}
}
- private void waitForTaskToComplete() throws IOException {
- boolean completed;
- for (completed = false; completionService.poll() != null; completed = true) {
- // keep polling until there is no data
- }
- // for AppendBLob, jobs are not submitted to completion service
- if (isAppendBlob) {
- completed = true;
- }
-
- if (!completed) {
- try {
- completionService.take();
- } catch (InterruptedException e) {
- lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
- throw lastError;
- }
- }
- }
-
private static class WriteOperation {
private final Future task;
private final long startOffset;
@@ -631,7 +694,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
@VisibleForTesting
public synchronized void waitForPendingUploads() throws IOException {
- waitForTaskToComplete();
+ waitForAppendsToComplete();
}
/**
@@ -695,12 +758,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
- if (outputStreamStatistics != null) {
- sb.append("AbfsOutputStream@").append(this.hashCode());
- sb.append("){");
- sb.append(outputStreamStatistics.toString());
- sb.append("}");
- }
+ sb.append("AbfsOutputStream@").append(this.hashCode());
+ sb.append("){");
+ sb.append(outputStreamStatistics.toString());
+ sb.append("}");
return sb.toString();
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 48f6f540810..ad303823e0c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -18,6 +18,12 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
/**
* Class to hold extra output stream configs.
*/
@@ -41,6 +47,22 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private AbfsLease lease;
+ private DataBlocks.BlockFactory blockFactory;
+
+ private int blockOutputActiveBlocks;
+
+ private AbfsClient client;
+
+ private long position;
+
+ private FileSystem.Statistics statistics;
+
+ private String path;
+
+ private ExecutorService executorService;
+
+ private TracingContext tracingContext;
+
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@@ -79,11 +101,64 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
return this;
}
- public AbfsOutputStreamContext build() {
- // Validation of parameters to be done here.
+ public AbfsOutputStreamContext withBlockFactory(
+ final DataBlocks.BlockFactory blockFactory) {
+ this.blockFactory = blockFactory;
return this;
}
+ public AbfsOutputStreamContext withBlockOutputActiveBlocks(
+ final int blockOutputActiveBlocks) {
+ this.blockOutputActiveBlocks = blockOutputActiveBlocks;
+ return this;
+ }
+
+ public AbfsOutputStreamContext withClient(
+ final AbfsClient client) {
+ this.client = client;
+ return this;
+ }
+
+ public AbfsOutputStreamContext withPosition(
+ final long position) {
+ this.position = position;
+ return this;
+ }
+
+ public AbfsOutputStreamContext withFsStatistics(
+ final FileSystem.Statistics statistics) {
+ this.statistics = statistics;
+ return this;
+ }
+
+ public AbfsOutputStreamContext withPath(
+ final String path) {
+ this.path = path;
+ return this;
+ }
+
+ public AbfsOutputStreamContext withExecutorService(
+ final ExecutorService executorService) {
+ this.executorService = executorService;
+ return this;
+ }
+
+ public AbfsOutputStreamContext withTracingContext(
+ final TracingContext tracingContext) {
+ this.tracingContext = tracingContext;
+ return this;
+ }
+
+ public AbfsOutputStreamContext build() {
+ // Validation of parameters to be done here.
+ if (streamStatistics == null) {
+ streamStatistics = new AbfsOutputStreamStatisticsImpl();
+ }
+ return this;
+ }
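+
+ /*
+ * Illustrative usage (assumed caller-side wiring; the real call site is
+ * in AzureBlobFileSystemStore and is not shown in this hunk, and the
+ * variable names are placeholders):
+ *
+ * AbfsOutputStream out = new AbfsOutputStream(
+ * new AbfsOutputStreamContext(sasTokenRenewPeriodInSeconds)
+ * .withClient(client)
+ * .withPath(relativePath)
+ * .withPosition(offset)
+ * .withFsStatistics(statistics)
+ * .withBlockFactory(blockFactory)
+ * .withExecutorService(executorService)
+ * .withTracingContext(tracingContext)
+ * .build());
+ */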
+
public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount(
final int writeMaxConcurrentRequestCount) {
this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount;
@@ -143,4 +218,36 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
}
return this.lease.getLeaseID();
}
+
+ public DataBlocks.BlockFactory getBlockFactory() {
+ return blockFactory;
+ }
+
+ public int getBlockOutputActiveBlocks() {
+ return blockOutputActiveBlocks;
+ }
+
+ public AbfsClient getClient() {
+ return client;
+ }
+
+ public FileSystem.Statistics getStatistics() {
+ return statistics;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public long getPosition() {
+ return position;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public TracingContext getTracingContext() {
+ return tracingContext;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
index c57d5d9bcaa..a9e088c025b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
@@ -22,12 +22,14 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.store.BlockUploadStatistics;
/**
* Interface for {@link AbfsOutputStream} statistics.
*/
@InterfaceStability.Unstable
-public interface AbfsOutputStreamStatistics extends IOStatisticsSource {
+public interface AbfsOutputStreamStatistics extends IOStatisticsSource,
+ BlockUploadStatistics {
/**
* Number of bytes to be uploaded.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
index b07cf28a710..cb054e2915d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
@@ -42,7 +42,9 @@ public class AbfsOutputStreamStatisticsImpl
StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL,
StreamStatisticNames.BYTES_UPLOAD_FAILED,
StreamStatisticNames.QUEUE_SHRUNK_OPS,
- StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS
+ StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS,
+ StreamStatisticNames.BLOCKS_ALLOCATED,
+ StreamStatisticNames.BLOCKS_RELEASED
)
.withDurationTracking(
StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
@@ -60,6 +62,11 @@ public class AbfsOutputStreamStatisticsImpl
private final AtomicLong writeCurrentBufferOps =
ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
+ private final AtomicLong blocksAllocated =
+ ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_ALLOCATED);
+ private final AtomicLong blocksReleased =
+ ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_RELEASED);
+
/**
* Records the need to upload bytes and increments the total bytes that
* needs to be uploaded.
@@ -133,6 +140,22 @@ public class AbfsOutputStreamStatisticsImpl
writeCurrentBufferOps.incrementAndGet();
}
+ /**
+ * Increment the counter to indicate a block has been allocated.
+ */
+ @Override
+ public void blockAllocated() {
+ blocksAllocated.incrementAndGet();
+ }
+
+ /**
+ * Increment the counter to indicate a block has been released.
+ */
+ @Override
+ public void blockReleased() {
+ blocksReleased.incrementAndGet();
+ }
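+
+ /*
+ * Illustrative invariant: once a stream is closed and all uploads have
+ * completed, every allocated block should have been released, i.e.
+ * blocksAllocated.get() == blocksReleased.get().
+ */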
+
/**
* {@inheritDoc}
*
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java
index 82907c57475..231c54825f2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.fs.azure.integration;
import org.apache.hadoop.fs.Path;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
+
/**
* Constants for the Azure tests.
*/
@@ -175,4 +177,15 @@ public interface AzureTestConstants {
* Base directory for page blobs.
*/
Path PAGE_BLOB_DIR = new Path("/" + DEFAULT_PAGE_BLOB_DIRECTORY);
+
+ /**
+ * Test configuration key for the huge file upload size used when testing
+ * AbfsOutputStream uploads: {@value}
+ */
+ String AZURE_SCALE_HUGE_FILE_UPLOAD = AZURE_SCALE_TEST + "huge.upload";
+
+ /**
+ * Default size of the huge file tested for AbfsOutputStream uploads:
+ * {@value}
+ */
+ int AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT = 2 * DEFAULT_WRITE_BUFFER_SIZE;
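+
+ /*
+ * Illustrative override (assumption: this scale-test property honours
+ * values set in the test configuration):
+ *
+ * conf.setInt(AZURE_SCALE_HUGE_FILE_UPLOAD, 32 * 1024 * 1024);
+ */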
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 2497f8f1b63..fd2f2690dae 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -463,7 +463,7 @@ public abstract class AbstractAbfsIntegrationTest extends
*/
protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled(
AzureBlobFileSystem fs,
- Path path) throws AzureBlobFileSystemException {
+ Path path) throws IOException {
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
abfss.getAbfsConfiguration().setDisableOutputStreamFlush(false);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java
new file mode 100644
index 00000000000..510e0a7596b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD;
+import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assume;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.getTestPropertyInt;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
+
+/**
+ * Testing Huge file for AbfsOutputStream.
+ */
+@RunWith(Parameterized.class)
+public class ITestAbfsHugeFiles extends AbstractAbfsScaleTest {
+ private static final int ONE_MB = 1024 * 1024;
+ private static final int EIGHT_MB = 8 * ONE_MB;
+ // Configurable huge file upload: "fs.azure.scale.test.huge.upload",
+ // default is 2 * DEFAULT_WRITE_BUFFER_SIZE (8 MB).
+ private static final int HUGE_FILE;
+
+ // Set the HUGE_FILE.
+ static {
+ HUGE_FILE = getTestPropertyInt(new Configuration(),
+ AZURE_SCALE_HUGE_FILE_UPLOAD, AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT);
+ }
+
+ // Writing block size to be used in this test.
+ private int size;
+ // Block Factory to be used in this test.
+ private String blockFactoryName;
+
+ @Parameterized.Parameters(name = "size [{0}] ; blockFactoryName "
+ + "[{1}]")
+ public static Collection