HADOOP-14028. S3A BlockOutputStreams doesn't delete temporary files in multipart uploads or handle part upload failures.

Contributed by Steve Loughran.
Steve Loughran 2017-02-25 16:10:15 +00:00
parent ff87ca8441
commit 05ab09ca87
11 changed files with 653 additions and 280 deletions

S3ABlockOutputStream.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.util.Progressable;
@@ -178,10 +176,10 @@ class S3ABlockOutputStream extends OutputStream {
     if (activeBlock == null) {
       blockCount++;
       if (blockCount >= Constants.MAX_MULTIPART_COUNT) {
-        LOG.error("Number of partitions in stream exceeds limit for S3: " +
-            Constants.MAX_MULTIPART_COUNT + " write may fail.");
+        LOG.error("Number of partitions in stream exceeds limit for S3: "
+            + Constants.MAX_MULTIPART_COUNT + " write may fail.");
       }
-      activeBlock = blockFactory.create(this.blockSize);
+      activeBlock = blockFactory.create(blockCount, this.blockSize, statistics);
     }
     return activeBlock;
   }
@@ -206,7 +204,9 @@ class S3ABlockOutputStream extends OutputStream {
    * Clear the active block.
    */
   private void clearActiveBlock() {
+    if (activeBlock != null) {
       LOG.debug("Clearing active block");
+    }
     synchronized (this) {
       activeBlock = null;
     }
@@ -356,11 +356,9 @@ class S3ABlockOutputStream extends OutputStream {
       writeOperationHelper.writeFailed(ioe);
       throw ioe;
     } finally {
-      LOG.debug("Closing block and factory");
-      IOUtils.closeStream(block);
-      IOUtils.closeStream(blockFactory);
+      closeAll(LOG, block, blockFactory);
       LOG.debug("Statistics: {}", statistics);
-      IOUtils.closeStream(statistics);
+      closeAll(LOG, statistics);
       clearActiveBlock();
     }
     // All end of write operations, including deleting fake parent directories
@@ -378,10 +376,10 @@ class S3ABlockOutputStream extends OutputStream {
     final S3ADataBlocks.DataBlock block = getActiveBlock();
     int size = block.dataSize();
-    final PutObjectRequest putObjectRequest =
-        writeOperationHelper.newPutRequest(
-            block.startUpload(),
-            size);
+    final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
+    final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
+        writeOperationHelper.newPutRequest(uploadData.getFile())
+        : writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);
     long transferQueueTime = now();
     BlockUploadProgress callback =
         new BlockUploadProgress(
@@ -392,8 +390,14 @@ class S3ABlockOutputStream extends OutputStream {
         executorService.submit(new Callable<PutObjectResult>() {
           @Override
           public PutObjectResult call() throws Exception {
-            PutObjectResult result = fs.putObjectDirect(putObjectRequest);
-            block.close();
+            PutObjectResult result;
+            try {
+              // the putObject call automatically closes the input
+              // stream afterwards.
+              result = writeOperationHelper.putObject(putObjectRequest);
+            } finally {
+              closeAll(LOG, uploadData, block);
+            }
             return result;
           }
         });
@@ -436,6 +440,14 @@ class S3ABlockOutputStream extends OutputStream {
     return System.currentTimeMillis();
   }
 
+  /**
+   * Get the statistics for this stream.
+   * @return stream statistics
+   */
+  S3AInstrumentation.OutputStreamStatistics getStatistics() {
+    return statistics;
+  }
+
   /**
    * Multiple partition upload.
    */
@@ -443,7 +455,7 @@ class S3ABlockOutputStream extends OutputStream {
     private final String uploadId;
     private final List<ListenableFuture<PartETag>> partETagsFutures;
 
-    public MultiPartUpload() throws IOException {
+    MultiPartUpload() throws IOException {
       this.uploadId = writeOperationHelper.initiateMultiPartUpload();
       this.partETagsFutures = new ArrayList<>(2);
       LOG.debug("Initiated multi-part upload for {} with " +
@@ -460,14 +472,16 @@ class S3ABlockOutputStream extends OutputStream {
         throws IOException {
       LOG.debug("Queueing upload of {}", block);
       final int size = block.dataSize();
-      final InputStream uploadStream = block.startUpload();
+      final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
       final int currentPartNumber = partETagsFutures.size() + 1;
       final UploadPartRequest request =
           writeOperationHelper.newUploadPartRequest(
               uploadId,
-              uploadStream,
               currentPartNumber,
-              size);
+              size,
+              uploadData.getUploadStream(),
+              uploadData.getFile());
       long transferQueueTime = now();
       BlockUploadProgress callback =
           new BlockUploadProgress(
@@ -482,12 +496,16 @@ class S3ABlockOutputStream extends OutputStream {
             LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
                 uploadId);
             // do the upload
-            PartETag partETag = fs.uploadPart(request).getPartETag();
-            LOG.debug("Completed upload of {}", block);
-            LOG.debug("Stream statistics of {}", statistics);
-            // close the block
-            block.close();
+            PartETag partETag;
+            try {
+              partETag = fs.uploadPart(request).getPartETag();
+              LOG.debug("Completed upload of {} to part {}", block,
+                  partETag.getETag());
+              LOG.debug("Stream statistics of {}", statistics);
+            } finally {
+              // close the stream and block
+              closeAll(LOG, uploadData, block);
+            }
             return partETag;
           }
         });
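The substance of the fix is visible above: every upload path now obtains a S3ADataBlocks.BlockUploadData from the block, chooses a file-based or stream-based PUT, and releases both the upload data and the block in a finally clause, so a failed part upload can no longer leak a stream or a buffer file. A minimal, self-contained sketch of that pattern in plain Java follows; the Source holder and the put* calls are hypothetical stand-ins, not S3A or AWS SDK APIs:

    import java.io.Closeable;
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;

    final class UploadSketch {
      /** Holder mirroring S3ADataBlocks.BlockUploadData: file XOR stream. */
      static final class Source implements Closeable {
        final File file;            // non-null for disk-backed blocks
        final InputStream stream;   // non-null for memory-backed blocks

        Source(File file) { this.file = file; this.stream = null; }
        Source(InputStream stream) { this.file = null; this.stream = stream; }

        boolean hasFile() { return file != null; }

        @Override
        public void close() throws IOException {
          if (stream != null) {     // source files are left untouched
            stream.close();
          }
        }
      }

      /** Upload one block; the source is always closed, even on failure. */
      static void uploadBlock(Source source) throws IOException {
        try {
          if (source.hasFile()) {
            // file-backed: hand the SDK the file, which it can replay on retry
            putFile(source.file);
          } else {
            // memory-backed: hand over the stream
            putStream(source.stream);
          }
        } finally {
          source.close();           // never leaks, even if put* throws
        }
      }

      // hypothetical stand-ins for the SDK PUT calls
      static void putFile(File f) throws IOException { /* ... */ }
      static void putStream(InputStream in) throws IOException { /* ... */ }
    }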

S3ADataBlocks.java

@@ -24,10 +24,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -42,10 +40,11 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.util.DirectBufferPool;
 
 import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.closeAll;
 
 /**
  * Set of classes to support output streaming into blocks which are then
- * uploaded as partitions.
+ * uploaded to S3 as a single PUT, or as part of a multipart request.
  */
 final class S3ADataBlocks {
@@ -96,6 +95,70 @@ final class S3ADataBlocks {
     }
   }
 
+  /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  static final class BlockUploadData implements Closeable {
+    private final File file;
+    private final InputStream uploadStream;
+
+    /**
+     * File constructor; input stream will be null.
+     * @param file file to upload
+     */
+    BlockUploadData(File file) {
+      Preconditions.checkArgument(file.exists(), "No file: " + file);
+      this.file = file;
+      this.uploadStream = null;
+    }
+
+    /**
+     * Stream constructor, file field will be null.
+     * @param uploadStream stream to upload
+     */
+    BlockUploadData(InputStream uploadStream) {
+      Preconditions.checkNotNull(uploadStream, "rawUploadStream");
+      this.uploadStream = uploadStream;
+      this.file = null;
+    }
+
+    /**
+     * Predicate: does this instance contain a file reference.
+     * @return true if there is a file.
+     */
+    boolean hasFile() {
+      return file != null;
+    }
+
+    /**
+     * Get the file, if there is one.
+     * @return the file for uploading, or null.
+     */
+    File getFile() {
+      return file;
+    }
+
+    /**
+     * Get the raw upload stream, if the object was
+     * created with one.
+     * @return the upload stream or null.
+     */
+    InputStream getUploadStream() {
+      return uploadStream;
+    }
+
+    /**
+     * Close: closes any upload stream provided in the constructor.
+     * @throws IOException inherited exception
+     */
+    @Override
+    public void close() throws IOException {
+      closeAll(LOG, uploadStream);
+    }
+  }
+
   /**
    * Base class for block factories.
    */
@@ -110,15 +173,21 @@ final class S3ADataBlocks {
 
     /**
      * Create a block.
+     *
+     * @param index index of block
      * @param limit limit of the block.
+     * @param statistics stats to work with
      * @return a new block.
      */
-    abstract DataBlock create(int limit) throws IOException;
+    abstract DataBlock create(long index, int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics)
+        throws IOException;
 
     /**
      * Implement any close/cleanup operation.
      * Base class is a no-op
-     * @throws IOException -ideally, it shouldn't.
+     * @throws IOException Inherited exception; implementations should
+     * avoid raising it.
      */
     @Override
     public void close() throws IOException {
@@ -140,6 +209,14 @@ final class S3ADataBlocks {
     enum DestState {Writing, Upload, Closed}
 
     private volatile DestState state = Writing;
+    protected final long index;
+    protected final S3AInstrumentation.OutputStreamStatistics statistics;
+
+    protected DataBlock(long index,
+        S3AInstrumentation.OutputStreamStatistics statistics) {
+      this.index = index;
+      this.statistics = statistics;
+    }
 
     /**
      * Atomically enter a state, verifying current state.
@@ -243,8 +320,8 @@ final class S3ADataBlocks {
      * @return the stream
      * @throws IOException trouble
      */
-    InputStream startUpload() throws IOException {
-      LOG.debug("Start datablock upload");
+    BlockUploadData startUpload() throws IOException {
+      LOG.debug("Start datablock[{}] upload", index);
       enterState(Writing, Upload);
       return null;
     }
@@ -278,6 +355,23 @@ final class S3ADataBlocks {
     }
 
+    /**
+     * A block has been allocated.
+     */
+    protected void blockAllocated() {
+      if (statistics != null) {
+        statistics.blockAllocated();
+      }
+    }
+
+    /**
+     * A block has been released.
+     */
+    protected void blockReleased() {
+      if (statistics != null) {
+        statistics.blockReleased();
+      }
+    }
   }
 
   // ====================================================================
@@ -292,8 +386,10 @@ final class S3ADataBlocks {
     }
 
     @Override
-    DataBlock create(int limit) throws IOException {
-      return new ByteArrayBlock(limit);
+    DataBlock create(long index, int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics)
+        throws IOException {
+      return new ByteArrayBlock(0, limit, statistics);
     }
   }
@@ -334,9 +430,13 @@ final class S3ADataBlocks {
     // cache data size so that it is consistent after the buffer is reset.
     private Integer dataSize;
 
-    ByteArrayBlock(int limit) {
+    ByteArrayBlock(long index,
+        int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics) {
+      super(index, statistics);
       this.limit = limit;
       buffer = new S3AByteArrayOutputStream(limit);
+      blockAllocated();
     }
 
     /**
@@ -349,12 +449,12 @@ final class S3ADataBlocks {
     }
 
     @Override
-    InputStream startUpload() throws IOException {
+    BlockUploadData startUpload() throws IOException {
       super.startUpload();
       dataSize = buffer.size();
       ByteArrayInputStream bufferData = buffer.getInputStream();
       buffer = null;
-      return bufferData;
+      return new BlockUploadData(bufferData);
     }
 
     @Override
@@ -378,12 +478,14 @@ final class S3ADataBlocks {
     @Override
     protected void innerClose() {
       buffer = null;
+      blockReleased();
     }
 
     @Override
     public String toString() {
-      return "ByteArrayBlock{" +
-          "state=" + getState() +
+      return "ByteArrayBlock{"
+          + "index=" + index +
+          ", state=" + getState() +
           ", limit=" + limit +
           ", dataSize=" + dataSize +
           '}';
@@ -395,12 +497,6 @@ final class S3ADataBlocks {
   /**
    * Stream via Direct ByteBuffers; these are allocated off heap
    * via {@link DirectBufferPool}.
-   * This is actually the most complex of all the block factories,
-   * due to the need to explicitly recycle buffers; in comparison, the
-   * {@link DiskBlock} buffer delegates the work of deleting files to
-   * the {@link DiskBlock.FileDeletingInputStream}. Here the
-   * input stream {@link ByteBufferInputStream} has a similar task, along
-   * with the foundational work of streaming data from a byte array.
    */
   static class ByteBufferBlockFactory extends BlockFactory {
@@ -413,8 +509,10 @@ final class S3ADataBlocks {
     }
 
     @Override
-    ByteBufferBlock create(int limit) throws IOException {
-      return new ByteBufferBlock(limit);
+    ByteBufferBlock create(long index, int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics)
+        throws IOException {
+      return new ByteBufferBlock(index, limit, statistics);
     }
 
     private ByteBuffer requestBuffer(int limit) {
@@ -446,21 +544,27 @@ final class S3ADataBlocks {
     /**
      * A DataBlock which requests a buffer from pool on creation; returns
-     * it when the output stream is closed.
+     * it when it is closed.
      */
     class ByteBufferBlock extends DataBlock {
-      private ByteBuffer buffer;
+      private ByteBuffer blockBuffer;
       private final int bufferSize;
       // cache data size so that it is consistent after the buffer is reset.
       private Integer dataSize;
 
       /**
        * Instantiate. This will request a ByteBuffer of the desired size.
+       * @param index block index
       * @param bufferSize buffer size
+       * @param statistics statistics to update
       */
-      ByteBufferBlock(int bufferSize) {
+      ByteBufferBlock(long index,
+          int bufferSize,
+          S3AInstrumentation.OutputStreamStatistics statistics) {
+        super(index, statistics);
        this.bufferSize = bufferSize;
-        buffer = requestBuffer(bufferSize);
+        blockBuffer = requestBuffer(bufferSize);
+        blockAllocated();
      }
 
      /**
@@ -473,13 +577,14 @@ final class S3ADataBlocks {
      }
 
      @Override
-      ByteBufferInputStream startUpload() throws IOException {
+      BlockUploadData startUpload() throws IOException {
        super.startUpload();
        dataSize = bufferCapacityUsed();
        // set the buffer up from reading from the beginning
-        buffer.limit(buffer.position());
-        buffer.position(0);
-        return new ByteBufferInputStream(dataSize, buffer);
+        blockBuffer.limit(blockBuffer.position());
+        blockBuffer.position(0);
+        return new BlockUploadData(
+            new ByteBufferInputStream(dataSize, blockBuffer));
      }
 
      @Override
@@ -489,66 +594,72 @@ final class S3ADataBlocks {
      @Override
      public int remainingCapacity() {
-        return buffer != null ? buffer.remaining() : 0;
+        return blockBuffer != null ? blockBuffer.remaining() : 0;
      }
 
      private int bufferCapacityUsed() {
-        return buffer.capacity() - buffer.remaining();
+        return blockBuffer.capacity() - blockBuffer.remaining();
      }
 
      @Override
      int write(byte[] b, int offset, int len) throws IOException {
        super.write(b, offset, len);
        int written = Math.min(remainingCapacity(), len);
-        buffer.put(b, offset, written);
+        blockBuffer.put(b, offset, written);
        return written;
      }
 
+      /**
+       * Closing the block will release the buffer.
+       */
      @Override
      protected void innerClose() {
-        buffer = null;
+        if (blockBuffer != null) {
+          blockReleased();
+          releaseBuffer(blockBuffer);
+          blockBuffer = null;
+        }
      }
 
      @Override
      public String toString() {
        return "ByteBufferBlock{"
-            + "state=" + getState() +
+            + "index=" + index +
+            ", state=" + getState() +
            ", dataSize=" + dataSize() +
            ", limit=" + bufferSize +
            ", remainingCapacity=" + remainingCapacity() +
            '}';
      }
-    }
 
      /**
       * Provide an input stream from a byte buffer; supporting
       * {@link #mark(int)}, which is required to enable replay of failed
       * PUT attempts.
-       * This input stream returns the buffer to the pool afterwards.
       */
      class ByteBufferInputStream extends InputStream {
 
        private final int size;
        private ByteBuffer byteBuffer;
 
-        ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
+        ByteBufferInputStream(int size,
+            ByteBuffer byteBuffer) {
          LOG.debug("Creating ByteBufferInputStream of size {}", size);
          this.size = size;
          this.byteBuffer = byteBuffer;
        }
 
        /**
-         * Return the buffer to the pool after the stream is closed.
+         * After the stream is closed, set the local reference to the byte
+         * buffer to null; this guarantees that future attempts to use
+         * stream methods will fail.
         */
        @Override
        public synchronized void close() {
-          if (byteBuffer != null) {
-            LOG.debug("releasing buffer");
-            releaseBuffer(byteBuffer);
-            byteBuffer = null;
-          }
+          LOG.debug("ByteBufferInputStream.close() for {}",
+              ByteBufferBlock.super.toString());
+          byteBuffer = null;
        }
 
        /**
         * Verify that the stream is open.
@@ -624,7 +735,7 @@ final class S3ADataBlocks {
        /**
         * Read in data.
-         * @param buffer destination buffer
+         * @param b destination buffer
         * @param offset offset within the buffer
         * @param length length of bytes to read
         * @throws EOFException if the position is negative
@@ -633,16 +744,16 @@ final class S3ADataBlocks {
         * @throws IllegalArgumentException other arguments are invalid.
         */
        @SuppressWarnings("NullableProblems")
-        public synchronized int read(byte[] buffer, int offset, int length)
+        public synchronized int read(byte[] b, int offset, int length)
            throws IOException {
          Preconditions.checkArgument(length >= 0, "length is negative");
-          Preconditions.checkArgument(buffer != null, "Null buffer");
-          if (buffer.length - offset < length) {
+          Preconditions.checkArgument(b != null, "Null buffer");
+          if (b.length - offset < length) {
            throw new IndexOutOfBoundsException(
                FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
                    + ": request length =" + length
                    + ", with offset =" + offset
-                    + "; buffer capacity =" + (buffer.length - offset));
+                    + "; buffer capacity =" + (b.length - offset));
          }
          verifyOpen();
          if (!hasRemaining()) {
@@ -650,7 +761,7 @@ final class S3ADataBlocks {
          }
 
          int toRead = Math.min(length, available());
-          byteBuffer.get(buffer, offset, toRead);
+          byteBuffer.get(b, offset, toRead);
          return toRead;
        }
@@ -659,15 +770,17 @@ final class S3ADataBlocks {
          final StringBuilder sb = new StringBuilder(
              "ByteBufferInputStream{");
          sb.append("size=").append(size);
-          ByteBuffer buffer = this.byteBuffer;
-          if (buffer != null) {
-            sb.append(", available=").append(buffer.remaining());
+          ByteBuffer buf = this.byteBuffer;
+          if (buf != null) {
+            sb.append(", available=").append(buf.remaining());
          }
+          sb.append(", ").append(ByteBufferBlock.super.toString());
          sb.append('}');
          return sb.toString();
        }
      }
    }
+  }
 
  // ====================================================================
@@ -681,22 +794,29 @@ final class S3ADataBlocks {
    }
 
    /**
-     * Create a temp file and a block which writes to it.
+     * Create a temp file and a {@link DiskBlock} instance to manage it.
+     *
+     * @param index block index
     * @param limit limit of the block.
+     * @param statistics statistics to update
     * @return the new block
     * @throws IOException IO problems
     */
    @Override
-    DataBlock create(int limit) throws IOException {
+    DataBlock create(long index,
+        int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics)
+        throws IOException {
      File destFile = getOwner()
-          .createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
-      return new DiskBlock(destFile, limit);
+          .createTmpFileForWrite(String.format("s3ablock-%04d-", index),
+              limit, getOwner().getConf());
+      return new DiskBlock(destFile, limit, index, statistics);
    }
  }
 
  /**
   * Stream to a file.
-   * This will stop at the limit; the caller is expected to create a new block
+   * This will stop at the limit; the caller is expected to create a new block.
   */
  static class DiskBlock extends DataBlock {
 
@@ -704,12 +824,17 @@ final class S3ADataBlocks {
    private final File bufferFile;
    private final int limit;
    private BufferedOutputStream out;
-    private InputStream uploadStream;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    DiskBlock(File bufferFile, int limit)
+    DiskBlock(File bufferFile,
+        int limit,
+        long index,
+        S3AInstrumentation.OutputStreamStatistics statistics)
        throws FileNotFoundException {
+      super(index, statistics);
      this.limit = limit;
      this.bufferFile = bufferFile;
+      blockAllocated();
      out = new BufferedOutputStream(new FileOutputStream(bufferFile));
    }
 
@@ -738,7 +863,7 @@ final class S3ADataBlocks {
    }
 
    @Override
-    InputStream startUpload() throws IOException {
+    BlockUploadData startUpload() throws IOException {
      super.startUpload();
      try {
        out.flush();
@@ -746,8 +871,7 @@ final class S3ADataBlocks {
        out.close();
        out = null;
      }
-      uploadStream = new FileInputStream(bufferFile);
-      return new FileDeletingInputStream(uploadStream);
+      return new BlockUploadData(bufferFile);
    }
 
    /**
@@ -755,6 +879,7 @@ final class S3ADataBlocks {
     * exists.
     * @throws IOException IO problems
     */
+    @SuppressWarnings("UnnecessaryDefault")
    @Override
    protected void innerClose() throws IOException {
      final DestState state = getState();
@@ -763,20 +888,19 @@ final class S3ADataBlocks {
      case Writing:
        if (bufferFile.exists()) {
          // file was not uploaded
-          LOG.debug("Deleting buffer file as upload did not start");
-          boolean deleted = bufferFile.delete();
-          if (!deleted && bufferFile.exists()) {
-            LOG.warn("Failed to delete buffer file {}", bufferFile);
-          }
+          LOG.debug("Block[{}]: Deleting buffer file as upload did not start",
+              index);
+          closeBlock();
        }
        break;
 
      case Upload:
-        LOG.debug("Buffer file {} exists —close upload stream", bufferFile);
+        LOG.debug("Block[{}]: Buffer file {} exists —close upload stream",
+            index, bufferFile);
        break;
 
      case Closed:
-        // no-op
+        closeBlock();
        break;
 
      default:
@@ -798,7 +922,8 @@ final class S3ADataBlocks {
    @Override
    public String toString() {
      String sb = "FileBlock{"
-          + "destFile=" + bufferFile +
+          + "index=" + index
+          + ", destFile=" + bufferFile +
          ", state=" + getState() +
          ", dataSize=" + dataSize() +
          ", limit=" + limit +
@@ -807,31 +932,20 @@ final class S3ADataBlocks {
    }
 
    /**
-     * An input stream which deletes the buffer file when closed.
+     * Close the block.
+     * This will delete the block's buffer file if the block has
+     * not previously been closed.
     */
-    private final class FileDeletingInputStream extends FilterInputStream {
-      private final AtomicBoolean closed = new AtomicBoolean(false);
-
-      FileDeletingInputStream(InputStream source) {
-        super(source);
-      }
-
-      /**
-       * Delete the input file when closed.
-       * @throws IOException IO problem
-       */
-      @Override
-      public void close() throws IOException {
-        try {
-          super.close();
-        } finally {
-          if (!closed.getAndSet(true)) {
-            if (!bufferFile.delete()) {
-              LOG.warn("delete({}) returned false",
-                  bufferFile.getAbsoluteFile());
-            }
-          }
-        }
+    void closeBlock() {
+      LOG.debug("block[{}]: closeBlock()", index);
+      if (!closed.getAndSet(true)) {
+        blockReleased();
+        if (!bufferFile.delete() && bufferFile.exists()) {
+          LOG.warn("delete({}) returned false",
+              bufferFile.getAbsoluteFile());
+        }
+      } else {
+        LOG.debug("block[{}]: skipping re-entrant closeBlock()", index);
      }
    }
  }
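Every block subclass above now threads a block index and the output-stream statistics through its constructor, while keeping the same three-state lifecycle (Writing, Upload, Closed) enforced by enterState(). A self-contained sketch of that lifecycle check; the names match the diff, the rest is illustrative:

    import java.io.IOException;

    class DataBlockSketch {
      enum DestState { Writing, Upload, Closed }

      private volatile DestState state = DestState.Writing;

      /** Atomically enter a state, verifying the current state. */
      private synchronized void enterState(DestState current, DestState next) {
        if (state != current) {
          throw new IllegalStateException(
              "Expected " + current + " but was " + state);
        }
        state = next;
      }

      void startUpload() throws IOException {
        enterState(DestState.Writing, DestState.Upload);  // write phase is over
      }

      void close() throws IOException {
        if (state != DestState.Closed) {
          // Closing must be re-entrant: closing twice is a no-op, which is
          // what the new AtomicBoolean in DiskBlock.closeBlock() guarantees.
          state = DestState.Closed;
        }
      }
    }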

S3AFileSystem.java

@@ -993,6 +993,7 @@ public class S3AFileSystem extends FileSystem {
   */
  public PutObjectRequest newPutObjectRequest(String key,
      ObjectMetadata metadata, File srcfile) {
+    Preconditions.checkNotNull(srcfile);
    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
        srcfile);
    putObjectRequest.setCannedAcl(cannedACL);
@@ -1009,8 +1010,9 @@ public class S3AFileSystem extends FileSystem {
   * @param inputStream source data.
   * @return the request
   */
-  PutObjectRequest newPutObjectRequest(String key,
+  private PutObjectRequest newPutObjectRequest(String key,
      ObjectMetadata metadata, InputStream inputStream) {
+    Preconditions.checkNotNull(inputStream);
    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
        inputStream, metadata);
    putObjectRequest.setCannedAcl(cannedACL);
@@ -1048,12 +1050,16 @@ public class S3AFileSystem extends FileSystem {
  }
 
  /**
-   * PUT an object, incrementing the put requests and put bytes
+   * Start a transfer-manager managed async PUT of an object,
+   * incrementing the put requests and put bytes
   * counters.
   * It does not update the other counters,
   * as existing code does that as progress callbacks come in.
   * Byte length is calculated from the file length, or, if there is no
   * file, from the content length of the header.
+   * Because the operation is async, any stream supplied in the request
+   * must reference data (files, buffers) which stay valid until the upload
+   * completes.
   * @param putObjectRequest the request
   * @return the upload initiated
   */
@@ -1079,6 +1085,7 @@ public class S3AFileSystem extends FileSystem {
   * PUT an object directly (i.e. not via the transfer manager).
   * Byte length is calculated from the file length, or, if there is no
   * file, from the content length of the header.
+   * <i>Important: this call will close any input stream in the request.</i>
   * @param putObjectRequest the request
   * @return the upload initiated
   * @throws AmazonClientException on problems
@@ -1104,7 +1111,8 @@ public class S3AFileSystem extends FileSystem {
 
  /**
   * Upload part of a multi-partition file.
-   * Increments the write and put counters
+   * Increments the write and put counters.
+   * <i>Important: this call does not close any input stream in the request.</i>
   * @param request request
   * @return the result of the operation.
   * @throws AmazonClientException on problems
@@ -2202,14 +2210,28 @@ public class S3AFileSystem extends FileSystem {
 
    /**
     * Create a {@link PutObjectRequest} request.
-     * The metadata is assumed to have been configured with the size of the
-     * operation.
+     * If {@code length} is set, the metadata is configured with the size of
+     * the upload.
     * @param inputStream source data.
     * @param length size, if known. Use -1 for not known
     * @return the request
     */
    PutObjectRequest newPutRequest(InputStream inputStream, long length) {
-      return newPutObjectRequest(key, newObjectMetadata(length), inputStream);
+      PutObjectRequest request = newPutObjectRequest(key,
+          newObjectMetadata(length), inputStream);
+      return request;
+    }
+
+    /**
+     * Create a {@link PutObjectRequest} request to upload a file.
+     * @param sourceFile source file
+     * @return the request
+     */
+    PutObjectRequest newPutRequest(File sourceFile) {
+      int length = (int) sourceFile.length();
+      PutObjectRequest request = newPutObjectRequest(key,
+          newObjectMetadata(length), sourceFile);
+      return request;
    }
 
    /**
@@ -2271,6 +2293,8 @@ public class S3AFileSystem extends FileSystem {
      Preconditions.checkNotNull(partETags);
      Preconditions.checkArgument(!partETags.isEmpty(),
          "No partitions have been uploaded");
+      LOG.debug("Completing multipart upload {} with {} parts",
+          uploadId, partETags.size());
      return s3.completeMultipartUpload(
          new CompleteMultipartUploadRequest(bucket,
              key,
@@ -2281,28 +2305,31 @@ public class S3AFileSystem extends FileSystem {
    /**
     * Abort a multipart upload operation.
     * @param uploadId multipart operation Id
-     * @return the result
     * @throws AmazonClientException on problems.
     */
    void abortMultipartUpload(String uploadId) throws AmazonClientException {
+      LOG.debug("Aborting multipart upload {}", uploadId);
      s3.abortMultipartUpload(
          new AbortMultipartUploadRequest(bucket, key, uploadId));
    }
 
    /**
     * Create and initialize a part request of a multipart upload.
+     * Exactly one of: {@code uploadStream} or {@code sourceFile}
+     * must be specified.
     * @param uploadId ID of ongoing upload
-     * @param uploadStream source of data to upload
     * @param partNumber current part number of the upload
     * @param size amount of data
+     * @param uploadStream source of data to upload
+     * @param sourceFile optional source file.
     * @return the request.
     */
    UploadPartRequest newUploadPartRequest(String uploadId,
-        InputStream uploadStream, int partNumber, int size) {
+        int partNumber,
+        int size,
+        InputStream uploadStream,
+        File sourceFile) {
      Preconditions.checkNotNull(uploadId);
-      Preconditions.checkNotNull(uploadStream);
+      // exactly one source must be set; xor verifies this
+      Preconditions.checkArgument((uploadStream != null) ^ (sourceFile != null),
+          "Data source");
      Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
      Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000,
          "partNumber must be between 1 and 10000 inclusive, but is %s",
@@ -2310,13 +2337,19 @@ public class S3AFileSystem extends FileSystem {
 
      LOG.debug("Creating part upload request for {} #{} size {}",
          uploadId, partNumber, size);
-      return new UploadPartRequest()
+      UploadPartRequest request = new UploadPartRequest()
          .withBucketName(bucket)
          .withKey(key)
          .withUploadId(uploadId)
-          .withInputStream(uploadStream)
          .withPartNumber(partNumber)
          .withPartSize(size);
+      if (uploadStream != null) {
+        // there's an upload stream. Bind to it.
+        request.setInputStream(uploadStream);
+      } else {
+        request.setFile(sourceFile);
+      }
+      return request;
    }
 
    /**
@@ -2331,6 +2364,21 @@ public class S3AFileSystem extends FileSystem {
      sb.append('}');
      return sb.toString();
    }
+
+    /**
+     * PUT an object directly (i.e. not via the transfer manager).
+     * @param putObjectRequest the request
+     * @return the upload initiated
+     * @throws IOException on problems
+     */
+    PutObjectResult putObject(PutObjectRequest putObjectRequest)
+        throws IOException {
+      try {
+        return putObjectDirect(putObjectRequest);
+      } catch (AmazonClientException e) {
+        throw translateException("put", putObjectRequest.getKey(), e);
+      }
+    }
  }
}
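The reworked newUploadPartRequest() accepts either a stream or a file, and the xor precondition admits exactly one of them. A small standalone illustration of why the xor operator expresses "exactly one" (hypothetical helper, not S3A code):

    final class XorCheckSketch {
      /** Exactly one of the two sources may be supplied. */
      static void requireExactlyOne(Object uploadStream, Object sourceFile) {
        // true ^ true   -> false (both set: rejected)
        // false ^ false -> false (neither set: rejected)
        // true ^ false, false ^ true -> true (accepted)
        if (!((uploadStream != null) ^ (sourceFile != null))) {
          throw new IllegalArgumentException("Data source");
        }
      }

      public static void main(String[] args) {
        requireExactlyOne("stream", null);   // ok
        requireExactlyOne(null, "file");     // ok
        try {
          requireExactlyOne(null, null);     // rejected
        } catch (IllegalArgumentException expected) {
          System.out.println("neither source: " + expected.getMessage());
        }
      }
    }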

S3AInstrumentation.java

@@ -36,6 +36,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -428,7 +429,7 @@ public class S3AInstrumentation {
    if (gauge != null) {
      gauge.decr(count);
    } else {
-      LOG.debug("No Gauge: " + op);
+      LOG.debug("No Gauge: {}", op);
    }
  }
@@ -676,6 +677,8 @@ public class S3AInstrumentation {
    private final AtomicLong transferDuration = new AtomicLong(0);
    private final AtomicLong queueDuration = new AtomicLong(0);
    private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
+    private final AtomicInteger blocksAllocated = new AtomicInteger(0);
+    private final AtomicInteger blocksReleased = new AtomicInteger(0);
 
    private Statistics statistics;
@@ -683,6 +686,20 @@ public class S3AInstrumentation {
      this.statistics = statistics;
    }
 
+    /**
+     * A block has been allocated.
+     */
+    void blockAllocated() {
+      blocksAllocated.incrementAndGet();
+    }
+
+    /**
+     * A block has been released.
+     */
+    void blockReleased() {
+      blocksReleased.incrementAndGet();
+    }
+
    /**
     * Block is queued for upload.
     */
@@ -778,6 +795,24 @@ public class S3AInstrumentation {
      return queueDuration.get() + transferDuration.get();
    }
 
+    public int blocksAllocated() {
+      return blocksAllocated.get();
+    }
+
+    public int blocksReleased() {
+      return blocksReleased.get();
+    }
+
+    /**
+     * Get counters of blocks actively allocated; may be inaccurate
+     * if the numbers change during the (non-synchronized) calculation.
+     * @return the number of actively allocated blocks.
+     */
+    public int blocksActivelyAllocated() {
+      return blocksAllocated.get() - blocksReleased.get();
+    }
+
    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder(
@@ -789,6 +824,9 @@ public class S3AInstrumentation {
      sb.append(", blockUploadsFailed=").append(blockUploadsFailed);
      sb.append(", bytesPendingUpload=").append(bytesPendingUpload);
      sb.append(", bytesUploaded=").append(bytesUploaded);
+      sb.append(", blocksAllocated=").append(blocksAllocated);
+      sb.append(", blocksReleased=").append(blocksReleased);
+      sb.append(", blocksActivelyAllocated=").append(blocksActivelyAllocated());
      sb.append(", exceptionsInMultipartFinalize=").append(
          exceptionsInMultipartFinalize);
      sb.append(", transferDuration=").append(transferDuration).append(" ms");
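With the two new counters, leak checking reduces to a subtraction: every create() increments blocksAllocated, every release increments blocksReleased, and a fully cleaned-up stream ends at zero. The same idea in isolation (illustrative, not the S3A class itself):

    import java.util.concurrent.atomic.AtomicInteger;

    class BlockGaugeSketch {
      private final AtomicInteger allocated = new AtomicInteger(0);
      private final AtomicInteger released = new AtomicInteger(0);

      void blockAllocated() { allocated.incrementAndGet(); }
      void blockReleased() { released.incrementAndGet(); }

      /** May be momentarily inaccurate: the two reads are not atomic together. */
      int blocksActivelyAllocated() {
        return allocated.get() - released.get();
      }
    }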

S3AUtils.java

@@ -723,4 +723,30 @@ public final class S3AUtils {
          "patch of " + S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
    }
  }
 
+  /**
+   * Close the Closeable objects and <b>ignore</b> any Exception or
+   * null pointers.
+   * (This is the SLF4J equivalent of that in {@code IOUtils}).
+   * @param log the log to log at debug level. Can be null.
+   * @param closeables the objects to close
+   */
+  public static void closeAll(Logger log,
+      java.io.Closeable... closeables) {
+    for (java.io.Closeable c : closeables) {
+      if (c != null) {
+        try {
+          if (log != null) {
+            log.debug("Closing {}", c);
+          }
+          c.close();
+        } catch (Exception e) {
+          if (log != null && log.isDebugEnabled()) {
+            log.debug("Exception in closing {}", c, e);
+          }
+        }
+      }
+    }
+  }
}
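closeAll() is the quiet-close primitive the rest of the patch leans on: varargs, null-tolerant, and exception-swallowing, which makes it safe inside finally clauses. A hypothetical caller, assuming the patched S3AUtils and an SLF4J logger are on the classpath:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.apache.hadoop.fs.s3a.S3AUtils;  // the class patched above

    class CloseAllUsageSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(CloseAllUsageSketch.class);

      void demo() {
        InputStream a = new ByteArrayInputStream(new byte[0]);
        InputStream b = null;                 // nulls are simply skipped
        try {
          // ... work with the streams ...
        } finally {
          // never throws; failures are logged at debug level only
          S3AUtils.closeAll(LOG, a, b);
        }
      }
    }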

ITestS3ABlockOutputArray.java

@@ -24,9 +24,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.io.IOUtils;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -38,6 +41,14 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
 * multipart tests are kept in scale tests.
 */
public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
+  private static final int BLOCK_SIZE = 256 * 1024;
+
+  private static byte[] dataset;
+
+  @BeforeClass
+  public static void setupDataset() {
+    dataset = ContractTestUtils.dataset(BLOCK_SIZE, 0, 256);
+  }
 
  @Override
  protected Configuration createConfiguration() {
@@ -65,9 +76,9 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
  }
 
  @Test(expected = IOException.class)
-  public void testDoubleStreamClose() throws Throwable {
-    Path dest = path("testDoubleStreamClose");
-    describe(" testDoubleStreamClose");
+  public void testWriteAfterStreamClose() throws Throwable {
+    Path dest = path("testWriteAfterStreamClose");
+    describe(" testWriteAfterStreamClose");
    FSDataOutputStream stream = getFileSystem().create(dest, true);
    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
    try {
@@ -79,7 +90,25 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
    }
  }
 
-  public void verifyUpload(String name, int fileSize) throws IOException {
+  @Test
+  public void testBlocksClosed() throws Throwable {
+    Path dest = path("testBlocksClosed");
+    describe(" testBlocksClosed");
+    FSDataOutputStream stream = getFileSystem().create(dest, true);
+    S3AInstrumentation.OutputStreamStatistics statistics
+        = S3ATestUtils.getOutputStreamStatistics(stream);
+    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+    stream.write(data);
+    LOG.info("closing output stream");
+    stream.close();
+    assertEquals("total allocated blocks in " + statistics,
+        1, statistics.blocksAllocated());
+    assertEquals("actively allocated blocks in " + statistics,
+        0, statistics.blocksActivelyAllocated());
+    LOG.info("end of test case");
+  }
+
+  private void verifyUpload(String name, int fileSize) throws IOException {
    Path dest = path(name);
    describe(name + " upload to " + dest);
    ContractTestUtils.createAndVerifyFile(
@@ -87,4 +116,43 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
        dest,
        fileSize);
  }
+
+  /**
+   * Create a factory for use in mark/reset tests.
+   * @param fileSystem source FS
+   * @return the factory
+   */
+  protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+    return new S3ADataBlocks.ArrayBlockFactory(fileSystem);
+  }
+
+  private void markAndResetDatablock(S3ADataBlocks.BlockFactory factory)
+      throws Exception {
+    S3AInstrumentation instrumentation =
+        new S3AInstrumentation(new URI("s3a://example"));
+    S3AInstrumentation.OutputStreamStatistics outstats
+        = instrumentation.newOutputStreamStatistics(null);
+    S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats);
+    block.write(dataset, 0, dataset.length);
+    S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
+    InputStream stream = uploadData.getUploadStream();
+    assertNotNull(stream);
+    assertTrue("Mark not supported in " + stream, stream.markSupported());
+    assertEquals(0, stream.read());
+    stream.mark(BLOCK_SIZE);
+    // read a lot
+    long l = 0;
+    while (stream.read() != -1) {
+      // do nothing
+      l++;
+    }
+    stream.reset();
+    assertEquals(1, stream.read());
+  }
+
+  @Test
+  public void testMarkReset() throws Throwable {
+    markAndResetDatablock(createFactory(getFileSystem()));
+  }
}

ITestS3ABlockOutputByteBuffer.java

@@ -17,7 +17,6 @@
 */
package org.apache.hadoop.fs.s3a;
 
-
/**
 * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering.
 */
@@ -27,4 +26,8 @@ public class ITestS3ABlockOutputByteBuffer extends ITestS3ABlockOutputArray {
    return Constants.FAST_UPLOAD_BYTEBUFFER;
  }
 
+  protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+    return new S3ADataBlocks.ByteBufferBlockFactory(fileSystem);
+  }
}

ITestS3ABlockOutputDisk.java

@@ -18,6 +18,8 @@
 
package org.apache.hadoop.fs.s3a;
 
+import org.junit.Assume;
+
/**
 * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering.
 */
@@ -27,4 +29,14 @@ public class ITestS3ABlockOutputDisk extends ITestS3ABlockOutputArray {
    return Constants.FAST_UPLOAD_BUFFER_DISK;
  }
 
+  /**
+   * The disk stream doesn't support mark/reset; calls
+   * {@code Assume} to skip the test.
+   * @param fileSystem source FS
+   * @return null
+   */
+  protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+    Assume.assumeTrue("mark/reset not supported", false);
+    return null;
+  }
}

S3ATestUtils.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
@@ -544,4 +545,16 @@ public final class S3ATestUtils {
    }
    Assume.assumeTrue(message, condition);
  }
 
+  /**
+   * Get the statistics from a wrapped block output stream.
+   * @param out output stream
+   * @return the (active) stats of the write
+   */
+  public static S3AInstrumentation.OutputStreamStatistics
+      getOutputStreamStatistics(FSDataOutputStream out) {
+    S3ABlockOutputStream blockOutputStream
+        = (S3ABlockOutputStream) out.getWrappedStream();
+    return blockOutputStream.getStatistics();
+  }
}

TestDataBlocks.java

@@ -51,9 +51,8 @@ public class TestDataBlocks extends Assert {
        new S3ADataBlocks.ByteBufferBlockFactory(null)) {
      int limit = 128;
      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
-          = factory.create(limit);
-      assertEquals("outstanding buffers in " + factory,
-          1, factory.getOutstandingBufferCount());
+          = factory.create(1, limit, null);
+      assertOutstandingBuffers(factory, 1);
 
      byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
      int bufferLen = buffer.length;
@@ -66,24 +65,23 @@ public class TestDataBlocks extends Assert {
          block.hasCapacity(limit - bufferLen));
 
      // now start the write
-      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferInputStream
-          stream = block.startUpload();
+      S3ADataBlocks.BlockUploadData blockUploadData = block.startUpload();
+      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream
+          stream =
+          (S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream)
+              blockUploadData.getUploadStream();
+      assertTrue("Mark not supported in " + stream, stream.markSupported());
      assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
 
      int expected = bufferLen;
      assertEquals("wrong available() in " + stream,
          expected, stream.available());
 
      assertEquals('t', stream.read());
+      stream.mark(limit);
      expected--;
      assertEquals("wrong available() in " + stream,
          expected, stream.available());
 
-      // close the block. The buffer must remain outstanding here;
-      // the stream manages the lifecycle of it now
-      block.close();
-      assertEquals("outstanding buffers in " + factory,
-          1, factory.getOutstandingBufferCount());
-      block.close();
 
      // read into a byte array with an offset
      int offset = 5;
@@ -109,16 +107,31 @@ public class TestDataBlocks extends Assert {
          0, stream.available());
      assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
 
+      // go to the mark point
+      stream.reset();
+      assertEquals('e', stream.read());
+
      // when the stream is closed, the data should be returned
      stream.close();
-      assertEquals("outstanding buffers in " + factory,
-          0, factory.getOutstandingBufferCount());
+      assertOutstandingBuffers(factory, 1);
+      block.close();
+      assertOutstandingBuffers(factory, 0);
      stream.close();
-      assertEquals("outstanding buffers in " + factory,
-          0, factory.getOutstandingBufferCount());
+      assertOutstandingBuffers(factory, 0);
    }
  }
 
+  /**
+   * Assert the number of buffers active for a block factory.
+   * @param factory factory
+   * @param expectedCount expected count.
+   */
+  private static void assertOutstandingBuffers(
+      S3ADataBlocks.ByteBufferBlockFactory factory,
+      int expectedCount) {
+    assertEquals("outstanding buffers in " + factory,
+        expectedCount, factory.getOutstandingBufferCount());
+  }
}

AbstractSTestS3AHugeFiles.java

@@ -34,11 +34,13 @@ import org.slf4j.LoggerFactory;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.util.Progressable;
@@ -159,13 +161,20 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
    Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
 
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    S3AInstrumentation.OutputStreamStatistics streamStatistics;
    long blocksPer10MB = blocksPerMB * 10;
    ProgressCallback progress = new ProgressCallback(timer);
    try (FSDataOutputStream out = fs.create(hugefile,
        true,
        uploadBlockSize,
        progress)) {
+      try {
+        streamStatistics = getOutputStreamStatistics(out);
+      } catch (ClassCastException e) {
+        LOG.info("Wrapped output stream is not block stream: {}",
+            out.getWrappedStream());
+        streamStatistics = null;
+      }
 
      for (long block = 1; block <= blocks; block++) {
        out.write(data);
@@ -190,7 +199,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
        }
      }
      // now close the file
-      LOG.info("Closing file and completing write operation");
+      LOG.info("Closing stream {}", out);
+      LOG.info("Statistics : {}", streamStatistics);
      ContractTestUtils.NanoTimer closeTimer
          = new ContractTestUtils.NanoTimer();
      out.close();
@@ -201,6 +211,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
        filesizeMB, uploadBlockSize);
    logFSState();
    bandwidth(timer, filesize);
+    LOG.info("Statistics after stream closed: {}", streamStatistics);
    long putRequestCount = storageStatistics.getLong(putRequests);
    Long putByteCount = storageStatistics.getLong(putBytes);
    LOG.info("PUT {} bytes in {} operations; {} MB/operation",
@@ -214,7 +225,14 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
    S3AFileStatus status = fs.getFileStatus(hugefile);
    ContractTestUtils.assertIsFile(hugefile, status);
    assertEquals("File size in " + status, filesize, status.getLen());
-    progress.verifyNoFailures("Put file " + hugefile + " of size " + filesize);
+    if (progress != null) {
+      progress.verifyNoFailures("Put file " + hugefile
+          + " of size " + filesize);
+    }
+    if (streamStatistics != null) {
+      assertEquals("actively allocated blocks in " + streamStatistics,
+          0, streamStatistics.blocksActivelyAllocated());
+    }
  }
 
  /**
@@ -285,7 +303,9 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
  void assumeHugeFileExists() throws IOException {
    S3AFileSystem fs = getFileSystem();
    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
-    ContractTestUtils.assertIsFile(fs, hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
  }
 
  private void logFSState() {