HADOOP-16242. ABFS: add bufferpool to AbfsOutputStream.
Contributed by Da Zhou.
This commit is contained in:
parent
8e8c16dbc6
commit
e5bb8498df
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.io.InterruptedIOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
@ -37,6 +38,7 @@ import com.google.common.base.Preconditions;
|
|||
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.io.ElasticByteBufferPool;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
|
@ -64,6 +66,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|||
private final ThreadPoolExecutor threadExecutor;
|
||||
private final ExecutorCompletionService<Void> completionService;
|
||||
|
||||
/**
|
||||
* Queue storing buffers with the size of the Azure block ready for
|
||||
* reuse. The pool allows reusing the blocks instead of allocating new
|
||||
* blocks. After the data is sent to the service, the buffer is returned
|
||||
* back to the queue
|
||||
*/
|
||||
private final ElasticByteBufferPool byteBufferPool
|
||||
= new ElasticByteBufferPool();
|
||||
|
||||
public AbfsOutputStream(
|
||||
final AbfsClient client,
|
||||
final String path,
|
||||
|
@ -78,7 +89,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|||
this.lastError = null;
|
||||
this.lastFlushOffset = 0;
|
||||
this.bufferSize = bufferSize;
|
||||
this.buffer = new byte[bufferSize];
|
||||
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
||||
this.bufferIndex = 0;
|
||||
this.writeOperations = new ConcurrentLinkedDeque<>();
|
||||
|
||||
|
@ -268,8 +279,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|||
|
||||
final byte[] bytes = buffer;
|
||||
final int bytesLength = bufferIndex;
|
||||
|
||||
buffer = new byte[bufferSize];
|
||||
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
|
||||
bufferIndex = 0;
|
||||
final long offset = position;
|
||||
position += bytesLength;
|
||||
|
@ -283,6 +293,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
|
|||
public Void call() throws Exception {
|
||||
client.append(path, offset, bytes, 0,
|
||||
bytesLength);
|
||||
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue