diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 89b9b29726d..1b0929b5c80 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.s3a; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; @@ -48,7 +47,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.util.Progressable; @@ -178,10 +176,10 @@ private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded() if (activeBlock == null) { blockCount++; if (blockCount>= Constants.MAX_MULTIPART_COUNT) { - LOG.error("Number of partitions in stream exceeds limit for S3: " + + LOG.error("Number of partitions in stream exceeds limit for S3: " + Constants.MAX_MULTIPART_COUNT + " write may fail."); } - activeBlock = blockFactory.create(this.blockSize); + activeBlock = blockFactory.create(blockCount, this.blockSize, statistics); } return activeBlock; } @@ -206,7 +204,9 @@ private synchronized boolean hasActiveBlock() { * Clear the active block. */ private void clearActiveBlock() { - LOG.debug("Clearing active block"); + if (activeBlock != null) { + LOG.debug("Clearing active block"); + } synchronized (this) { activeBlock = null; } @@ -356,11 +356,9 @@ public void close() throws IOException { writeOperationHelper.writeFailed(ioe); throw ioe; } finally { - LOG.debug("Closing block and factory"); - IOUtils.closeStream(block); - IOUtils.closeStream(blockFactory); + closeAll(LOG, block, blockFactory); LOG.debug("Statistics: {}", statistics); - IOUtils.closeStream(statistics); + closeAll(LOG, statistics); clearActiveBlock(); } // All end of write operations, including deleting fake parent directories @@ -378,10 +376,10 @@ private void putObject() throws IOException { final S3ADataBlocks.DataBlock block = getActiveBlock(); int size = block.dataSize(); - final PutObjectRequest putObjectRequest = - writeOperationHelper.newPutRequest( - block.startUpload(), - size); + final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); + final PutObjectRequest putObjectRequest = uploadData.hasFile() ? + writeOperationHelper.newPutRequest(uploadData.getFile()) + : writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size); fs.setOptionalPutRequestParameters(putObjectRequest); long transferQueueTime = now(); BlockUploadProgress callback = @@ -393,8 +391,14 @@ private void putObject() throws IOException { executorService.submit(new Callable() { @Override public PutObjectResult call() throws Exception { - PutObjectResult result = fs.putObjectDirect(putObjectRequest); - block.close(); + PutObjectResult result; + try { + // the putObject call automatically closes the input + // stream afterwards. + result = writeOperationHelper.putObject(putObjectRequest); + } finally { + closeAll(LOG, uploadData, block); + } return result; } }); @@ -437,6 +441,14 @@ private long now() { return System.currentTimeMillis(); } + /** + * Get the statistics for this stream. 
+ * @return stream statistics + */ + S3AInstrumentation.OutputStreamStatistics getStatistics() { + return statistics; + } + /** * Multiple partition upload. */ @@ -444,7 +456,7 @@ private class MultiPartUpload { private final String uploadId; private final List> partETagsFutures; - public MultiPartUpload() throws IOException { + MultiPartUpload() throws IOException { this.uploadId = writeOperationHelper.initiateMultiPartUpload(); this.partETagsFutures = new ArrayList<>(2); LOG.debug("Initiated multi-part upload for {} with " + @@ -461,14 +473,16 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block) throws IOException { LOG.debug("Queueing upload of {}", block); final int size = block.dataSize(); - final InputStream uploadStream = block.startUpload(); + final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); final int currentPartNumber = partETagsFutures.size() + 1; final UploadPartRequest request = writeOperationHelper.newUploadPartRequest( uploadId, - uploadStream, currentPartNumber, - size); + size, + uploadData.getUploadStream(), + uploadData.getFile()); + long transferQueueTime = now(); BlockUploadProgress callback = new BlockUploadProgress( @@ -483,12 +497,16 @@ public PartETag call() throws Exception { LOG.debug("Uploading part {} for id '{}'", currentPartNumber, uploadId); // do the upload - PartETag partETag = fs.uploadPart(request).getPartETag(); - LOG.debug("Completed upload of {}", block); - LOG.debug("Stream statistics of {}", statistics); - - // close the block - block.close(); + PartETag partETag; + try { + partETag = fs.uploadPart(request).getPartETag(); + LOG.debug("Completed upload of {} to part {}", block, + partETag.getETag()); + LOG.debug("Stream statistics of {}", statistics); + } finally { + // close the stream and block + closeAll(LOG, uploadData, block); + } return partETag; } }); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index 05f8efe6ebe..9bc8dcd190a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -24,10 +24,8 @@ import java.io.Closeable; import java.io.EOFException; import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; -import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -42,10 +40,11 @@ import org.apache.hadoop.util.DirectBufferPool; import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.closeAll; /** * Set of classes to support output streaming into blocks which are then - * uploaded as partitions. + * uploaded as to S3 as a single PUT, or as part of a multipart request. */ final class S3ADataBlocks { @@ -96,6 +95,70 @@ static BlockFactory createFactory(S3AFileSystem owner, } } + /** + * The output information for an upload. + * It can be one of a file or an input stream. + * When closed, any stream is closed. Any source file is untouched. + */ + static final class BlockUploadData implements Closeable { + private final File file; + private final InputStream uploadStream; + + /** + * File constructor; input stream will be null. 
+ * @param file file to upload + */ + BlockUploadData(File file) { + Preconditions.checkArgument(file.exists(), "No file: " + file); + this.file = file; + this.uploadStream = null; + } + + /** + * Stream constructor, file field will be null. + * @param uploadStream stream to upload + */ + BlockUploadData(InputStream uploadStream) { + Preconditions.checkNotNull(uploadStream, "rawUploadStream"); + this.uploadStream = uploadStream; + this.file = null; + } + + /** + * Predicate: does this instance contain a file reference. + * @return true if there is a file. + */ + boolean hasFile() { + return file != null; + } + + /** + * Get the file, if there is one. + * @return the file for uploading, or null. + */ + File getFile() { + return file; + } + + /** + * Get the raw upload stream, if the object was + * created with one. + * @return the upload stream or null. + */ + InputStream getUploadStream() { + return uploadStream; + } + + /** + * Close: closes any upload stream provided in the constructor. + * @throws IOException inherited exception + */ + @Override + public void close() throws IOException { + closeAll(LOG, uploadStream); + } + } + /** * Base class for block factories. */ @@ -110,15 +173,21 @@ protected BlockFactory(S3AFileSystem owner) { /** * Create a block. + * + * @param index index of block * @param limit limit of the block. + * @param statistics stats to work with * @return a new block. */ - abstract DataBlock create(int limit) throws IOException; + abstract DataBlock create(long index, int limit, + S3AInstrumentation.OutputStreamStatistics statistics) + throws IOException; /** * Implement any close/cleanup operation. * Base class is a no-op - * @throws IOException -ideally, it shouldn't. + * @throws IOException Inherited exception; implementations should + * avoid raising it. */ @Override public void close() throws IOException { @@ -140,6 +209,14 @@ static abstract class DataBlock implements Closeable { enum DestState {Writing, Upload, Closed} private volatile DestState state = Writing; + protected final long index; + protected final S3AInstrumentation.OutputStreamStatistics statistics; + + protected DataBlock(long index, + S3AInstrumentation.OutputStreamStatistics statistics) { + this.index = index; + this.statistics = statistics; + } /** * Atomically enter a state, verifying current state. @@ -243,8 +320,8 @@ void flush() throws IOException { * @return the stream * @throws IOException trouble */ - InputStream startUpload() throws IOException { - LOG.debug("Start datablock upload"); + BlockUploadData startUpload() throws IOException { + LOG.debug("Start datablock[{}] upload", index); enterState(Writing, Upload); return null; } @@ -278,6 +355,23 @@ protected void innerClose() throws IOException { } + /** + * A block has been allocated. + */ + protected void blockAllocated() { + if (statistics != null) { + statistics.blockAllocated(); + } + } + + /** + * A block has been released. 
+ */ + protected void blockReleased() { + if (statistics != null) { + statistics.blockReleased(); + } + } } // ==================================================================== @@ -292,8 +386,10 @@ static class ArrayBlockFactory extends BlockFactory { } @Override - DataBlock create(int limit) throws IOException { - return new ByteArrayBlock(limit); + DataBlock create(long index, int limit, + S3AInstrumentation.OutputStreamStatistics statistics) + throws IOException { + return new ByteArrayBlock(0, limit, statistics); } } @@ -334,9 +430,13 @@ static class ByteArrayBlock extends DataBlock { // cache data size so that it is consistent after the buffer is reset. private Integer dataSize; - ByteArrayBlock(int limit) { + ByteArrayBlock(long index, + int limit, + S3AInstrumentation.OutputStreamStatistics statistics) { + super(index, statistics); this.limit = limit; buffer = new S3AByteArrayOutputStream(limit); + blockAllocated(); } /** @@ -349,12 +449,12 @@ int dataSize() { } @Override - InputStream startUpload() throws IOException { + BlockUploadData startUpload() throws IOException { super.startUpload(); dataSize = buffer.size(); ByteArrayInputStream bufferData = buffer.getInputStream(); buffer = null; - return bufferData; + return new BlockUploadData(bufferData); } @Override @@ -378,12 +478,14 @@ int write(byte[] b, int offset, int len) throws IOException { @Override protected void innerClose() { buffer = null; + blockReleased(); } @Override public String toString() { - return "ByteArrayBlock{" + - "state=" + getState() + + return "ByteArrayBlock{" + +"index=" + index + + ", state=" + getState() + ", limit=" + limit + ", dataSize=" + dataSize + '}'; @@ -395,12 +497,6 @@ public String toString() { /** * Stream via Direct ByteBuffers; these are allocated off heap * via {@link DirectBufferPool}. - * This is actually the most complex of all the block factories, - * due to the need to explicitly recycle buffers; in comparison, the - * {@link DiskBlock} buffer delegates the work of deleting files to - * the {@link DiskBlock.FileDeletingInputStream}. Here the - * input stream {@link ByteBufferInputStream} has a similar task, along - * with the foundational work of streaming data from a byte array. */ static class ByteBufferBlockFactory extends BlockFactory { @@ -413,8 +509,10 @@ static class ByteBufferBlockFactory extends BlockFactory { } @Override - ByteBufferBlock create(int limit) throws IOException { - return new ByteBufferBlock(limit); + ByteBufferBlock create(long index, int limit, + S3AInstrumentation.OutputStreamStatistics statistics) + throws IOException { + return new ByteBufferBlock(index, limit, statistics); } private ByteBuffer requestBuffer(int limit) { @@ -446,21 +544,27 @@ public String toString() { /** * A DataBlock which requests a buffer from pool on creation; returns - * it when the output stream is closed. + * it when it is closed. */ class ByteBufferBlock extends DataBlock { - private ByteBuffer buffer; + private ByteBuffer blockBuffer; private final int bufferSize; // cache data size so that it is consistent after the buffer is reset. private Integer dataSize; /** * Instantiate. This will request a ByteBuffer of the desired size. 
+ * @param index block index * @param bufferSize buffer size + * @param statistics statistics to update */ - ByteBufferBlock(int bufferSize) { + ByteBufferBlock(long index, + int bufferSize, + S3AInstrumentation.OutputStreamStatistics statistics) { + super(index, statistics); this.bufferSize = bufferSize; - buffer = requestBuffer(bufferSize); + blockBuffer = requestBuffer(bufferSize); + blockAllocated(); } /** @@ -473,13 +577,14 @@ int dataSize() { } @Override - ByteBufferInputStream startUpload() throws IOException { + BlockUploadData startUpload() throws IOException { super.startUpload(); dataSize = bufferCapacityUsed(); // set the buffer up from reading from the beginning - buffer.limit(buffer.position()); - buffer.position(0); - return new ByteBufferInputStream(dataSize, buffer); + blockBuffer.limit(blockBuffer.position()); + blockBuffer.position(0); + return new BlockUploadData( + new ByteBufferInputStream(dataSize, blockBuffer)); } @Override @@ -489,182 +594,190 @@ public boolean hasCapacity(long bytes) { @Override public int remainingCapacity() { - return buffer != null ? buffer.remaining() : 0; + return blockBuffer != null ? blockBuffer.remaining() : 0; } private int bufferCapacityUsed() { - return buffer.capacity() - buffer.remaining(); + return blockBuffer.capacity() - blockBuffer.remaining(); } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); int written = Math.min(remainingCapacity(), len); - buffer.put(b, offset, written); + blockBuffer.put(b, offset, written); return written; } + /** + * Closing the block will release the buffer. + */ @Override protected void innerClose() { - buffer = null; + if (blockBuffer != null) { + blockReleased(); + releaseBuffer(blockBuffer); + blockBuffer = null; + } } @Override public String toString() { return "ByteBufferBlock{" - + "state=" + getState() + + + "index=" + index + + ", state=" + getState() + ", dataSize=" + dataSize() + ", limit=" + bufferSize + ", remainingCapacity=" + remainingCapacity() + '}'; } - } - - /** - * Provide an input stream from a byte buffer; supporting - * {@link #mark(int)}, which is required to enable replay of failed - * PUT attempts. - * This input stream returns the buffer to the pool afterwards. - */ - class ByteBufferInputStream extends InputStream { - - private final int size; - private ByteBuffer byteBuffer; - - ByteBufferInputStream(int size, ByteBuffer byteBuffer) { - LOG.debug("Creating ByteBufferInputStream of size {}", size); - this.size = size; - this.byteBuffer = byteBuffer; - } - /** - * Return the buffer to the pool after the stream is closed. + * Provide an input stream from a byte buffer; supporting + * {@link #mark(int)}, which is required to enable replay of failed + * PUT attempts. */ - @Override - public synchronized void close() { - if (byteBuffer != null) { - LOG.debug("releasing buffer"); - releaseBuffer(byteBuffer); + class ByteBufferInputStream extends InputStream { + + private final int size; + private ByteBuffer byteBuffer; + + ByteBufferInputStream(int size, + ByteBuffer byteBuffer) { + LOG.debug("Creating ByteBufferInputStream of size {}", size); + this.size = size; + this.byteBuffer = byteBuffer; + } + + /** + * After the stream is closed, set the local reference to the byte + * buffer to null; this guarantees that future attempts to use + * stream methods will fail. 
+ */ + @Override + public synchronized void close() { + LOG.debug("ByteBufferInputStream.close() for {}", + ByteBufferBlock.super.toString()); byteBuffer = null; } - } - /** - * Verify that the stream is open. - * @throws IOException if the stream is closed - */ - private void verifyOpen() throws IOException { - if (byteBuffer == null) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - } - - public synchronized int read() throws IOException { - if (available() > 0) { - return byteBuffer.get() & 0xFF; - } else { - return -1; - } - } - - @Override - public synchronized long skip(long offset) throws IOException { - verifyOpen(); - long newPos = position() + offset; - if (newPos < 0) { - throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); - } - if (newPos > size) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } - byteBuffer.position((int) newPos); - return newPos; - } - - @Override - public synchronized int available() { - Preconditions.checkState(byteBuffer != null, - FSExceptionMessages.STREAM_IS_CLOSED); - return byteBuffer.remaining(); - } - - /** - * Get the current buffer position. - * @return the buffer position - */ - public synchronized int position() { - return byteBuffer.position(); - } - - /** - * Check if there is data left. - * @return true if there is data remaining in the buffer. - */ - public synchronized boolean hasRemaining() { - return byteBuffer.hasRemaining(); - } - - @Override - public synchronized void mark(int readlimit) { - LOG.debug("mark at {}", position()); - byteBuffer.mark(); - } - - @Override - public synchronized void reset() throws IOException { - LOG.debug("reset"); - byteBuffer.reset(); - } - - @Override - public boolean markSupported() { - return true; - } - - /** - * Read in data. - * @param buffer destination buffer - * @param offset offset within the buffer - * @param length length of bytes to read - * @throws EOFException if the position is negative - * @throws IndexOutOfBoundsException if there isn't space for the - * amount of data requested. - * @throws IllegalArgumentException other arguments are invalid. - */ - @SuppressWarnings("NullableProblems") - public synchronized int read(byte[] buffer, int offset, int length) - throws IOException { - Preconditions.checkArgument(length >= 0, "length is negative"); - Preconditions.checkArgument(buffer != null, "Null buffer"); - if (buffer.length - offset < length) { - throw new IndexOutOfBoundsException( - FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER - + ": request length =" + length - + ", with offset =" + offset - + "; buffer capacity =" + (buffer.length - offset)); - } - verifyOpen(); - if (!hasRemaining()) { - return -1; + /** + * Verify that the stream is open. 
+ * @throws IOException if the stream is closed + */ + private void verifyOpen() throws IOException { + if (byteBuffer == null) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } } - int toRead = Math.min(length, available()); - byteBuffer.get(buffer, offset, toRead); - return toRead; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder( - "ByteBufferInputStream{"); - sb.append("size=").append(size); - ByteBuffer buffer = this.byteBuffer; - if (buffer != null) { - sb.append(", available=").append(buffer.remaining()); + public synchronized int read() throws IOException { + if (available() > 0) { + return byteBuffer.get() & 0xFF; + } else { + return -1; + } + } + + @Override + public synchronized long skip(long offset) throws IOException { + verifyOpen(); + long newPos = position() + offset; + if (newPos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + } + if (newPos > size) { + throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + } + byteBuffer.position((int) newPos); + return newPos; + } + + @Override + public synchronized int available() { + Preconditions.checkState(byteBuffer != null, + FSExceptionMessages.STREAM_IS_CLOSED); + return byteBuffer.remaining(); + } + + /** + * Get the current buffer position. + * @return the buffer position + */ + public synchronized int position() { + return byteBuffer.position(); + } + + /** + * Check if there is data left. + * @return true if there is data remaining in the buffer. + */ + public synchronized boolean hasRemaining() { + return byteBuffer.hasRemaining(); + } + + @Override + public synchronized void mark(int readlimit) { + LOG.debug("mark at {}", position()); + byteBuffer.mark(); + } + + @Override + public synchronized void reset() throws IOException { + LOG.debug("reset"); + byteBuffer.reset(); + } + + @Override + public boolean markSupported() { + return true; + } + + /** + * Read in data. + * @param b destination buffer + * @param offset offset within the buffer + * @param length length of bytes to read + * @throws EOFException if the position is negative + * @throws IndexOutOfBoundsException if there isn't space for the + * amount of data requested. + * @throws IllegalArgumentException other arguments are invalid. + */ + @SuppressWarnings("NullableProblems") + public synchronized int read(byte[] b, int offset, int length) + throws IOException { + Preconditions.checkArgument(length >= 0, "length is negative"); + Preconditions.checkArgument(b != null, "Null buffer"); + if (b.length - offset < length) { + throw new IndexOutOfBoundsException( + FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER + + ": request length =" + length + + ", with offset =" + offset + + "; buffer capacity =" + (b.length - offset)); + } + verifyOpen(); + if (!hasRemaining()) { + return -1; + } + + int toRead = Math.min(length, available()); + byteBuffer.get(b, offset, toRead); + return toRead; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "ByteBufferInputStream{"); + sb.append("size=").append(size); + ByteBuffer buf = this.byteBuffer; + if (buf != null) { + sb.append(", available=").append(buf.remaining()); + } + sb.append(", ").append(ByteBufferBlock.super.toString()); + sb.append('}'); + return sb.toString(); } - sb.append('}'); - return sb.toString(); } } } @@ -681,22 +794,29 @@ static class DiskBlockFactory extends BlockFactory { } /** - * Create a temp file and a block which writes to it. 
+ * Create a temp file and a {@link DiskBlock} instance to manage it. + * + * @param index block index * @param limit limit of the block. + * @param statistics statistics to update * @return the new block * @throws IOException IO problems */ @Override - DataBlock create(int limit) throws IOException { + DataBlock create(long index, + int limit, + S3AInstrumentation.OutputStreamStatistics statistics) + throws IOException { File destFile = getOwner() - .createTmpFileForWrite("s3ablock", limit, getOwner().getConf()); - return new DiskBlock(destFile, limit); + .createTmpFileForWrite(String.format("s3ablock-%04d-", index), + limit, getOwner().getConf()); + return new DiskBlock(destFile, limit, index, statistics); } } /** * Stream to a file. - * This will stop at the limit; the caller is expected to create a new block + * This will stop at the limit; the caller is expected to create a new block. */ static class DiskBlock extends DataBlock { @@ -704,12 +824,17 @@ static class DiskBlock extends DataBlock { private final File bufferFile; private final int limit; private BufferedOutputStream out; - private InputStream uploadStream; + private final AtomicBoolean closed = new AtomicBoolean(false); - DiskBlock(File bufferFile, int limit) + DiskBlock(File bufferFile, + int limit, + long index, + S3AInstrumentation.OutputStreamStatistics statistics) throws FileNotFoundException { + super(index, statistics); this.limit = limit; this.bufferFile = bufferFile; + blockAllocated(); out = new BufferedOutputStream(new FileOutputStream(bufferFile)); } @@ -738,7 +863,7 @@ int write(byte[] b, int offset, int len) throws IOException { } @Override - InputStream startUpload() throws IOException { + BlockUploadData startUpload() throws IOException { super.startUpload(); try { out.flush(); @@ -746,8 +871,7 @@ InputStream startUpload() throws IOException { out.close(); out = null; } - uploadStream = new FileInputStream(bufferFile); - return new FileDeletingInputStream(uploadStream); + return new BlockUploadData(bufferFile); } /** @@ -755,6 +879,7 @@ InputStream startUpload() throws IOException { * exists. * @throws IOException IO problems */ + @SuppressWarnings("UnnecessaryDefault") @Override protected void innerClose() throws IOException { final DestState state = getState(); @@ -763,20 +888,19 @@ protected void innerClose() throws IOException { case Writing: if (bufferFile.exists()) { // file was not uploaded - LOG.debug("Deleting buffer file as upload did not start"); - boolean deleted = bufferFile.delete(); - if (!deleted && bufferFile.exists()) { - LOG.warn("Failed to delete buffer file {}", bufferFile); - } + LOG.debug("Block[{}]: Deleting buffer file as upload did not start", + index); + closeBlock(); } break; case Upload: - LOG.debug("Buffer file {} exists —close upload stream", bufferFile); + LOG.debug("Block[{}]: Buffer file {} exists —close upload stream", + index, bufferFile); break; case Closed: - // no-op + closeBlock(); break; default: @@ -798,7 +922,8 @@ void flush() throws IOException { @Override public String toString() { String sb = "FileBlock{" - + "destFile=" + bufferFile + + + "index=" + index + + ", destFile=" + bufferFile + ", state=" + getState() + ", dataSize=" + dataSize() + ", limit=" + limit + @@ -807,31 +932,20 @@ public String toString() { } /** - * An input stream which deletes the buffer file when closed. + * Close the block. + * This will delete the block's buffer file if the block has + * not previously been closed. 
*/ - private final class FileDeletingInputStream extends FilterInputStream { - private final AtomicBoolean closed = new AtomicBoolean(false); - - FileDeletingInputStream(InputStream source) { - super(source); - } - - /** - * Delete the input file when closed. - * @throws IOException IO problem - */ - @Override - public void close() throws IOException { - try { - super.close(); - } finally { - if (!closed.getAndSet(true)) { - if (!bufferFile.delete()) { - LOG.warn("delete({}) returned false", - bufferFile.getAbsoluteFile()); - } - } + void closeBlock() { + LOG.debug("block[{}]: closeBlock()", index); + if (!closed.getAndSet(true)) { + blockReleased(); + if (!bufferFile.delete() && bufferFile.exists()) { + LOG.warn("delete({}) returned false", + bufferFile.getAbsoluteFile()); } + } else { + LOG.debug("block[{}]: skipping re-entrant closeBlock()", index); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 75ae414ae22..c26ba60573f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1021,6 +1021,7 @@ private void deleteObjects(DeleteObjectsRequest deleteRequest) { */ public PutObjectRequest newPutObjectRequest(String key, ObjectMetadata metadata, File srcfile) { + Preconditions.checkNotNull(srcfile); PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile); setOptionalPutRequestParameters(putObjectRequest); @@ -1038,8 +1039,9 @@ public PutObjectRequest newPutObjectRequest(String key, * @param inputStream source data. * @return the request */ - PutObjectRequest newPutObjectRequest(String key, + private PutObjectRequest newPutObjectRequest(String key, ObjectMetadata metadata, InputStream inputStream) { + Preconditions.checkNotNull(inputStream); PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, inputStream, metadata); setOptionalPutRequestParameters(putObjectRequest); @@ -1076,12 +1078,16 @@ public ObjectMetadata newObjectMetadata(long length) { } /** - * PUT an object, incrementing the put requests and put bytes + * Start a transfer-manager managed async PUT of an object, + * incrementing the put requests and put bytes * counters. * It does not update the other counters, * as existing code does that as progress callbacks come in. * Byte length is calculated from the file length, or, if there is no * file, from the content length of the header. + * Because the operation is async, any stream supplied in the request + * must reference data (files, buffers) which stay valid until the upload + * completes. * @param putObjectRequest the request * @return the upload initiated */ @@ -1107,6 +1113,7 @@ public Upload putObject(PutObjectRequest putObjectRequest) { * PUT an object directly (i.e. not via the transfer manager). * Byte length is calculated from the file length, or, if there is no * file, from the content length of the header. + * Important: this call will close any input stream in the request. * @param putObjectRequest the request * @return the upload initiated * @throws AmazonClientException on problems @@ -1132,7 +1139,8 @@ public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest) /** * Upload part of a multi-partition file. - * Increments the write and put counters + * Increments the write and put counters. 
+ * Important: this call does not close any input stream in the request. * @param request request * @return the result of the operation. * @throws AmazonClientException on problems @@ -2306,14 +2314,28 @@ private WriteOperationHelper(String key) { /** * Create a {@link PutObjectRequest} request. - * The metadata is assumed to have been configured with the size of the - * operation. + * If {@code length} is set, the metadata is configured with the size of + * the upload. * @param inputStream source data. * @param length size, if known. Use -1 for not known * @return the request */ PutObjectRequest newPutRequest(InputStream inputStream, long length) { - return newPutObjectRequest(key, newObjectMetadata(length), inputStream); + PutObjectRequest request = newPutObjectRequest(key, + newObjectMetadata(length), inputStream); + return request; + } + + /** + * Create a {@link PutObjectRequest} request to upload a file. + * @param sourceFile source file + * @return the request + */ + PutObjectRequest newPutRequest(File sourceFile) { + int length = (int) sourceFile.length(); + PutObjectRequest request = newPutObjectRequest(key, + newObjectMetadata(length), sourceFile); + return request; } /** @@ -2376,6 +2398,8 @@ CompleteMultipartUploadResult completeMultipartUpload(String uploadId, Preconditions.checkNotNull(partETags); Preconditions.checkArgument(!partETags.isEmpty(), "No partitions have been uploaded"); + LOG.debug("Completing multipart upload {} with {} parts", + uploadId, partETags.size()); return s3.completeMultipartUpload( new CompleteMultipartUploadRequest(bucket, key, @@ -2386,42 +2410,51 @@ CompleteMultipartUploadResult completeMultipartUpload(String uploadId, /** * Abort a multipart upload operation. * @param uploadId multipart operation Id - * @return the result * @throws AmazonClientException on problems. */ void abortMultipartUpload(String uploadId) throws AmazonClientException { + LOG.debug("Aborting multipart upload {}", uploadId); s3.abortMultipartUpload( new AbortMultipartUploadRequest(bucket, key, uploadId)); } /** * Create and initialize a part request of a multipart upload. + * Exactly one of: {@code uploadStream} or {@code sourceFile} + * must be specified. * @param uploadId ID of ongoing upload - * @param uploadStream source of data to upload * @param partNumber current part number of the upload * @param size amount of data + * @param uploadStream source of data to upload + * @param sourceFile optional source file. * @return the request. 
*/ UploadPartRequest newUploadPartRequest(String uploadId, - InputStream uploadStream, - int partNumber, - int size) { + int partNumber, int size, InputStream uploadStream, File sourceFile) { Preconditions.checkNotNull(uploadId); - Preconditions.checkNotNull(uploadStream); + // exactly one source must be set; xor verifies this + Preconditions.checkArgument((uploadStream != null) ^ (sourceFile != null), + "Data source"); Preconditions.checkArgument(size > 0, "Invalid partition size %s", size); - Preconditions.checkArgument(partNumber> 0 && partNumber <=10000, + Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000, "partNumber must be between 1 and 10000 inclusive, but is %s", partNumber); LOG.debug("Creating part upload request for {} #{} size {}", uploadId, partNumber, size); - return new UploadPartRequest() + UploadPartRequest request = new UploadPartRequest() .withBucketName(bucket) .withKey(key) .withUploadId(uploadId) - .withInputStream(uploadStream) .withPartNumber(partNumber) .withPartSize(size); + if (uploadStream != null) { + // there's an upload stream. Bind to it. + request.setInputStream(uploadStream); + } else { + request.setFile(sourceFile); + } + return request; } /** @@ -2436,6 +2469,21 @@ public String toString() { sb.append('}'); return sb.toString(); } + + /** + * PUT an object directly (i.e. not via the transfer manager). + * @param putObjectRequest the request + * @return the upload initiated + * @throws IOException on problems + */ + PutObjectResult putObject(PutObjectRequest putObjectRequest) + throws IOException { + try { + return putObjectDirect(putObjectRequest); + } catch (AmazonClientException e) { + throw translateException("put", putObjectRequest.getKey(), e); + } + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index fb8c85239d2..d2e7a88ca56 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -428,7 +429,7 @@ public void decrementGauge(Statistic op, long count) { if (gauge != null) { gauge.decr(count); } else { - LOG.debug("No Gauge: " + op); + LOG.debug("No Gauge: {}", op); } } @@ -676,6 +677,8 @@ public final class OutputStreamStatistics implements Closeable { private final AtomicLong transferDuration = new AtomicLong(0); private final AtomicLong queueDuration = new AtomicLong(0); private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0); + private final AtomicInteger blocksAllocated = new AtomicInteger(0); + private final AtomicInteger blocksReleased = new AtomicInteger(0); private Statistics statistics; @@ -683,6 +686,20 @@ public OutputStreamStatistics(Statistics statistics){ this.statistics = statistics; } + /** + * A block has been allocated. + */ + void blockAllocated() { + blocksAllocated.incrementAndGet(); + } + + /** + * A block has been released. + */ + void blockReleased() { + blocksReleased.incrementAndGet(); + } + /** * Block is queued for upload. 
*/ @@ -778,6 +795,24 @@ long totalUploadDuration() { return queueDuration.get() + transferDuration.get(); } + public int blocksAllocated() { + return blocksAllocated.get(); + } + + public int blocksReleased() { + return blocksReleased.get(); + } + + /** + * Get counters of blocks actively allocated; my be inaccurate + * if the numbers change during the (non-synchronized) calculation. + * @return the number of actively allocated blocks. + */ + public int blocksActivelyAllocated() { + return blocksAllocated.get() - blocksReleased.get(); + } + + @Override public String toString() { final StringBuilder sb = new StringBuilder( @@ -789,6 +824,9 @@ public String toString() { sb.append(", blockUploadsFailed=").append(blockUploadsFailed); sb.append(", bytesPendingUpload=").append(bytesPendingUpload); sb.append(", bytesUploaded=").append(bytesUploaded); + sb.append(", blocksAllocated=").append(blocksAllocated); + sb.append(", blocksReleased=").append(blocksReleased); + sb.append(", blocksActivelyAllocated=").append(blocksActivelyAllocated()); sb.append(", exceptionsInMultipartFinalize=").append( exceptionsInMultipartFinalize); sb.append(", transferDuration=").append(transferDuration).append(" ms"); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 53112118bf1..84f3c9964e2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -733,4 +733,30 @@ static String getServerSideEncryptionKey(Configuration conf) { } return null; } + + /** + * Close the Closeable objects and ignore any Exception or + * null pointers. + * (This is the SLF4J equivalent of that in {@code IOUtils}). + * @param log the log to log at debug level. Can be null. + * @param closeables the objects to close + */ + public static void closeAll(Logger log, + java.io.Closeable... closeables) { + for (java.io.Closeable c : closeables) { + if (c != null) { + try { + if (log != null) { + log.debug("Closing {}", c); + } + c.close(); + } catch (Exception e) { + if (log != null && log.isDebugEnabled()) { + log.debug("Exception in closing {}", c, e); + } + } + } + } + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java index 74cad00e0fd..87f676c3c84 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java @@ -24,9 +24,12 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.io.IOUtils; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.io.InputStream; +import java.net.URI; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -38,6 +41,14 @@ * multipart tests are kept in scale tests. 
*/ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase { + private static final int BLOCK_SIZE = 256 * 1024; + + private static byte[] dataset; + + @BeforeClass + public static void setupDataset() { + dataset = ContractTestUtils.dataset(BLOCK_SIZE, 0, 256); + } @Override protected Configuration createConfiguration() { @@ -65,9 +76,9 @@ public void testRegularUpload() throws IOException { } @Test(expected = IOException.class) - public void testDoubleStreamClose() throws Throwable { - Path dest = path("testDoubleStreamClose"); - describe(" testDoubleStreamClose"); + public void testWriteAfterStreamClose() throws Throwable { + Path dest = path("testWriteAfterStreamClose"); + describe(" testWriteAfterStreamClose"); FSDataOutputStream stream = getFileSystem().create(dest, true); byte[] data = ContractTestUtils.dataset(16, 'a', 26); try { @@ -79,7 +90,25 @@ public void testDoubleStreamClose() throws Throwable { } } - public void verifyUpload(String name, int fileSize) throws IOException { + @Test + public void testBlocksClosed() throws Throwable { + Path dest = path("testBlocksClosed"); + describe(" testBlocksClosed"); + FSDataOutputStream stream = getFileSystem().create(dest, true); + S3AInstrumentation.OutputStreamStatistics statistics + = S3ATestUtils.getOutputStreamStatistics(stream); + byte[] data = ContractTestUtils.dataset(16, 'a', 26); + stream.write(data); + LOG.info("closing output stream"); + stream.close(); + assertEquals("total allocated blocks in " + statistics, + 1, statistics.blocksAllocated()); + assertEquals("actively allocated blocks in " + statistics, + 0, statistics.blocksActivelyAllocated()); + LOG.info("end of test case"); + } + + private void verifyUpload(String name, int fileSize) throws IOException { Path dest = path(name); describe(name + " upload to " + dest); ContractTestUtils.createAndVerifyFile( @@ -87,4 +116,43 @@ public void verifyUpload(String name, int fileSize) throws IOException { dest, fileSize); } + + /** + * Create a factory for used in mark/reset tests. 
+ * @param fileSystem source FS + * @return the factory + */ + protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) { + return new S3ADataBlocks.ArrayBlockFactory(fileSystem); + } + + private void markAndResetDatablock(S3ADataBlocks.BlockFactory factory) + throws Exception { + S3AInstrumentation instrumentation = + new S3AInstrumentation(new URI("s3a://example")); + S3AInstrumentation.OutputStreamStatistics outstats + = instrumentation.newOutputStreamStatistics(null); + S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats); + block.write(dataset, 0, dataset.length); + S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); + InputStream stream = uploadData.getUploadStream(); + assertNotNull(stream); + assertTrue("Mark not supported in " + stream, stream.markSupported()); + assertEquals(0, stream.read()); + stream.mark(BLOCK_SIZE); + // read a lot + long l = 0; + while (stream.read() != -1) { + // do nothing + l++; + } + stream.reset(); + assertEquals(1, stream.read()); + } + + @Test + public void testMarkReset() throws Throwable { + markAndResetDatablock(createFactory(getFileSystem())); + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java index 504426b9107..02f3de094fb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.fs.s3a; - /** * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering. */ @@ -27,4 +26,8 @@ protected String getBlockOutputBufferName() { return Constants.FAST_UPLOAD_BYTEBUFFER; } + protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) { + return new S3ADataBlocks.ByteBufferBlockFactory(fileSystem); + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java index 550706dff4e..abe8656be5f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.s3a; +import org.junit.Assume; + /** * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering. */ @@ -27,4 +29,14 @@ protected String getBlockOutputBufferName() { return Constants.FAST_UPLOAD_BUFFER_DISK; } + /** + * The disk stream doesn't support mark/reset; calls + * {@code Assume} to skip the test. 
+ * @param fileSystem source FS + * @return null + */ + protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) { + Assume.assumeTrue("mark/reset nopt supoprted", false); + return null; + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 567bacb1f61..95289674dc9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.junit.Assert; @@ -544,4 +545,16 @@ public static void assume(String message, boolean condition) { } Assume.assumeTrue(message, condition); } + + /** + * Get the statistics from a wrapped block output stream. + * @param out output stream + * @return the (active) stats of the write + */ + public static S3AInstrumentation.OutputStreamStatistics + getOutputStreamStatistics(FSDataOutputStream out) { + S3ABlockOutputStream blockOutputStream + = (S3ABlockOutputStream) out.getWrappedStream(); + return blockOutputStream.getStatistics(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java index 9fa95fd467f..700ef5ced3d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java @@ -51,9 +51,8 @@ public void testByteBufferIO() throws Throwable { new S3ADataBlocks.ByteBufferBlockFactory(null)) { int limit = 128; S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block - = factory.create(limit); - assertEquals("outstanding buffers in " + factory, - 1, factory.getOutstandingBufferCount()); + = factory.create(1, limit, null); + assertOutstandingBuffers(factory, 1); byte[] buffer = ContractTestUtils.toAsciiByteArray("test data"); int bufferLen = buffer.length; @@ -66,24 +65,23 @@ public void testByteBufferIO() throws Throwable { block.hasCapacity(limit - bufferLen)); // now start the write - S3ADataBlocks.ByteBufferBlockFactory.ByteBufferInputStream - stream = block.startUpload(); + S3ADataBlocks.BlockUploadData blockUploadData = block.startUpload(); + S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream + stream = + (S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream) + blockUploadData.getUploadStream(); + assertTrue("Mark not supported in " + stream, stream.markSupported()); assertTrue("!hasRemaining() in " + stream, stream.hasRemaining()); int expected = bufferLen; assertEquals("wrong available() in " + stream, expected, stream.available()); assertEquals('t', stream.read()); + stream.mark(limit); expected--; assertEquals("wrong available() in " + stream, expected, stream.available()); - // close the block. 
The buffer must remain outstanding here; - // the stream manages the lifecycle of it now - block.close(); - assertEquals("outstanding buffers in " + factory, - 1, factory.getOutstandingBufferCount()); - block.close(); // read into a byte array with an offset int offset = 5; @@ -109,16 +107,31 @@ public void testByteBufferIO() throws Throwable { 0, stream.available()); assertTrue("hasRemaining() in " + stream, !stream.hasRemaining()); + // go the mark point + stream.reset(); + assertEquals('e', stream.read()); + // when the stream is closed, the data should be returned stream.close(); - assertEquals("outstanding buffers in " + factory, - 0, factory.getOutstandingBufferCount()); + assertOutstandingBuffers(factory, 1); + block.close(); + assertOutstandingBuffers(factory, 0); stream.close(); - assertEquals("outstanding buffers in " + factory, - 0, factory.getOutstandingBufferCount()); - + assertOutstandingBuffers(factory, 0); } } + /** + * Assert the number of buffers active for a block factory. + * @param factory factory + * @param expectedCount expected count. + */ + private static void assertOutstandingBuffers( + S3ADataBlocks.ByteBufferBlockFactory factory, + int expectedCount) { + assertEquals("outstanding buffers in " + factory, + expectedCount, factory.getOutstandingBufferCount()); + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index fcb6444349c..89fae822d83 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -34,11 +34,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.util.Progressable; @@ -159,13 +161,20 @@ public void test_010_CreateHugeFile() throws IOException { Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING; ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - + S3AInstrumentation.OutputStreamStatistics streamStatistics; long blocksPer10MB = blocksPerMB * 10; ProgressCallback progress = new ProgressCallback(timer); try (FSDataOutputStream out = fs.create(hugefile, true, uploadBlockSize, progress)) { + try { + streamStatistics = getOutputStreamStatistics(out); + } catch (ClassCastException e) { + LOG.info("Wrapped output stream is not block stream: {}", + out.getWrappedStream()); + streamStatistics = null; + } for (long block = 1; block <= blocks; block++) { out.write(data); @@ -190,7 +199,8 @@ public void test_010_CreateHugeFile() throws IOException { } } // now close the file - LOG.info("Closing file and completing write operation"); + LOG.info("Closing stream {}", out); + LOG.info("Statistics : {}", streamStatistics); ContractTestUtils.NanoTimer closeTimer = new ContractTestUtils.NanoTimer(); out.close(); @@ -201,6 +211,7 @@ public void test_010_CreateHugeFile() throws IOException { filesizeMB, uploadBlockSize); 
logFSState(); bandwidth(timer, filesize); + LOG.info("Statistics after stream closed: {}", streamStatistics); long putRequestCount = storageStatistics.getLong(putRequests); Long putByteCount = storageStatistics.getLong(putBytes); LOG.info("PUT {} bytes in {} operations; {} MB/operation", @@ -214,7 +225,14 @@ public void test_010_CreateHugeFile() throws IOException { S3AFileStatus status = fs.getFileStatus(hugefile); ContractTestUtils.assertIsFile(hugefile, status); assertEquals("File size in " + status, filesize, status.getLen()); - progress.verifyNoFailures("Put file " + hugefile + " of size " + filesize); + if (progress != null) { + progress.verifyNoFailures("Put file " + hugefile + + " of size " + filesize); + } + if (streamStatistics != null) { + assertEquals("actively allocated blocks in " + streamStatistics, + 0, streamStatistics.blocksActivelyAllocated()); + } } /** @@ -285,7 +303,9 @@ private void verifyNoFailures(String operation) { void assumeHugeFileExists() throws IOException { S3AFileSystem fs = getFileSystem(); ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile); - ContractTestUtils.assertIsFile(fs, hugefile); + FileStatus status = fs.getFileStatus(hugefile); + ContractTestUtils.assertIsFile(hugefile, status); + assertTrue("File " + hugefile + " is empty", status.getLen() > 0); } private void logFSState() {
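
Editor's note, not part of the patch: the central abstraction this change introduces is S3ADataBlocks.BlockUploadData, which lets a block hand back either its buffer file (disk blocks) or an InputStream (array and ByteBuffer blocks); the caller then builds the matching PUT request and is responsible for closing both the upload data and the block once the upload finishes. Below is a minimal sketch of that lifecycle, modelled on putObject() and uploadBlockAsync() in the patch. It is a method fragment only, assuming the usual org.apache.hadoop.fs.s3a and com.amazonaws.services.s3.model imports; the method name uploadBlock, the helper parameter, and LOG are illustrative placeholders, not names added by the patch.

  // Sketch only: mirrors the upload/cleanup pattern introduced by this patch.
  PutObjectResult uploadBlock(S3ADataBlocks.DataBlock block,
      S3AFileSystem.WriteOperationHelper helper) throws IOException {
    final int size = block.dataSize();
    final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
    final PutObjectRequest request = uploadData.hasFile()
        // disk-backed block: hand the SDK the buffer file directly
        ? helper.newPutRequest(uploadData.getFile())
        // memory-backed block: a stream upload needs an explicit length
        : helper.newPutRequest(uploadData.getUploadStream(), size);
    try {
      // putObject() closes any stream in the request once the PUT completes
      return helper.putObject(request);
    } finally {
      // best-effort cleanup of the stream (if any) and the block's buffer;
      // S3AUtils.closeAll() logs and swallows any exception raised in close()
      S3AUtils.closeAll(LOG, uploadData, block);
    }
  }

The same try/finally discipline is what lets the new blocksAllocated/blocksReleased counters assert, in the tests, that blocksActivelyAllocated() returns to zero after the stream is closed.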