HADOOP-16242. ABFS: add bufferpool to AbfsOutputStream.

Contributed by Da Zhou.

(cherry picked from commit 1cef194a28)
This commit is contained in:
Da Zhou 2019-04-29 13:27:28 +01:00 committed by Steve Loughran
parent 907a016142
commit bf0bb2470f
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
1 changed files with 14 additions and 3 deletions

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.nio.ByteBuffer;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -37,6 +38,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
@ -64,6 +66,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final ThreadPoolExecutor threadExecutor; private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService; private final ExecutorCompletionService<Void> completionService;
/**
* Queue storing buffers with the size of the Azure block ready for
* reuse. The pool allows reusing the blocks instead of allocating new
* blocks. After the data is sent to the service, the buffer is returned
* back to the queue
*/
private final ElasticByteBufferPool byteBufferPool
= new ElasticByteBufferPool();
public AbfsOutputStream( public AbfsOutputStream(
final AbfsClient client, final AbfsClient client,
final String path, final String path,
@ -78,7 +89,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.buffer = new byte[bufferSize]; this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0; this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
@ -263,8 +274,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
final byte[] bytes = buffer; final byte[] bytes = buffer;
final int bytesLength = bufferIndex; final int bytesLength = bufferIndex;
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
buffer = new byte[bufferSize];
bufferIndex = 0; bufferIndex = 0;
final long offset = position; final long offset = position;
position += bytesLength; position += bytesLength;
@ -278,6 +288,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
public Void call() throws Exception { public Void call() throws Exception {
client.append(path, offset, bytes, 0, client.append(path, offset, bytes, 0,
bytesLength); bytesLength);
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
return null; return null;
} }
}); });