diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 5764bcb5517..679f22e6ae2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.HttpURLConnection; +import java.nio.ByteBuffer; import java.util.Locale; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.LinkedBlockingQueue; @@ -37,6 +38,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; @@ -64,6 +66,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; + /** + * Queue storing buffers with the size of the Azure block ready for + * reuse. The pool allows reusing the blocks instead of allocating new + * blocks. After the data is sent to the service, the buffer is returned + * back to the queue + */ + private final ElasticByteBufferPool byteBufferPool + = new ElasticByteBufferPool(); + public AbfsOutputStream( final AbfsClient client, final String path, @@ -78,7 +89,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; + this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); @@ -268,8 +279,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa final byte[] bytes = buffer; final int bytesLength = bufferIndex; - - buffer = new byte[bufferSize]; + buffer = byteBufferPool.getBuffer(false, bufferSize).array(); bufferIndex = 0; final long offset = position; position += bytesLength; @@ -283,6 +293,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa public Void call() throws Exception { client.append(path, offset, bytes, 0, bytesLength); + byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); return null; } });